From 583f3e587cf7b8f94b325ae72906815d5be480f4 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 23 Jan 2024 17:02:14 +0100 Subject: [PATCH] Optimize histogram iterators (#13340) Optimize histogram iterators Histogram iterators allocate new objects in the AtHistogram and AtFloatHistogram methods, which makes calculating rates over long ranges expensive. In #13215 we allowed an existing object to be reused when converting an integer histogram to a float histogram. This commit follows the same idea and allows injecting an existing object in the AtHistogram and AtFloatHistogram methods. When the injected value is nil, iterators allocate new histograms, otherwise they populate and return the injected object. The commit also adds a CopyTo method to Histogram and FloatHistogram which is used in the BufferedIterator to overwrite items in the ring instead of making new copies. Note that a specialized HPoint pool is needed for all of this to work (`matrixSelectorHPool`). --------- Signed-off-by: Filip Petkovski Co-authored-by: George Krajcsovits --- cmd/promtool/tsdb.go | 8 +- model/histogram/float_histogram.go | 40 +++++++- model/histogram/float_histogram_test.go | 112 ++++++++++++++++++++++ model/histogram/histogram.go | 32 ++++++- model/histogram/histogram_test.go | 122 ++++++++++++++++++++++++ promql/engine.go | 67 +++++++++---- promql/value.go | 4 +- rules/manager_test.go | 2 +- storage/buffer.go | 35 +++++-- storage/buffer_test.go | 8 +- storage/memoized_iterator.go | 4 +- storage/merge.go | 8 +- storage/merge_test.go | 12 +-- storage/remote/codec.go | 8 +- storage/remote/codec_test.go | 18 ++-- storage/series.go | 12 +-- tsdb/block_test.go | 4 +- tsdb/chunkenc/chunk.go | 39 +++++--- tsdb/chunkenc/float_histogram.go | 53 +++++++--- tsdb/chunkenc/float_histogram_test.go | 12 +-- tsdb/chunkenc/histogram.go | 115 ++++++++++++++++------ tsdb/chunkenc/histogram_test.go | 26 ++--- tsdb/chunkenc/xor.go | 4 +- tsdb/chunks/chunks.go | 4 +- tsdb/compact_test.go | 4 +- tsdb/db_test.go | 8 +- tsdb/head_test.go | 8 +- tsdb/querier.go | 24 ++--- tsdb/querier_test.go | 8 +- tsdb/tsdbblockutil.go | 4 +- web/federate.go | 2 +- 31 files changed, 621 insertions(+), 186 deletions(-) diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index a9239d937..4bba8421c 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -667,7 +667,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. it := fhchk.Iterator(nil) bucketCount := 0 for it.Next() == chunkenc.ValFloatHistogram { - _, f := it.AtFloatHistogram() + _, f := it.AtFloatHistogram(nil) bucketCount += len(f.PositiveBuckets) bucketCount += len(f.NegativeBuckets) } @@ -682,7 +682,7 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. it := hchk.Iterator(nil) bucketCount := 0 for it.Next() == chunkenc.ValHistogram { - _, f := it.AtHistogram() + _, f := it.AtHistogram(nil) bucketCount += len(f.PositiveBuckets) bucketCount += len(f.NegativeBuckets) } @@ -745,11 +745,11 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str fmt.Printf("%s %g %d\n", lbs, val, ts) } for it.Next() == chunkenc.ValFloatHistogram { - ts, fh := it.AtFloatHistogram() + ts, fh := it.AtFloatHistogram(nil) fmt.Printf("%s %s %d\n", lbs, fh.String(), ts) } for it.Next() == chunkenc.ValHistogram { - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) fmt.Printf("%s %s %d\n", lbs, h.String(), ts) } if it.Err() != nil { diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index bf89f2a47..19a92b3d5 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -53,21 +53,28 @@ type FloatHistogram struct { // Copy returns a deep copy of the Histogram. func (h *FloatHistogram) Copy() *FloatHistogram { - c := *h + c := FloatHistogram{ + CounterResetHint: h.CounterResetHint, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.ZeroCount, + Count: h.Count, + Sum: h.Sum, + } - if h.PositiveSpans != nil { + if len(h.PositiveSpans) != 0 { c.PositiveSpans = make([]Span, len(h.PositiveSpans)) copy(c.PositiveSpans, h.PositiveSpans) } - if h.NegativeSpans != nil { + if len(h.NegativeSpans) != 0 { c.NegativeSpans = make([]Span, len(h.NegativeSpans)) copy(c.NegativeSpans, h.NegativeSpans) } - if h.PositiveBuckets != nil { + if len(h.PositiveBuckets) != 0 { c.PositiveBuckets = make([]float64, len(h.PositiveBuckets)) copy(c.PositiveBuckets, h.PositiveBuckets) } - if h.NegativeBuckets != nil { + if len(h.NegativeBuckets) != 0 { c.NegativeBuckets = make([]float64, len(h.NegativeBuckets)) copy(c.NegativeBuckets, h.NegativeBuckets) } @@ -75,6 +82,29 @@ func (h *FloatHistogram) Copy() *FloatHistogram { return &c } +// CopyTo makes a deep copy into the given FloatHistogram. +// The destination object has to be a non-nil pointer. +func (h *FloatHistogram) CopyTo(to *FloatHistogram) { + to.CounterResetHint = h.CounterResetHint + to.Schema = h.Schema + to.ZeroThreshold = h.ZeroThreshold + to.ZeroCount = h.ZeroCount + to.Count = h.Count + to.Sum = h.Sum + + to.PositiveSpans = resize(to.PositiveSpans, len(h.PositiveSpans)) + copy(to.PositiveSpans, h.PositiveSpans) + + to.NegativeSpans = resize(to.NegativeSpans, len(h.NegativeSpans)) + copy(to.NegativeSpans, h.NegativeSpans) + + to.PositiveBuckets = resize(to.PositiveBuckets, len(h.PositiveBuckets)) + copy(to.PositiveBuckets, h.PositiveBuckets) + + to.NegativeBuckets = resize(to.NegativeBuckets, len(h.NegativeBuckets)) + copy(to.NegativeBuckets, h.NegativeBuckets) +} + // CopyToSchema works like Copy, but the returned deep copy has the provided // target schema, which must be ≤ the original schema (i.e. it must have a lower // resolution). diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index 3d20960f6..49fb77ab0 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -142,6 +142,118 @@ func TestFloatHistogramMul(t *testing.T) { } } +func TestFloatHistogramCopy(t *testing.T) { + cases := []struct { + name string + orig *FloatHistogram + expected *FloatHistogram + }{ + { + name: "without buckets", + orig: &FloatHistogram{}, + expected: &FloatHistogram{}, + }, + { + name: "with buckets", + orig: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + expected: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &FloatHistogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]float64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]float64, 0, 1), + }, + expected: &FloatHistogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := tcase.orig.Copy() + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyFHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func TestFloatHistogramCopyTo(t *testing.T) { + cases := []struct { + name string + orig *FloatHistogram + expected *FloatHistogram + }{ + { + name: "without buckets", + orig: &FloatHistogram{}, + expected: &FloatHistogram{}, + }, + { + name: "with buckets", + orig: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + expected: &FloatHistogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []float64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []float64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &FloatHistogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]float64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]float64, 0, 1), + }, + expected: &FloatHistogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := &FloatHistogram{} + tcase.orig.CopyTo(hCopy) + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyFHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func assertDeepCopyFHSpans(t *testing.T, orig, hCopy, expected *FloatHistogram) { + // Do an in-place expansion of an original spans slice. + orig.PositiveSpans = expandSpans(orig.PositiveSpans) + orig.PositiveSpans[len(orig.PositiveSpans)-1] = Span{1, 2} + + hCopy.PositiveSpans = expandSpans(hCopy.PositiveSpans) + expected.PositiveSpans = expandSpans(expected.PositiveSpans) + // Expand the copy spans and assert that modifying the original has not affected the copy. + require.Equal(t, expected, hCopy) +} + func TestFloatHistogramDiv(t *testing.T) { cases := []struct { name string diff --git a/model/histogram/histogram.go b/model/histogram/histogram.go index f4d292b34..d40adeb62 100644 --- a/model/histogram/histogram.go +++ b/model/histogram/histogram.go @@ -83,7 +83,14 @@ type Span struct { // Copy returns a deep copy of the Histogram. func (h *Histogram) Copy() *Histogram { - c := *h + c := Histogram{ + CounterResetHint: h.CounterResetHint, + Schema: h.Schema, + ZeroThreshold: h.ZeroThreshold, + ZeroCount: h.ZeroCount, + Count: h.Count, + Sum: h.Sum, + } if len(h.PositiveSpans) != 0 { c.PositiveSpans = make([]Span, len(h.PositiveSpans)) @@ -105,6 +112,29 @@ func (h *Histogram) Copy() *Histogram { return &c } +// CopyTo makes a deep copy into the given Histogram object. +// The destination object has to be a non-nil pointer. +func (h *Histogram) CopyTo(to *Histogram) { + to.CounterResetHint = h.CounterResetHint + to.Schema = h.Schema + to.ZeroThreshold = h.ZeroThreshold + to.ZeroCount = h.ZeroCount + to.Count = h.Count + to.Sum = h.Sum + + to.PositiveSpans = resize(to.PositiveSpans, len(h.PositiveSpans)) + copy(to.PositiveSpans, h.PositiveSpans) + + to.NegativeSpans = resize(to.NegativeSpans, len(h.NegativeSpans)) + copy(to.NegativeSpans, h.NegativeSpans) + + to.PositiveBuckets = resize(to.PositiveBuckets, len(h.PositiveBuckets)) + copy(to.PositiveBuckets, h.PositiveBuckets) + + to.NegativeBuckets = resize(to.NegativeBuckets, len(h.NegativeBuckets)) + copy(to.NegativeBuckets, h.NegativeBuckets) +} + // String returns a string representation of the Histogram. func (h *Histogram) String() string { var sb strings.Builder diff --git a/model/histogram/histogram_test.go b/model/histogram/histogram_test.go index 9a64faaaa..14a948e64 100644 --- a/model/histogram/histogram_test.go +++ b/model/histogram/histogram_test.go @@ -604,6 +604,128 @@ func TestHistogramEquals(t *testing.T) { notEquals(*hStale, *hNaN) } +func TestHistogramCopy(t *testing.T) { + cases := []struct { + name string + orig *Histogram + expected *Histogram + }{ + { + name: "without buckets", + orig: &Histogram{}, + expected: &Histogram{}, + }, + { + name: "with buckets", + orig: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + expected: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &Histogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]int64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]int64, 0, 1), + }, + expected: &Histogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := tcase.orig.Copy() + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func TestHistogramCopyTo(t *testing.T) { + cases := []struct { + name string + orig *Histogram + expected *Histogram + }{ + { + name: "without buckets", + orig: &Histogram{}, + expected: &Histogram{}, + }, + { + name: "with buckets", + orig: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + expected: &Histogram{ + PositiveSpans: []Span{{-2, 1}}, + PositiveBuckets: []int64{1, 3, -3, 42}, + NegativeSpans: []Span{{3, 2}}, + NegativeBuckets: []int64{5, 3, 1.234e5, 1000}, + }, + }, + { + name: "with empty buckets and non empty capacity", + orig: &Histogram{ + PositiveSpans: make([]Span, 0, 1), + PositiveBuckets: make([]int64, 0, 1), + NegativeSpans: make([]Span, 0, 1), + NegativeBuckets: make([]int64, 0, 1), + }, + expected: &Histogram{}, + }, + } + + for _, tcase := range cases { + t.Run(tcase.name, func(t *testing.T) { + hCopy := &Histogram{} + tcase.orig.CopyTo(hCopy) + + // Modify a primitive value in the original histogram. + tcase.orig.Sum++ + require.Equal(t, tcase.expected, hCopy) + assertDeepCopyHSpans(t, tcase.orig, hCopy, tcase.expected) + }) + } +} + +func assertDeepCopyHSpans(t *testing.T, orig, hCopy, expected *Histogram) { + // Do an in-place expansion of an original spans slice. + orig.PositiveSpans = expandSpans(orig.PositiveSpans) + orig.PositiveSpans[len(orig.PositiveSpans)-1] = Span{1, 2} + + hCopy.PositiveSpans = expandSpans(hCopy.PositiveSpans) + expected.PositiveSpans = expandSpans(expected.PositiveSpans) + // Expand the copy spans and assert that modifying the original has not affected the copy. + require.Equal(t, expected, hCopy) +} + +func expandSpans(spans []Span) []Span { + n := len(spans) + if cap(spans) > n { + spans = spans[:n+1] + } else { + spans = append(spans, Span{}) + } + return spans +} + func TestHistogramCompact(t *testing.T) { cases := []struct { name string diff --git a/promql/engine.go b/promql/engine.go index 7165631e0..8c8afd181 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -66,6 +66,9 @@ const ( // The getHPointSlice and getFPointSlice functions are called with an estimated size which often can be // over-estimated. maxPointsSliceSize = 5000 + + // The default buffer size for points used by the matrix selector. + matrixSelectorSliceSize = 16 ) type engineMetrics struct { @@ -1564,7 +1567,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio ev.currentSamples -= len(floats) + totalHPointSize(histograms) putFPointSlice(floats) - putHPointSlice(histograms) + putMatrixSelectorHPointSlice(histograms) // The absent_over_time function returns 0 or 1 series. So far, the matrix // contains multiple series. The following code will create a new series @@ -1940,6 +1943,13 @@ func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, no var ( fPointPool zeropool.Pool[[]FPoint] hPointPool zeropool.Pool[[]HPoint] + + // matrixSelectorHPool holds reusable histogram slices used by the matrix + // selector. The key difference between this pool and the hPointPool is that + // slices returned by this pool should never hold multiple copies of the same + // histogram pointer since histogram objects are reused across query evaluation + // steps. + matrixSelectorHPool zeropool.Pool[[]HPoint] ) func getFPointSlice(sz int) []FPoint { @@ -1982,6 +1992,20 @@ func putHPointSlice(p []HPoint) { } } +func getMatrixSelectorHPoints() []HPoint { + if p := matrixSelectorHPool.Get(); p != nil { + return p + } + + return make([]HPoint, 0, matrixSelectorSliceSize) +} + +func putMatrixSelectorHPointSlice(p []HPoint) { + if p != nil { + matrixSelectorHPool.Put(p[:0]) + } +} + // matrixSelector evaluates a *parser.MatrixSelector expression. func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) { var ( @@ -2106,13 +2130,13 @@ loop: // Values in the buffer are guaranteed to be smaller than maxt. if t >= mintHistograms { if histograms == nil { - histograms = getHPointSlice(16) + histograms = getMatrixSelectorHPoints() } n := len(histograms) if n < cap(histograms) { histograms = histograms[:n+1] } else { - histograms = append(histograms, HPoint{}) + histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}}) } histograms[n].T, histograms[n].H = buf.AtFloatHistogram(histograms[n].H) if value.IsStaleNaN(histograms[n].H.Sum) { @@ -2145,23 +2169,28 @@ loop: // The sought sample might also be in the range. switch soughtValueType { case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t := it.AtT() - if t == maxt { - _, h := it.AtFloatHistogram() - if !value.IsStaleNaN(h.Sum) { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - if histograms == nil { - histograms = getHPointSlice(16) - } - // The last sample comes directly from the iterator, so we need to copy it to - // avoid having the same reference twice in the buffer. - point := HPoint{T: t, H: h.Copy()} - histograms = append(histograms, point) - ev.currentSamples += point.size() - } + if it.AtT() != maxt { + break } + if histograms == nil { + histograms = getMatrixSelectorHPoints() + } + n := len(histograms) + if n < cap(histograms) { + histograms = histograms[:n+1] + } else { + histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}}) + } + histograms[n].T, histograms[n].H = it.AtFloatHistogram(histograms[n].H) + if value.IsStaleNaN(histograms[n].H.Sum) { + histograms = histograms[:n] + break + } + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + ev.currentSamples += histograms[n].size() + case chunkenc.ValFloat: t, f := it.At() if t == maxt && !value.IsStaleNaN(f) { diff --git a/promql/value.go b/promql/value.go index 28cf3fe31..f129137d8 100644 --- a/promql/value.go +++ b/promql/value.go @@ -464,11 +464,11 @@ func (ssi *storageSeriesIterator) At() (t int64, v float64) { return ssi.currT, ssi.currF } -func (ssi *storageSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (ssi *storageSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic(errors.New("storageSeriesIterator: AtHistogram not supported")) } -func (ssi *storageSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (ssi *storageSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return ssi.currT, ssi.currH } diff --git a/rules/manager_test.go b/rules/manager_test.go index 6418c5a37..3feae51de 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -1397,7 +1397,7 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { it := s.Iterator(nil) require.Equal(t, chunkenc.ValFloatHistogram, it.Next()) - tsp, fh := it.AtFloatHistogram() + tsp, fh := it.AtFloatHistogram(nil) require.Equal(t, ts.Add(10*time.Second).UnixMilli(), tsp) require.Equal(t, expHist, fh) require.Equal(t, chunkenc.ValNone, it.Next()) diff --git a/storage/buffer.go b/storage/buffer.go index d19f841d4..b3c789e97 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -24,6 +24,9 @@ import ( // BufferedSeriesIterator wraps an iterator with a look-back buffer. type BufferedSeriesIterator struct { + hReader histogram.Histogram + fhReader histogram.FloatHistogram + it chunkenc.Iterator buf *sampleRing delta int64 @@ -118,10 +121,10 @@ func (b *BufferedSeriesIterator) Next() chunkenc.ValueType { t, f := b.it.At() b.buf.addF(fSample{t: t, f: f}) case chunkenc.ValHistogram: - t, h := b.it.AtHistogram() + t, h := b.it.AtHistogram(&b.hReader) b.buf.addH(hSample{t: t, h: h}) case chunkenc.ValFloatHistogram: - t, fh := b.it.AtFloatHistogram() + t, fh := b.it.AtFloatHistogram(&b.fhReader) b.buf.addFH(fhSample{t: t, fh: fh}) default: panic(fmt.Errorf("BufferedSeriesIterator: unknown value type %v", b.valueType)) @@ -140,13 +143,13 @@ func (b *BufferedSeriesIterator) At() (int64, float64) { } // AtHistogram returns the current histogram element of the iterator. -func (b *BufferedSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - return b.it.AtHistogram() +func (b *BufferedSeriesIterator) AtHistogram(fh *histogram.Histogram) (int64, *histogram.Histogram) { + return b.it.AtHistogram(fh) } // AtFloatHistogram returns the current float-histogram element of the iterator. -func (b *BufferedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return b.it.AtFloatHistogram() +func (b *BufferedSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return b.it.AtFloatHistogram(fh) } // AtT returns the current timestamp of the iterator. @@ -378,7 +381,11 @@ func (it *SampleRingIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (in if it.fh == nil { return it.t, it.h.ToFloat(fh) } - return it.t, it.fh + if fh != nil { + it.fh.CopyTo(fh) + return it.t, fh + } + return it.t, it.fh.Copy() } func (it *SampleRingIterator) AtT() int64 { @@ -672,7 +679,12 @@ func addH(s hSample, buf []hSample, r *sampleRing) []hSample { } } - buf[r.i] = s + buf[r.i].t = s.t + if buf[r.i].h == nil { + buf[r.i].h = s.h.Copy() + } else { + s.h.CopyTo(buf[r.i].h) + } r.l++ // Free head of the buffer of samples that just fell out of the range. @@ -711,7 +723,12 @@ func addFH(s fhSample, buf []fhSample, r *sampleRing) []fhSample { } } - buf[r.i] = s + buf[r.i].t = s.t + if buf[r.i].fh == nil { + buf[r.i].fh = s.fh.Copy() + } else { + s.fh.CopyTo(buf[r.i].fh) + } r.l++ // Free head of the buffer of samples that just fell out of the range. diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 12e6ff0f0..61074c212 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -277,11 +277,11 @@ func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() } func (m *mockSeriesIterator) Err() error { return m.err() } -func (m *mockSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (m *mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return 0, nil // Not really mocked. } -func (m *mockSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (m *mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return 0, nil // Not really mocked. } @@ -303,11 +303,11 @@ func (it *fakeSeriesIterator) At() (int64, float64) { return it.idx * it.step, 123 // Value doesn't matter. } -func (it *fakeSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *fakeSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return it.idx * it.step, &histogram.Histogram{} // Value doesn't matter. } -func (it *fakeSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *fakeSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return it.idx * it.step, &histogram.FloatHistogram{} // Value doesn't matter. } diff --git a/storage/memoized_iterator.go b/storage/memoized_iterator.go index cb9fdeef4..4ab2aa5d7 100644 --- a/storage/memoized_iterator.go +++ b/storage/memoized_iterator.go @@ -113,7 +113,7 @@ func (b *MemoizedSeriesIterator) Next() chunkenc.ValueType { b.prevFloatHistogram = nil case chunkenc.ValHistogram, chunkenc.ValFloatHistogram: b.prevValue = 0 - b.prevTime, b.prevFloatHistogram = b.it.AtFloatHistogram() + b.prevTime, b.prevFloatHistogram = b.it.AtFloatHistogram(nil) } b.valueType = b.it.Next() @@ -133,7 +133,7 @@ func (b *MemoizedSeriesIterator) At() (int64, float64) { // AtFloatHistogram returns the current float-histogram element of the iterator. func (b *MemoizedSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return b.it.AtFloatHistogram() + return b.it.AtFloatHistogram(nil) } // Err returns the last encountered error. diff --git a/storage/merge.go b/storage/merge.go index bcb0f66fb..38897449b 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -525,11 +525,11 @@ func (c *chainSampleIterator) At() (t int64, v float64) { return c.curr.At() } -func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { +func (c *chainSampleIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { if c.curr == nil { panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.") } - t, h := c.curr.AtHistogram() + t, h := c.curr.AtHistogram(h) // If the current sample is not consecutive with the previous one, we // cannot be sure anymore about counter resets for counter histograms. // TODO(beorn7): If a `NotCounterReset` sample is followed by a @@ -542,11 +542,11 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { return t, h } -func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (c *chainSampleIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { if c.curr == nil { panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.") } - t, fh := c.curr.AtFloatHistogram() + t, fh := c.curr.AtFloatHistogram(fh) // If the current sample is not consecutive with the previous one, we // cannot be sure anymore about counter resets for counter histograms. // TODO(beorn7): If a `NotCounterReset` sample is followed by a diff --git a/storage/merge_test.go b/storage/merge_test.go index 02c2a3409..05e1c7527 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1173,10 +1173,10 @@ func TestChainSampleIteratorSeek(t *testing.T) { t, f := merged.At() actual = append(actual, fSample{t, f}) case chunkenc.ValHistogram: - t, h := merged.AtHistogram() + t, h := merged.AtHistogram(nil) actual = append(actual, hSample{t, h}) case chunkenc.ValFloatHistogram: - t, fh := merged.AtFloatHistogram() + t, fh := merged.AtFloatHistogram(nil) actual = append(actual, fhSample{t, fh}) } s, err := ExpandSamples(merged, nil) @@ -1259,10 +1259,10 @@ func TestChainSampleIteratorSeekHistogramCounterResetHint(t *testing.T) { t, f := merged.At() actual = append(actual, fSample{t, f}) case chunkenc.ValHistogram: - t, h := merged.AtHistogram() + t, h := merged.AtHistogram(nil) actual = append(actual, hSample{t, h}) case chunkenc.ValFloatHistogram: - t, fh := merged.AtFloatHistogram() + t, fh := merged.AtFloatHistogram(nil) actual = append(actual, fhSample{t, fh}) } s, err := ExpandSamples(merged, nil) @@ -1629,11 +1629,11 @@ func (e errIterator) At() (int64, float64) { return 0, 0 } -func (e errIterator) AtHistogram() (int64, *histogram.Histogram) { +func (e errIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return 0, nil } -func (e errIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (e errIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return 0, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index ffab821a5..9cf1ed871 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -152,10 +152,10 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, Value: val, }) case chunkenc.ValHistogram: - ts, h := iter.AtHistogram() + ts, h := iter.AtHistogram(nil) histograms = append(histograms, HistogramToHistogramProto(ts, h)) case chunkenc.ValFloatHistogram: - ts, fh := iter.AtFloatHistogram() + ts, fh := iter.AtFloatHistogram(nil) histograms = append(histograms, FloatHistogramToHistogramProto(ts, fh)) default: return nil, ss.Warnings(), fmt.Errorf("unrecognized value type: %s", valType) @@ -475,7 +475,7 @@ func (c *concreteSeriesIterator) At() (t int64, v float64) { } // AtHistogram implements chunkenc.Iterator. -func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (c *concreteSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { if c.curValType != chunkenc.ValHistogram { panic("iterator is not on an integer histogram sample") } @@ -484,7 +484,7 @@ func (c *concreteSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { } // AtFloatHistogram implements chunkenc.Iterator. -func (c *concreteSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (c *concreteSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { switch c.curValType { case chunkenc.ValHistogram: fh := c.series.histograms[c.histogramsCur] diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index dbc22a377..0451953cb 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -278,31 +278,31 @@ func TestConcreteSeriesIterator_HistogramSamples(t *testing.T) { // Seek to the first sample with ts=1. require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) - ts, v := it.AtHistogram() + ts, v := it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[0], v) // Seek one further, next sample still has ts=1. require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[1], v) // Seek again to 1 and make sure we stay where we are. require.Equal(t, chunkenc.ValHistogram, it.Seek(1)) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[1], v) // Another seek. require.Equal(t, chunkenc.ValHistogram, it.Seek(3)) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(3), ts) require.Equal(t, histograms[3], v) // And we don't go back. require.Equal(t, chunkenc.ValHistogram, it.Seek(2)) - ts, v = it.AtHistogram() + ts, v = it.AtHistogram(nil) require.Equal(t, int64(3), ts) require.Equal(t, histograms[3], v) @@ -347,12 +347,12 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { fh *histogram.FloatHistogram ) require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h = it.AtHistogram() + ts, h = it.AtHistogram(nil) require.Equal(t, int64(1), ts) require.Equal(t, histograms[0], h) require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h = it.AtHistogram() + ts, h = it.AtHistogram(nil) require.Equal(t, int64(2), ts) require.Equal(t, histograms[1], h) @@ -393,13 +393,13 @@ func TestConcreteSeriesIterator_FloatAndHistogramSamples(t *testing.T) { require.Equal(t, 8., v) require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, h = it.AtHistogram() + ts, h = it.AtHistogram(nil) require.Equal(t, int64(16), ts) require.Equal(t, histograms[10], h) // Getting a float histogram from an int histogram works. require.Equal(t, chunkenc.ValHistogram, it.Next()) - ts, fh = it.AtFloatHistogram() + ts, fh = it.AtFloatHistogram(nil) require.Equal(t, int64(17), ts) expected := HistogramProtoToFloatHistogram(HistogramToHistogramProto(int64(17), histograms[11])) require.Equal(t, expected, fh) diff --git a/storage/series.go b/storage/series.go index b111505aa..eba11b4d9 100644 --- a/storage/series.go +++ b/storage/series.go @@ -123,12 +123,12 @@ func (it *listSeriesIterator) At() (int64, float64) { return s.T(), s.F() } -func (it *listSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *listSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { s := it.samples.Get(it.idx) return s.T(), s.H() } -func (it *listSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *listSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { s := it.samples.Get(it.idx) return s.T(), s.FH() } @@ -337,7 +337,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { t, v = seriesIter.At() app.Append(t, v) case chunkenc.ValHistogram: - t, h = seriesIter.AtHistogram() + t, h = seriesIter.AtHistogram(nil) newChk, recoded, app, err = app.AppendHistogram(nil, t, h, false) if err != nil { return errChunksIterator{err: err} @@ -352,7 +352,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { chk = newChk } case chunkenc.ValFloatHistogram: - t, fh = seriesIter.AtFloatHistogram() + t, fh = seriesIter.AtFloatHistogram(nil) newChk, recoded, app, err = app.AppendFloatHistogram(nil, t, fh, false) if err != nil { return errChunksIterator{err: err} @@ -438,10 +438,10 @@ func ExpandSamples(iter chunkenc.Iterator, newSampleFn func(t int64, f float64, } result = append(result, newSampleFn(t, f, nil, nil)) case chunkenc.ValHistogram: - t, h := iter.AtHistogram() + t, h := iter.AtHistogram(nil) result = append(result, newSampleFn(t, 0, h, nil)) case chunkenc.ValFloatHistogram: - t, fh := iter.AtFloatHistogram() + t, fh := iter.AtFloatHistogram(nil) result = append(result, newSampleFn(t, 0, nil, fh)) } } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index c155b4451..d48418b57 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -555,10 +555,10 @@ func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir str t, v := it.At() ref, err = app.Append(ref, lset, t, v) case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, h, nil) case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() + t, fh := it.AtFloatHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, nil, fh) default: err = fmt.Errorf("unknown sample type %s", typ.String()) diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 0126f1fbd..21c41257b 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -131,16 +131,20 @@ type Iterator interface { // At returns the current timestamp/value pair if the value is a float. // Before the iterator has advanced, the behaviour is unspecified. At() (int64, float64) - // AtHistogram returns the current timestamp/value pair if the value is - // a histogram with integer counts. Before the iterator has advanced, - // the behaviour is unspecified. - AtHistogram() (int64, *histogram.Histogram) + // AtHistogram returns the current timestamp/value pair if the value is a + // histogram with integer counts. Before the iterator has advanced, the behaviour + // is unspecified. + // The method accepts an optional Histogram object which will be + // reused when not nil. Otherwise, a new Histogram object will be allocated. + AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) // AtFloatHistogram returns the current timestamp/value pair if the // value is a histogram with floating-point counts. It also works if the // value is a histogram with integer counts, in which case a // FloatHistogram copy of the histogram is returned. Before the iterator // has advanced, the behaviour is unspecified. - AtFloatHistogram() (int64, *histogram.FloatHistogram) + // The method accepts an optional FloatHistogram object which will be + // reused when not nil. Otherwise, a new FloatHistogram object will be allocated. + AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) // AtT returns the current timestamp. // Before the iterator has advanced, the behaviour is unspecified. AtT() int64 @@ -222,9 +226,11 @@ func (it *mockSeriesIterator) At() (int64, float64) { return it.timeStamps[it.currIndex], it.values[it.currIndex] } -func (it *mockSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { return math.MinInt64, nil } +func (it *mockSeriesIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { + return math.MinInt64, nil +} -func (it *mockSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *mockSeriesIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return math.MinInt64, nil } @@ -249,13 +255,18 @@ func NewNopIterator() Iterator { type nopIterator struct{} -func (nopIterator) Next() ValueType { return ValNone } -func (nopIterator) Seek(int64) ValueType { return ValNone } -func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } -func (nopIterator) AtHistogram() (int64, *histogram.Histogram) { return math.MinInt64, nil } -func (nopIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { return math.MinInt64, nil } -func (nopIterator) AtT() int64 { return math.MinInt64 } -func (nopIterator) Err() error { return nil } +func (nopIterator) Next() ValueType { return ValNone } +func (nopIterator) Seek(int64) ValueType { return ValNone } +func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } +func (nopIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { + return math.MinInt64, nil +} + +func (nopIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return math.MinInt64, nil +} +func (nopIterator) AtT() int64 { return math.MinInt64 } +func (nopIterator) Err() error { return nil } // Pool is used to create and reuse chunk references to avoid allocations. type Pool interface { diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 3d76cdf65..88d189254 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -527,7 +527,7 @@ func (a *FloatHistogramAppender) recode( numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) for it.Next() == ValFloatHistogram { - tOld, hOld := it.AtFloatHistogram() + tOld, hOld := it.AtFloatHistogram(nil) // We have to newly allocate slices for the modified buckets // here because they are kept by the appender until the next @@ -728,27 +728,50 @@ func (it *floatHistogramIterator) At() (int64, float64) { panic("cannot call floatHistogramIterator.At") } -func (it *floatHistogramIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *floatHistogramIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic("cannot call floatHistogramIterator.AtHistogram") } -func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *floatHistogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { if value.IsStaleNaN(it.sum.value) { return it.t, &histogram.FloatHistogram{Sum: it.sum.value} } - it.atFloatHistogramCalled = true - return it.t, &histogram.FloatHistogram{ - CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), - Count: it.cnt.value, - ZeroCount: it.zCnt.value, - Sum: it.sum.value, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + if fh == nil { + it.atFloatHistogramCalled = true + return it.t, &histogram.FloatHistogram{ + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), + Count: it.cnt.value, + ZeroCount: it.zCnt.value, + Sum: it.sum.value, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, + } } + + fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead) + fh.Schema = it.schema + fh.ZeroThreshold = it.zThreshold + fh.ZeroCount = it.zCnt.value + fh.Count = it.cnt.value + fh.Sum = it.sum.value + + fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans)) + copy(fh.PositiveSpans, it.pSpans) + + fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans)) + copy(fh.NegativeSpans, it.nSpans) + + fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets)) + copy(fh.PositiveBuckets, it.pBuckets) + + fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets)) + copy(fh.NegativeBuckets, it.nBuckets) + + return it.t, fh } func (it *floatHistogramIterator) AtT() int64 { diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 6f5a95fb1..054c17f7d 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -140,7 +140,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { require.NoError(t, it.Err()) var act []floatResult for it.Next() == ValFloatHistogram { - fts, fh := it.AtFloatHistogram() + fts, fh := it.AtFloatHistogram(nil) act = append(act, floatResult{t: fts, h: fh}) } require.NoError(t, it.Err()) @@ -150,7 +150,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { it2 := c.Iterator(it) var act2 []floatResult for it2.Next() == ValFloatHistogram { - fts, fh := it2.AtFloatHistogram() + fts, fh := it2.AtFloatHistogram(nil) act2 = append(act2, floatResult{t: fts, h: fh}) } require.NoError(t, it2.Err()) @@ -164,7 +164,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { it3 := c.iterator(itX) var act3 []floatResult for it3.Next() == ValFloatHistogram { - fts, fh := it3.AtFloatHistogram() + fts, fh := it3.AtFloatHistogram(nil) act3 = append(act3, floatResult{t: fts, h: fh}) } require.NoError(t, it3.Err()) @@ -178,10 +178,10 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { // Below ones should not matter. require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) require.Equal(t, ValFloatHistogram, it4.Seek(exp[mid].t)) - fts, fh := it4.AtFloatHistogram() + fts, fh := it4.AtFloatHistogram(nil) act4 = append(act4, floatResult{t: fts, h: fh}) for it4.Next() == ValFloatHistogram { - fts, fh := it4.AtFloatHistogram() + fts, fh := it4.AtFloatHistogram(nil) act4 = append(act4, floatResult{t: fts, h: fh}) } require.NoError(t, it4.Err()) @@ -272,7 +272,7 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) { it := c.Iterator(nil) var act []floatResult for it.Next() == ValFloatHistogram { - fts, fh := it.AtFloatHistogram() + fts, fh := it.AtFloatHistogram(nil) act = append(act, floatResult{t: fts, h: fh}) } require.NoError(t, it.Err()) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index ac84e7a1e..cb09eda26 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -558,7 +558,7 @@ func (a *HistogramAppender) recode( numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) for it.Next() == ValHistogram { - tOld, hOld := it.AtHistogram() + tOld, hOld := it.AtHistogram(nil) // We have to newly allocate slices for the modified buckets // here because they are kept by the appender until the next @@ -776,42 +776,96 @@ func (it *histogramIterator) At() (int64, float64) { panic("cannot call histogramIterator.At") } -func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *histogramIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { if value.IsStaleNaN(it.sum) { return it.t, &histogram.Histogram{Sum: it.sum} } - it.atHistogramCalled = true - return it.t, &histogram.Histogram{ - CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), - Count: it.cnt, - ZeroCount: it.zCnt, - Sum: it.sum, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pBuckets, - NegativeBuckets: it.nBuckets, + if h == nil { + it.atHistogramCalled = true + return it.t, &histogram.Histogram{ + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), + Count: it.cnt, + ZeroCount: it.zCnt, + Sum: it.sum, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pBuckets, + NegativeBuckets: it.nBuckets, + } } + + h.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead) + h.Schema = it.schema + h.ZeroThreshold = it.zThreshold + h.ZeroCount = it.zCnt + h.Count = it.cnt + h.Sum = it.sum + + h.PositiveSpans = resize(h.PositiveSpans, len(it.pSpans)) + copy(h.PositiveSpans, it.pSpans) + + h.NegativeSpans = resize(h.NegativeSpans, len(it.nSpans)) + copy(h.NegativeSpans, it.nSpans) + + h.PositiveBuckets = resize(h.PositiveBuckets, len(it.pBuckets)) + copy(h.PositiveBuckets, it.pBuckets) + + h.NegativeBuckets = resize(h.NegativeBuckets, len(it.nBuckets)) + copy(h.NegativeBuckets, it.nBuckets) + + return it.t, h } -func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *histogramIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { if value.IsStaleNaN(it.sum) { return it.t, &histogram.FloatHistogram{Sum: it.sum} } - it.atFloatHistogramCalled = true - return it.t, &histogram.FloatHistogram{ - CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), - Count: float64(it.cnt), - ZeroCount: float64(it.zCnt), - Sum: it.sum, - ZeroThreshold: it.zThreshold, - Schema: it.schema, - PositiveSpans: it.pSpans, - NegativeSpans: it.nSpans, - PositiveBuckets: it.pFloatBuckets, - NegativeBuckets: it.nFloatBuckets, + if fh == nil { + it.atFloatHistogramCalled = true + return it.t, &histogram.FloatHistogram{ + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), + Count: float64(it.cnt), + ZeroCount: float64(it.zCnt), + Sum: it.sum, + ZeroThreshold: it.zThreshold, + Schema: it.schema, + PositiveSpans: it.pSpans, + NegativeSpans: it.nSpans, + PositiveBuckets: it.pFloatBuckets, + NegativeBuckets: it.nFloatBuckets, + } } + + fh.CounterResetHint = counterResetHint(it.counterResetHeader, it.numRead) + fh.Schema = it.schema + fh.ZeroThreshold = it.zThreshold + fh.ZeroCount = float64(it.zCnt) + fh.Count = float64(it.cnt) + fh.Sum = it.sum + + fh.PositiveSpans = resize(fh.PositiveSpans, len(it.pSpans)) + copy(fh.PositiveSpans, it.pSpans) + + fh.NegativeSpans = resize(fh.NegativeSpans, len(it.nSpans)) + copy(fh.NegativeSpans, it.nSpans) + + fh.PositiveBuckets = resize(fh.PositiveBuckets, len(it.pBuckets)) + var currentPositive float64 + for i, b := range it.pBuckets { + currentPositive += float64(b) + fh.PositiveBuckets[i] = currentPositive + } + + fh.NegativeBuckets = resize(fh.NegativeBuckets, len(it.nBuckets)) + var currentNegative float64 + for i, b := range it.nBuckets { + currentNegative += float64(b) + fh.NegativeBuckets[i] = currentNegative + } + + return it.t, fh } func (it *histogramIterator) AtT() int64 { @@ -1056,3 +1110,10 @@ func (it *histogramIterator) readSum() bool { } return true } + +func resize[T any](items []T, n int) []T { + if cap(items) < n { + return make([]T, n) + } + return items[:n] +} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 53aee89db..f7609c193 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -141,8 +141,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) { require.NoError(t, it.Err()) var act []result for it.Next() == ValHistogram { - ts, h := it.AtHistogram() - fts, fh := it.AtFloatHistogram() + ts, h := it.AtHistogram(nil) + fts, fh := it.AtFloatHistogram(nil) require.Equal(t, ts, fts) act = append(act, result{t: ts, h: h, fh: fh}) } @@ -153,8 +153,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) { it2 := c.Iterator(it) var act2 []result for it2.Next() == ValHistogram { - ts, h := it2.AtHistogram() - fts, fh := it2.AtFloatHistogram() + ts, h := it2.AtHistogram(nil) + fts, fh := it2.AtFloatHistogram(nil) require.Equal(t, ts, fts) act2 = append(act2, result{t: ts, h: h, fh: fh}) } @@ -169,8 +169,8 @@ func TestHistogramChunkSameBuckets(t *testing.T) { it3 := c.iterator(itX) var act3 []result for it3.Next() == ValHistogram { - ts, h := it3.AtHistogram() - fts, fh := it3.AtFloatHistogram() + ts, h := it3.AtHistogram(nil) + fts, fh := it3.AtFloatHistogram(nil) require.Equal(t, ts, fts) act3 = append(act3, result{t: ts, h: h, fh: fh}) } @@ -185,13 +185,13 @@ func TestHistogramChunkSameBuckets(t *testing.T) { // Below ones should not matter. require.Equal(t, ValHistogram, it4.Seek(exp[mid].t)) require.Equal(t, ValHistogram, it4.Seek(exp[mid].t)) - ts, h = it4.AtHistogram() - fts, fh := it4.AtFloatHistogram() + ts, h = it4.AtHistogram(nil) + fts, fh := it4.AtFloatHistogram(nil) require.Equal(t, ts, fts) act4 = append(act4, result{t: ts, h: h, fh: fh}) for it4.Next() == ValHistogram { - ts, h := it4.AtHistogram() - fts, fh := it4.AtFloatHistogram() + ts, h := it4.AtHistogram(nil) + fts, fh := it4.AtFloatHistogram(nil) require.Equal(t, ts, fts) act4 = append(act4, result{t: ts, h: h, fh: fh}) } @@ -284,8 +284,8 @@ func TestHistogramChunkBucketChanges(t *testing.T) { it := c.Iterator(nil) var act []result for it.Next() == ValHistogram { - ts, h := it.AtHistogram() - fts, fh := it.AtFloatHistogram() + ts, h := it.AtHistogram(nil) + fts, fh := it.AtFloatHistogram(nil) require.Equal(t, ts, fts) act = append(act, result{t: ts, h: h, fh: fh}) } @@ -897,7 +897,7 @@ func TestAtFloatHistogram(t *testing.T) { it := chk.Iterator(nil) i := int64(0) for it.Next() != ValNone { - ts, h := it.AtFloatHistogram() + ts, h := it.AtFloatHistogram(nil) require.Equal(t, i, ts) require.Equal(t, expOutput[i], h, "histogram %d unequal", i) i++ diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index d54e5dbab..07b923831 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -260,11 +260,11 @@ func (it *xorIterator) At() (int64, float64) { return it.t, it.val } -func (it *xorIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *xorIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic("cannot call xorIterator.AtHistogram") } -func (it *xorIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *xorIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { panic("cannot call xorIterator.AtFloatHistogram") } diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 2c6db3637..543b98c28 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -226,10 +226,10 @@ func ChunkMetasToSamples(chunks []Meta) (result []Sample) { t, v := it.At() result = append(result, sample{t: t, f: v}) case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) result = append(result, sample{t: t, h: h}) case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() + t, fh := it.AtFloatHistogram(nil) result = append(result, sample{t: t, fh: fh}) default: panic("unexpected value type") diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index f33bb73c1..b2d2ea6e7 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1007,10 +1007,10 @@ func TestCompaction_populateBlock(t *testing.T) { s.t, s.f = iter.At() samples = append(samples, s) case chunkenc.ValHistogram: - s.t, s.h = iter.AtHistogram() + s.t, s.h = iter.AtHistogram(nil) samples = append(samples, s) case chunkenc.ValFloatHistogram: - s.t, s.fh = iter.AtFloatHistogram() + s.t, s.fh = iter.AtFloatHistogram(nil) samples = append(samples, s) default: require.Fail(t, "unexpected value type") diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 3bc094a3d..9e543ed50 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -107,10 +107,10 @@ func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[str ts, v := it.At() samples = append(samples, sample{t: ts, f: v}) case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) samples = append(samples, sample{t: ts, h: h}) case chunkenc.ValFloatHistogram: - ts, fh := it.AtFloatHistogram() + ts, fh := it.AtFloatHistogram(nil) samples = append(samples, sample{t: ts, fh: fh}) default: t.Fatalf("unknown sample type in query %s", typ.String()) @@ -6664,10 +6664,10 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { ts, v := it.At() slice = append(slice, sample{t: ts, f: v}) case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) slice = append(slice, sample{t: ts, h: h}) case chunkenc.ValFloatHistogram: - ts, h := it.AtFloatHistogram() + ts, h := it.AtFloatHistogram(nil) slice = append(slice, sample{t: ts, fh: h}) default: t.Fatalf("unexpected sample value type %d", typ) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5c2749bed..653c53a74 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -3377,10 +3377,10 @@ func TestAppendHistogram(t *testing.T) { for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) actHistograms = append(actHistograms, sample{t: ts, h: h}) case chunkenc.ValFloatHistogram: - ts, fh := it.AtFloatHistogram() + ts, fh := it.AtFloatHistogram(nil) actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh}) } } @@ -4025,10 +4025,10 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { switch typ { case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) actHistograms = append(actHistograms, timedHistogram{t: t, h: h}) case chunkenc.ValFloatHistogram: - t, h := it.AtFloatHistogram() + t, h := it.AtFloatHistogram(nil) actHistograms = append(actHistograms, timedHistogram{t: t, fh: h}) } } diff --git a/tsdb/querier.go b/tsdb/querier.go index 68aa95746..a692c98f1 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -820,12 +820,12 @@ func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } -func (p *populateWithDelSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - return p.curr.AtHistogram() +func (p *populateWithDelSeriesIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + return p.curr.AtHistogram(h) } -func (p *populateWithDelSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - return p.curr.AtFloatHistogram() +func (p *populateWithDelSeriesIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + return p.curr.AtFloatHistogram(fh) } func (p *populateWithDelSeriesIterator) AtT() int64 { @@ -937,7 +937,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { break } var h *histogram.Histogram - t, h = p.currDelIter.AtHistogram() + t, h = p.currDelIter.AtHistogram(nil) _, _, app, err = app.AppendHistogram(nil, t, h, true) if err != nil { break @@ -968,7 +968,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { break } var h *histogram.FloatHistogram - t, h = p.currDelIter.AtFloatHistogram() + t, h = p.currDelIter.AtFloatHistogram(nil) _, _, app, err = app.AppendFloatHistogram(nil, t, h, true) if err != nil { break @@ -1054,7 +1054,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { case chunkenc.ValHistogram: { var v *histogram.Histogram - t, v = p.currDelIter.AtHistogram() + t, v = p.currDelIter.AtHistogram(nil) // No need to set prevApp as AppendHistogram will set the // counter reset header for the appender that's returned. newChunk, recoded, app, err = app.AppendHistogram(nil, t, v, false) @@ -1062,7 +1062,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { case chunkenc.ValFloatHistogram: { var v *histogram.FloatHistogram - t, v = p.currDelIter.AtFloatHistogram() + t, v = p.currDelIter.AtFloatHistogram(nil) // No need to set prevApp as AppendHistogram will set the // counter reset header for the appender that's returned. newChunk, recoded, app, err = app.AppendFloatHistogram(nil, t, v, false) @@ -1233,13 +1233,13 @@ func (it *DeletedIterator) At() (int64, float64) { return it.Iter.At() } -func (it *DeletedIterator) AtHistogram() (int64, *histogram.Histogram) { - t, h := it.Iter.AtHistogram() +func (it *DeletedIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + t, h := it.Iter.AtHistogram(h) return t, h } -func (it *DeletedIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - t, h := it.Iter.AtFloatHistogram() +func (it *DeletedIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + t, h := it.Iter.AtFloatHistogram(fh) return t, h } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 18d81b85b..1ed59ef1a 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -775,11 +775,11 @@ func (it *mockSampleIterator) At() (int64, float64) { return it.s[it.idx].T(), it.s[it.idx].F() } -func (it *mockSampleIterator) AtHistogram() (int64, *histogram.Histogram) { +func (it *mockSampleIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { return it.s[it.idx].T(), it.s[it.idx].H() } -func (it *mockSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { +func (it *mockSampleIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { return it.s[it.idx].T(), it.s[it.idx].FH() } @@ -1822,12 +1822,12 @@ func checkCurrVal(t *testing.T, valType chunkenc.ValueType, it *populateWithDelS require.Equal(t, int64(expectedTs), ts) require.Equal(t, float64(expectedValue), v) case chunkenc.ValHistogram: - ts, h := it.AtHistogram() + ts, h := it.AtHistogram(nil) require.Equal(t, int64(expectedTs), ts) h.CounterResetHint = histogram.UnknownCounterReset require.Equal(t, tsdbutil.GenerateTestHistogram(expectedValue), h) case chunkenc.ValFloatHistogram: - ts, h := it.AtFloatHistogram() + ts, h := it.AtFloatHistogram(nil) require.Equal(t, int64(expectedTs), ts) h.CounterResetHint = histogram.UnknownCounterReset require.Equal(t, tsdbutil.GenerateTestFloatHistogram(expectedValue), h) diff --git a/tsdb/tsdbblockutil.go b/tsdb/tsdbblockutil.go index d4e43b73c..f7b27c2e0 100644 --- a/tsdb/tsdbblockutil.go +++ b/tsdb/tsdbblockutil.go @@ -73,10 +73,10 @@ func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger l t, v := it.At() ref, err = app.Append(ref, lset, t, v) case chunkenc.ValHistogram: - t, h := it.AtHistogram() + t, h := it.AtHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, h, nil) case chunkenc.ValFloatHistogram: - t, fh := it.AtFloatHistogram() + t, fh := it.AtFloatHistogram(nil) ref, err = app.AppendHistogram(ref, lset, t, nil, fh) default: return "", fmt.Errorf("unknown sample type %s", typ.String()) diff --git a/web/federate.go b/web/federate.go index 22384a696..62e8c97b7 100644 --- a/web/federate.go +++ b/web/federate.go @@ -123,7 +123,7 @@ Loop: case chunkenc.ValFloat: t, f = it.At() case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t, fh = it.AtFloatHistogram() + t, fh = it.AtFloatHistogram(nil) default: sample, ok := it.PeekBack(1) if !ok {