Merge pull request #13289 from fpetkovski/fix-histogram-reuse

Fix reusing float histograms
This commit is contained in:
Björn Rabenstein 2023-12-25 22:45:03 +01:00 committed by GitHub
commit 6b8e945388
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 103 additions and 34 deletions

View file

@ -296,8 +296,12 @@ func BenchmarkNativeHistograms(b *testing.B) {
query: "sum(native_histogram_series)", query: "sum(native_histogram_series)",
}, },
{ {
name: "sum rate", name: "sum rate with short rate interval",
query: "sum(rate(native_histogram_series[1m]))", query: "sum(rate(native_histogram_series[2m]))",
},
{
name: "sum rate with long rate interval",
query: "sum(rate(native_histogram_series[20m]))",
}, },
} }

View file

@ -2052,7 +2052,12 @@ func (ev *evaluator) matrixIterSlice(
var drop int var drop int
for drop = 0; histograms[drop].T < mint; drop++ { for drop = 0; histograms[drop].T < mint; drop++ {
} }
// Rotate the buffer around the drop index so that points before mint can be
// reused to store new histograms.
tail := make([]HPoint, drop)
copy(tail, histograms[:drop])
copy(histograms, histograms[drop:]) copy(histograms, histograms[drop:])
copy(histograms[len(histograms)-drop:], tail)
histograms = histograms[:len(histograms)-drop] histograms = histograms[:len(histograms)-drop]
ev.currentSamples -= totalHPointSize(histograms) ev.currentSamples -= totalHPointSize(histograms)
// Only append points with timestamps after the last timestamp we have. // Only append points with timestamps after the last timestamp we have.
@ -2121,17 +2126,22 @@ loop:
// The sought sample might also be in the range. // The sought sample might also be in the range.
switch soughtValueType { switch soughtValueType {
case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: case chunkenc.ValFloatHistogram, chunkenc.ValHistogram:
t, h := it.AtFloatHistogram() t := it.AtT()
if t == maxt && !value.IsStaleNaN(h.Sum) { if t == maxt {
if ev.currentSamples >= ev.maxSamples { _, h := it.AtFloatHistogram()
ev.error(ErrTooManySamples(env)) if !value.IsStaleNaN(h.Sum) {
if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
if histograms == nil {
histograms = getHPointSlice(16)
}
// The last sample comes directly from the iterator, so we need to copy it to
// avoid having the same reference twice in the buffer.
point := HPoint{T: t, H: h.Copy()}
histograms = append(histograms, point)
ev.currentSamples += point.size()
} }
if histograms == nil {
histograms = getHPointSlice(16)
}
point := HPoint{T: t, H: h}
histograms = append(histograms, point)
ev.currentSamples += point.size()
} }
case chunkenc.ValFloat: case chunkenc.ValFloat:
t, f := it.At() t, f := it.At()

View file

@ -3169,28 +3169,75 @@ func TestNativeHistogramRate(t *testing.T) {
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
queryString := fmt.Sprintf("rate(%s[1m])", seriesName) queryString := fmt.Sprintf("rate(%s[45s])", seriesName)
qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond))) t.Run("instant_query", func(t *testing.T) {
require.NoError(t, err) qry, err := engine.NewInstantQuery(context.Background(), storage, nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
res := qry.Exec(context.Background()) require.NoError(t, err)
require.NoError(t, res.Err) res := qry.Exec(context.Background())
vector, err := res.Vector() require.NoError(t, res.Err)
require.NoError(t, err) vector, err := res.Vector()
require.Len(t, vector, 1) require.NoError(t, err)
actualHistogram := vector[0].H require.Len(t, vector, 1)
expectedHistogram := &histogram.FloatHistogram{ actualHistogram := vector[0].H
CounterResetHint: histogram.GaugeType, expectedHistogram := &histogram.FloatHistogram{
Schema: 1, CounterResetHint: histogram.GaugeType,
ZeroThreshold: 0.001, Schema: 1,
ZeroCount: 1. / 15., ZeroThreshold: 0.001,
Count: 9. / 15., ZeroCount: 1. / 15.,
Sum: 1.226666666666667, Count: 9. / 15.,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, Sum: 1.2266666666666663,
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
} NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
require.Equal(t, expectedHistogram, actualHistogram) }
require.Equal(t, expectedHistogram, actualHistogram)
})
t.Run("range_query", func(t *testing.T) {
step := 30 * time.Second
start := timestamp.Time(int64(5 * time.Minute / time.Millisecond))
end := start.Add(step)
qry, err := engine.NewRangeQuery(context.Background(), storage, nil, queryString, start, end, step)
require.NoError(t, err)
res := qry.Exec(context.Background())
require.NoError(t, res.Err)
matrix, err := res.Matrix()
require.NoError(t, err)
require.Len(t, matrix, 1)
require.Len(t, matrix[0].Histograms, 2)
actualHistograms := matrix[0].Histograms
expectedHistograms := []HPoint{{
T: 300000,
H: &histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 9. / 15.,
Sum: 1.2266666666666663,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
},
}, {
T: 330000,
H: &histogram.FloatHistogram{
CounterResetHint: histogram.GaugeType,
Schema: 1,
ZeroThreshold: 0.001,
ZeroCount: 1. / 15.,
Count: 9. / 15.,
Sum: 1.2266666666666663,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}},
NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.},
},
}}
require.Equal(t, expectedHistograms, actualHistograms)
})
} }
func TestNativeFloatHistogramRate(t *testing.T) { func TestNativeFloatHistogramRate(t *testing.T) {

View file

@ -30,6 +30,14 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) {
return r return r
} }
func GenerateTestHistogramsWithUnknownResetHint(n int) []*histogram.Histogram {
hs := GenerateTestHistograms(n)
for i := range hs {
hs[i].CounterResetHint = histogram.UnknownCounterReset
}
return hs
}
// GenerateTestHistogram but it is up to the user to set any known counter reset hint. // GenerateTestHistogram but it is up to the user to set any known counter reset hint.
func GenerateTestHistogram(i int) *histogram.Histogram { func GenerateTestHistogram(i int) *histogram.Histogram {
return &histogram.Histogram{ return &histogram.Histogram{