From 1f69dcfa6bfb5c53dacbe33b8aca45a6b7540cc8 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 14 Dec 2023 11:28:15 +0100 Subject: [PATCH 1/4] Fix reusing float histograms In https://github.com/prometheus/prometheus/pull/13276 we started reusing float histogram objects to reduce allocations in PromQL. That PR introduces a bug where histogram pointers gets copied to the beginning of the histograms slice, but are still kept in the end of the slice. When a new histogram is read into the last element, it can overwrite a previous element because the pointer is the same. This commit fixes the issue by moving outdated points to the end of the slice so that we don't end up with duplicate pointers in the same buffer. In other words, the slice gets rotated so that old objects can get reused. Signed-off-by: Filip Petkovski --- promql/engine.go | 5 +++ promql/engine_test.go | 91 +++++++++++++++++++++++++++++--------- tsdb/tsdbutil/histogram.go | 8 ++++ 3 files changed, 82 insertions(+), 22 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 16b8ee5002..12755663dc 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2052,7 +2052,12 @@ func (ev *evaluator) matrixIterSlice( var drop int for drop = 0; histograms[drop].T < mint; drop++ { } + // Rotate the buffer around the drop index so that points before mint can be + // reused to store new histograms. + tail := make([]HPoint, drop) + copy(tail, histograms[:drop]) copy(histograms, histograms[drop:]) + copy(histograms[len(histograms)-drop:], tail) histograms = histograms[:len(histograms)-drop] ev.currentSamples -= totalHPointSize(histograms) // Only append points with timestamps after the last timestamp we have. diff --git a/promql/engine_test.go b/promql/engine_test.go index 9ab54dd16c..105cdc10d5 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3169,28 +3169,75 @@ func TestNativeHistogramRate(t *testing.T) { } require.NoError(t, app.Commit()) - queryString := fmt.Sprintf("rate(%s[1m])", seriesName) - qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) - require.NoError(t, err) - res := qry.Exec(context.Background()) - require.NoError(t, res.Err) - vector, err := res.Vector() - require.NoError(t, err) - require.Len(t, vector, 1) - actualHistogram := vector[0].H - expectedHistogram := &histogram.FloatHistogram{ - CounterResetHint: histogram.GaugeType, - Schema: 1, - ZeroThreshold: 0.001, - ZeroCount: 1. / 15., - Count: 9. / 15., - Sum: 1.226666666666667, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, - } - require.Equal(t, expectedHistogram, actualHistogram) + queryString := fmt.Sprintf("rate(%s[45s])", seriesName) + t.Run("instant_query", func(t *testing.T) { + qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) + require.NoError(t, err) + res := qry.Exec(context.Background()) + require.NoError(t, res.Err) + vector, err := res.Vector() + require.NoError(t, err) + require.Len(t, vector, 1) + actualHistogram := vector[0].H + expectedHistogram := &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 9. / 15., + Sum: 1.2266666666666663, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + } + require.Equal(t, expectedHistogram, actualHistogram) + }) + + t.Run("range_query", func(t *testing.T) { + step := 30 * time.Second + start := timestamp.Time(int64(5 * time.Minute / time.Millisecond)) + end := start.Add(step) + qry, err := engine.NewRangeQuery(context.Background(), storage, nil, queryString, start, end, step) + require.NoError(t, err) + res := qry.Exec(context.Background()) + require.NoError(t, res.Err) + matrix, err := res.Matrix() + require.NoError(t, err) + require.Len(t, matrix, 1) + require.Len(t, matrix[0].Histograms, 2) + actualHistograms := matrix[0].Histograms + expectedHistograms := []HPoint{{ + T: 300000, + H: &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 9. / 15., + Sum: 1.2266666666666663, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + }, + }, { + T: 330000, + H: &histogram.FloatHistogram{ + CounterResetHint: histogram.GaugeType, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 9. / 15., + Sum: 1.2266666666666663, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + }, + }} + require.Equal(t, expectedHistograms, actualHistograms) + }) } func TestNativeFloatHistogramRate(t *testing.T) { diff --git a/tsdb/tsdbutil/histogram.go b/tsdb/tsdbutil/histogram.go index 0847f81a8a..bb8d49b202 100644 --- a/tsdb/tsdbutil/histogram.go +++ b/tsdb/tsdbutil/histogram.go @@ -30,6 +30,14 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { return r } +func GenerateTestHistogramsWithUnknownResetHint(n int) []*histogram.Histogram { + hs := GenerateTestHistograms(n) + for i := range hs { + hs[i].CounterResetHint = histogram.UnknownCounterReset + } + return hs +} + // GenerateTestHistogram but it is up to the user to set any known counter reset hint. func GenerateTestHistogram(i int) *histogram.Histogram { return &histogram.Histogram{ From 5df3820c7a5e099f0cdf8c503a01ac4b0498cf96 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 25 Dec 2023 11:19:16 +0100 Subject: [PATCH 2/4] Copy last histogram point Signed-off-by: Filip Petkovski --- promql/engine.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 12755663dc..5d6f7b9a08 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2126,17 +2126,20 @@ loop: // The sought sample might also be in the range. switch soughtValueType { case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: - t, h := it.AtFloatHistogram() - if t == maxt && !value.IsStaleNaN(h.Sum) { - if ev.currentSamples >= ev.maxSamples { - ev.error(ErrTooManySamples(env)) + t := it.AtT() + if t == maxt { + _, h := it.AtFloatHistogram() + if !value.IsStaleNaN(h.Sum) { + if ev.currentSamples >= ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + if histograms == nil { + histograms = getHPointSlice(16) + } + point := HPoint{T: t, H: h.Copy()} + histograms = append(histograms, point) + ev.currentSamples += point.size() } - if histograms == nil { - histograms = getHPointSlice(16) - } - point := HPoint{T: t, H: h} - histograms = append(histograms, point) - ev.currentSamples += point.size() } case chunkenc.ValFloat: t, f := it.At() From 35f9620cd1f212b37cc3074bbd59bdfae713f87d Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 25 Dec 2023 11:30:29 +0100 Subject: [PATCH 3/4] Expand benchmark Signed-off-by: Filip Petkovski --- promql/bench_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/promql/bench_test.go b/promql/bench_test.go index 13eba3714e..b7a4978de2 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -296,8 +296,12 @@ func BenchmarkNativeHistograms(b *testing.B) { query: "sum(native_histogram_series)", }, { - name: "sum rate", - query: "sum(rate(native_histogram_series[1m]))", + name: "sum rate with short rate interval", + query: "sum(rate(native_histogram_series[2m]))", + }, + { + name: "sum rate with long rate interval", + query: "sum(rate(native_histogram_series[20m]))", }, } From 0e1ae1d1caa65c398d0024a4ecb7977405e15a07 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 25 Dec 2023 11:41:07 +0100 Subject: [PATCH 4/4] Add comment Signed-off-by: Filip Petkovski --- promql/engine.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/promql/engine.go b/promql/engine.go index 5d6f7b9a08..2ea37dae60 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2136,6 +2136,8 @@ loop: if histograms == nil { histograms = getHPointSlice(16) } + // The last sample comes directly from the iterator, so we need to copy it to + // avoid having the same reference twice in the buffer. point := HPoint{T: t, H: h.Copy()} histograms = append(histograms, point) ev.currentSamples += point.size()