add SHS chunk recoding and head cutting to head block (no tests yet)

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>
This commit is contained in:
Dieter Plaetinck 2021-07-02 18:58:20 +03:00
parent bef872bf3c
commit 99ae04bb6f
4 changed files with 54 additions and 44 deletions

View file

@ -126,7 +126,7 @@ func (c *HistoChunk) Appender() (Appender, error) {
return nil, err
}
a := &histoAppender{
a := &HistoAppender{
b: &c.b,
schema: it.schema,
@ -192,7 +192,7 @@ func (c *HistoChunk) Iterator(it Iterator) Iterator {
return c.iterator(it)
}
type histoAppender struct {
type HistoAppender struct {
b *bstream
// Metadata:
@ -230,12 +230,36 @@ func putUvarint(b *bstream, buf []byte, x uint64) {
}
}
func (a *histoAppender) Append(int64, float64) {}
func (a *HistoAppender) Append(int64, float64) {}
// Appendable returns whether the chunk can be appended to, and if so
// whether any recoding needs to happen using the provided interjections
// (in case of any new buckets, positive or negative range, respectively)
// The chunk is not appendable if:
// * the schema has changed
// * the zerobucket threshold has changed
// * any buckets disappeared
func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]interjection, []interjection, bool) {
// TODO zerothreshold
if h.Schema != a.schema {
return nil, nil, false
}
posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans)
if !ok {
return nil, nil, false
}
negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans)
if !ok {
return nil, nil, false
}
return posInterjections, negInterjections, ok
}
// AppendHistogram appends a SparseHistogram to the chunk. We assume the
// histogram is properly structured. E.g. that the number of pos/neg buckets
// used corresponds to the number conveyed by the pos/neg span structures.
func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
// callers must call Appendable() first and act accordingly!
func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
var tDelta, cntDelta, zcntDelta int64
num := binary.BigEndian.Uint16(a.b.bytes())
@ -265,19 +289,6 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
putVarint(a.b, a.buf64, buck)
}
case 1:
// TODO if zerobucket thresh or schema is different, we should create a new chunk
posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans)
//if !ok {
// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
//}
negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans)
//if !ok {
// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
//}
if len(posInterjections) > 0 || len(negInterjections) > 0 {
// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans)
}
tDelta = t - a.t
cntDelta = int64(h.Count) - int64(a.cnt)
@ -300,19 +311,6 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
a.negbucketsDelta[i] = delta
}
default:
// TODO if zerobucket thresh or schema is different, we should create a new chunk
posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans)
//if !ok {
// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
//}
negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans)
//if !ok {
// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
//}
if len(posInterjections) > 0 || len(negInterjections) > 0 {
// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans)
}
tDelta = t - a.t
cntDelta = int64(h.Count) - int64(a.cnt)
zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
@ -357,13 +355,14 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
}
// recode converts the current chunk to accommodate an expansion of the set of
// Recode converts the current chunk to accommodate an expansion of the set of
// (positive and/or negative) buckets used, according to the provided interjections, resulting in
// the honoring of the provided new posSpans and negSpans
// note: the decode-recode can probably be done more efficiently, but that's for a future optimization
func (a *histoAppender) recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) {
func (a *HistoAppender) Recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) (Chunk, Appender) {
it := newHistoIterator(a.b.bytes())
app, err := NewHistoChunk().Appender()
hc := NewHistoChunk()
app, err := hc.Appender()
if err != nil {
panic(err)
}
@ -381,20 +380,13 @@ func (a *histoAppender) recode(posInterjections, negInterjections []interjection
if len(negInterjections) > 0 {
hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negbuckets, negInterjections)
}
// there is no risk of infinite recursion here as all histograms get appended with the same schema (number of buckets)
app.AppendHistogram(tOld, hOld)
}
// adopt the new appender into ourselves
// we skip porting some fields like schema, t, cnt and zcnt, sum because they didn't change between our old chunk and the recoded one
app2 := app.(*histoAppender)
a.b = app2.b
a.posSpans, a.negSpans = posSpans, negSpans
a.posbuckets, a.negbuckets = app2.posbuckets, app2.negbuckets
a.posbucketsDelta, a.negbucketsDelta = app2.posbucketsDelta, app2.negbucketsDelta
return hc, app
}
func (a *histoAppender) writeSumDelta(v float64) {
func (a *HistoAppender) writeSumDelta(v float64) {
vDelta := math.Float64bits(v) ^ math.Float64bits(a.sum)
if vDelta == 0 {

View file

@ -191,7 +191,7 @@ func TestHistoChunkBucketChanges(t *testing.T) {
// TODO is this okay?
// the appender can rewrite its own bytes slice but it is not able to update the HistoChunk, so our histochunk is outdated until we update it manually
c.b = *(app.(*histoAppender).b)
c.b = *(app.(*HistoAppender).b)
require.Equal(t, 2, c.NumSamples())
// because the 2nd histogram has expanded buckets, we should expect all histograms (in particular the first)

View file

@ -150,7 +150,7 @@ type xorAppender struct {
}
func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
panic("cannot call xorAppender.AppendHistogram().")
//panic("cannot call xorAppender.AppendHistogram().")
}
func (a *xorAppender) Append(t int64, v float64) {

View file

@ -2624,6 +2624,24 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
return sampleInOrder, chunkCreated
}
if !chunkCreated {
// Head controls the execution of recoding, so that we own the proper chunk reference afterwards
app, _ := s.app.(*chunkenc.HistoAppender)
posInterjections, negInterjections, ok := app.Appendable(sh)
// we have 3 cases here
// !ok -> we need to cut a new chunk
// ok but we have interjections -> existing chunk needs recoding before we can append our histogram
// ok and no interjections -> chunk is ready to support our histogram
if !ok {
c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper)
chunkCreated = true
} else if len(posInterjections) > 0 || len(negInterjections) > 0 {
// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
s.headChunk.chunk, s.app = app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans)
}
}
s.app.AppendHistogram(t, sh)
c.maxTime = t