diff --git a/pkg/histogram/sparse_histogram.go b/pkg/histogram/sparse_histogram.go
index df6adeb14..0f0732176 100644
--- a/pkg/histogram/sparse_histogram.go
+++ b/pkg/histogram/sparse_histogram.go
@@ -17,6 +17,18 @@ import (
 	"math"
 )
 
+// SparseHistogram encodes a sparse histogram.
+// Full details: https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit#
+// The trickiest bit is how bucket indices represent real bucket boundaries.
+//
+// An example for schema 0 (which doubles the width of consecutive buckets):
+//
+// buckets syntax (LE,GE): (-2,-1)  (-1,-0.5)  (-0.5,-0.25)  ...  (-0.001,0.001)  ...  (0.25,0.5)  (0.5,1)  (1,2)  ...
+//                                                                       ^
+//                                              zero bucket (here with a width of 0.001), ZB
+// pos bucket idx                                                                ...      -1          0        1      2  3
+// neg bucket idx              3          2            1       0   -1  ...
+// The actively used bucket indices are represented by the spans.
 type SparseHistogram struct {
 	Count, ZeroCount   uint64
 	Sum, ZeroThreshold float64
diff --git a/tsdb/chunkenc/histo.go b/tsdb/chunkenc/histo.go
index 8e6177d43..ce6bd33db 100644
--- a/tsdb/chunkenc/histo.go
+++ b/tsdb/chunkenc/histo.go
@@ -126,7 +126,7 @@ func (c *HistoChunk) Appender() (Appender, error) {
 		return nil, err
 	}
 
-	a := &histoAppender{
+	a := &HistoAppender{
 		b: &c.b,
 
 		schema: it.schema,
@@ -192,7 +192,7 @@ func (c *HistoChunk) Iterator(it Iterator) Iterator {
 	return c.iterator(it)
 }
 
-type histoAppender struct {
+type HistoAppender struct {
 	b *bstream
 
 	// Metadata:
@@ -230,12 +230,36 @@ func putUvarint(b *bstream, buf []byte, x uint64) {
 	}
 }
 
-func (a *histoAppender) Append(int64, float64) {}
+func (a *HistoAppender) Append(int64, float64) {}
+
+// Appendable returns whether the chunk can be appended to, and if so,
+// whether any recoding needs to happen using the provided interjections
+// (in case any new buckets appeared, for the positive and negative range, respectively).
+// The chunk is not appendable if:
+// * the schema has changed
+// * the zero bucket threshold has changed
+// * any buckets have disappeared
+func (a *HistoAppender) Appendable(h histogram.SparseHistogram) ([]interjection, []interjection, bool) {
+	// TODO zerothreshold
+	if h.Schema != a.schema {
+		return nil, nil, false
+	}
+	posInterjections, ok := compareSpans(a.posSpans, h.PositiveSpans)
+	if !ok {
+		return nil, nil, false
+	}
+	negInterjections, ok := compareSpans(a.negSpans, h.NegativeSpans)
+	if !ok {
+		return nil, nil, false
+	}
+	return posInterjections, negInterjections, ok
+}
 
 // AppendHistogram appends a SparseHistogram to the chunk. We assume the
 // histogram is properly structured. E.g. that the number of pos/neg buckets
 // used corresponds to the number conveyed by the pos/neg span structures.
-func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
+// Callers must call Appendable() first and act accordingly!
+func (a *HistoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
 	var tDelta, cntDelta, zcntDelta int64
 
 	num := binary.BigEndian.Uint16(a.b.bytes())
@@ -265,19 +289,6 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
 			putVarint(a.b, a.buf64, buck)
 		}
 	case 1:
-		// TODO if zerobucket thresh or schema is different, we should create a new chunk
-		posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans)
-		//if !ok {
-		// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
-		//}
-		negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans)
-		//if !ok {
-		// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
-		//}
-		if len(posInterjections) > 0 || len(negInterjections) > 0 {
-			// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
-			a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans)
-		}
 
 		tDelta = t - a.t
 		cntDelta = int64(h.Count) - int64(a.cnt)
@@ -300,19 +311,6 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
 			a.negbucketsDelta[i] = delta
 		}
 	default:
-		// TODO if zerobucket thresh or schema is different, we should create a new chunk
-		posInterjections, _ := compareSpans(a.posSpans, h.PositiveSpans)
-		//if !ok {
-		// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
-		//}
-		negInterjections, _ := compareSpans(a.negSpans, h.NegativeSpans)
-		//if !ok {
-		// TODO Ganesh this is when we know buckets have dis-appeared and we should create a new chunk instead
-		//}
-		if len(posInterjections) > 0 || len(negInterjections) > 0 {
-			// new buckets have appeared. we need to recode all prior histograms within the chunk before we can process this one.
-			a.recode(posInterjections, negInterjections, h.PositiveSpans, h.NegativeSpans)
-		}
 		tDelta = t - a.t
 		cntDelta = int64(h.Count) - int64(a.cnt)
 		zcntDelta = int64(h.ZeroCount) - int64(a.zcnt)
@@ -357,13 +355,14 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
 
 }
 
-// recode converts the current chunk to accommodate an expansion of the set of
+// Recode converts the current chunk to accommodate an expansion of the set of
 // (positive and/or negative) buckets used, according to the provided interjections, resulting in
 // the honoring of the provided new posSpans and negSpans
 // note: the decode-recode can probably be done more efficiently, but that's for a future optimization
-func (a *histoAppender) recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) {
+func (a *HistoAppender) Recode(posInterjections, negInterjections []interjection, posSpans, negSpans []histogram.Span) (Chunk, Appender) {
 	it := newHistoIterator(a.b.bytes())
-	app, err := NewHistoChunk().Appender()
+	hc := NewHistoChunk()
+	app, err := hc.Appender()
 	if err != nil {
 		panic(err)
 	}
@@ -381,20 +380,13 @@ func (a *histoAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
 		if len(negInterjections) > 0 {
 			hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negbuckets, negInterjections)
 		}
-		// there is no risk of infinite recursion here as all histograms get appended with the same schema (number of buckets)
 		app.AppendHistogram(tOld, hOld)
 	}
 
-	// adopt the new appender into ourselves
-	// we skip porting some fields like schema, t, cnt and zcnt, sum because they didn't change between our old chunk and the recoded one
-	app2 := app.(*histoAppender)
-	a.b = app2.b
-	a.posSpans, a.negSpans = posSpans, negSpans
-	a.posbuckets, a.negbuckets = app2.posbuckets, app2.negbuckets
-	a.posbucketsDelta, a.negbucketsDelta = app2.posbucketsDelta, app2.negbucketsDelta
+	return hc, app
 }
 
-func (a *histoAppender) writeSumDelta(v float64) {
+func (a *HistoAppender) writeSumDelta(v float64) {
 	vDelta := math.Float64bits(v) ^ math.Float64bits(a.sum)
 
 	if vDelta == 0 {
@@ -434,11 +426,11 @@ type histoIterator struct {
 	numTotal uint16
 	numRead  uint16
 
-	// Meta
+	// Metadata:
 	schema             int32
 	posSpans, negSpans []histogram.Span
 
-	// for the fields that are tracked as dod's
+	// For the fields that are tracked as dod's.
 	t                           int64
 	cnt, zcnt                   uint64
 	tDelta, cntDelta, zcntDelta int64
@@ -446,7 +438,7 @@ type histoIterator struct {
 	posbuckets, negbuckets           []int64
 	posbucketsDelta, negbucketsDelta []int64
 
-	// for the fields that are gorilla xor coded
+	// The sum is Gorilla xor encoded.
 	sum      float64
 	leading  uint8
 	trailing uint8
@@ -640,6 +632,7 @@ func (it *histoIterator) Next() bool {
 			it.negbuckets[i] = it.negbuckets[i] + delta
 		}
 
+		it.numRead++
 		return true
 	}
 
@@ -692,6 +685,7 @@ func (it *histoIterator) Next() bool {
 		it.negbuckets[i] = it.negbuckets[i] + it.negbucketsDelta[i]
 	}
 
+	it.numRead++
 	return true
 }
 
@@ -760,6 +754,5 @@ func (it *histoIterator) readSum() bool {
 		it.sum = math.Float64frombits(vbits)
 	}
 
-	it.numRead++
 	return true
 }
diff --git a/tsdb/chunkenc/histo_test.go b/tsdb/chunkenc/histo_test.go
index 42af9aba8..dc75f7a3e 100644
--- a/tsdb/chunkenc/histo_test.go
+++ b/tsdb/chunkenc/histo_test.go
@@ -191,7 +191,7 @@ func TestHistoChunkBucketChanges(t *testing.T) {
 
 	// TODO is this okay?
 	// the appender can rewrite its own bytes slice but it is not able to update the HistoChunk, so our histochunk is outdated until we update it manually
-	c.b = *(app.(*histoAppender).b)
+	c.b = *(app.(*HistoAppender).b)
 	require.Equal(t, 2, c.NumSamples())
 
 	// because the 2nd histogram has expanded buckets, we should expect all histograms (in particular the first)
diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go
index c1c56c7e6..56eb9bf71 100644
--- a/tsdb/chunkenc/xor.go
+++ b/tsdb/chunkenc/xor.go
@@ -150,7 +150,7 @@ type xorAppender struct {
 }
 
 func (a *xorAppender) AppendHistogram(t int64, h histogram.SparseHistogram) {
-	panic("cannot call xorAppender.AppendHistogram().")
+	//panic("cannot call xorAppender.AppendHistogram().")
 }
 
 func (a *xorAppender) Append(t int64, v float64) {
diff --git a/tsdb/head.go b/tsdb/head.go
index dbffe67d4..06a772196 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -2624,6 +2624,24 @@ func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appen
 		return sampleInOrder, chunkCreated
 	}
 
+	if !chunkCreated {
+		// Head controls the execution of recoding, so that we own the proper chunk reference afterwards.
+		app, _ := s.app.(*chunkenc.HistoAppender)
+		posInterjections, negInterjections, ok := app.Appendable(sh)
+		// We have 3 cases here:
+		// !ok -> we need to cut a new chunk
+		// ok but we have interjections -> existing chunk needs recoding before we can append our histogram
+		// ok and no interjections -> chunk is ready to support our histogram
+		if !ok {
+			c = s.cutNewHeadChunk(t, chunkenc.EncSHS, chunkDiskMapper)
+			chunkCreated = true
+
+		} else if len(posInterjections) > 0 || len(negInterjections) > 0 {
+			// New buckets have appeared: we need to recode all prior histograms within the chunk before we can process this one.
+			s.headChunk.chunk, s.app = app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans)
+		}
+	}
+
 	s.app.AppendHistogram(t, sh)
 
 	c.maxTime = t
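The bucket-boundary convention described in the new SparseHistogram comment can be made concrete with a small sketch. This is illustrative only and not part of the patch: it assumes the reading that for schema s consecutive boundaries grow by a factor of 2^(2^-s), so that for schema 0 the upper bound of positive bucket i is 2^i (bucket 0 = (0.5, 1], bucket 1 = (1, 2], bucket -1 = (0.25, 0.5]), with the negative range mirrored. If the comment intends a different index origin, shift accordingly.

```go
package main

import (
	"fmt"
	"math"
)

// bucketBounds returns the assumed (lower, upper] boundaries of the positive
// bucket with the given index: the per-bucket growth factor is 2^(2^-schema),
// and the upper bound of bucket i is that factor raised to the power i.
// For schema 0 the factor is 2, i.e. consecutive buckets double in width.
func bucketBounds(schema, idx int32) (lower, upper float64) {
	factor := math.Exp2(math.Exp2(-float64(schema))) // 2 for schema 0, sqrt(2) for schema 1, ...
	upper = math.Pow(factor, float64(idx))
	lower = math.Pow(factor, float64(idx-1))
	return lower, upper
}

func main() {
	// Mirrors the pos bucket idx row of the comment: -1, 0, 1, 2, 3.
	for _, idx := range []int32{-1, 0, 1, 2, 3} {
		lo, hi := bucketBounds(0, idx)
		fmt.Printf("schema 0, idx %d: (%g, %g]\n", idx, lo, hi)
	}
}
```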
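The head.go hunk shows the calling contract that the new doc comment on AppendHistogram demands: check Appendable first, then append, recode, or cut a new chunk. The sketch below condenses that flow outside of memSeries. It is only a sketch against the API introduced in this diff (Appendable, Recode, AppendHistogram, NewHistoChunk on the sparsehistogram branch); cutNewChunk is a hypothetical stand-in for s.cutNewHeadChunk, and the caller is responsible for keeping the returned chunk and appender paired, exactly as head.go does.

```go
package example

import (
	"github.com/prometheus/prometheus/pkg/histogram"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// cutNewChunk is a hypothetical stand-in for memSeries.cutNewHeadChunk: it
// starts a fresh histogram chunk and returns it with its appender.
func cutNewChunk() (chunkenc.Chunk, *chunkenc.HistoAppender) {
	c := chunkenc.NewHistoChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	return c, app.(*chunkenc.HistoAppender)
}

// appendHisto applies the contract from the patch: call Appendable first,
// then either cut a new chunk, recode the existing one, or append directly.
func appendHisto(c chunkenc.Chunk, app *chunkenc.HistoAppender, t int64, sh histogram.SparseHistogram) (chunkenc.Chunk, *chunkenc.HistoAppender) {
	posInterjections, negInterjections, ok := app.Appendable(sh)
	switch {
	case !ok:
		// The chunk cannot accommodate sh (schema changed, or buckets
		// disappeared; the zero threshold check is still a TODO): start a new chunk.
		c, app = cutNewChunk()
	case len(posInterjections) > 0 || len(negInterjections) > 0:
		// New buckets appeared: rewrite all prior histograms in the chunk to
		// the widened bucket layout and adopt the recoded chunk and appender.
		newChunk, newApp := app.Recode(posInterjections, negInterjections, sh.PositiveSpans, sh.NegativeSpans)
		c, app = newChunk, newApp.(*chunkenc.HistoAppender)
	}
	app.AppendHistogram(t, sh)
	return c, app
}
```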