From 8fededf6adc1c7c1bd9598329627556b3091f22e Mon Sep 17 00:00:00 2001 From: Marc Tuduri Date: Fri, 28 Jul 2023 10:49:36 +0200 Subject: [PATCH] promql(histograms): Change sample total calculation for histograms Signed-off-by: Marc Tuduri --- model/histogram/float_histogram.go | 27 ++++++++++ model/histogram/float_histogram_test.go | 52 +++++++++++++++++++ promql/engine.go | 69 +++++++++++++++---------- promql/value.go | 35 ++++++++++++- 4 files changed, 154 insertions(+), 29 deletions(-) diff --git a/model/histogram/float_histogram.go b/model/histogram/float_histogram.go index 6ee72f24e4..3c394c85e7 100644 --- a/model/histogram/float_histogram.go +++ b/model/histogram/float_histogram.go @@ -338,6 +338,33 @@ func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool { return true } +// Size returns the size of the whole fields histogram in bytes. +// NOTE: this is only valid for 64 bit architectures. +func (fh *FloatHistogram) Size() int { + // Size of each slice separately + posSpanSize := len(fh.PositiveSpans) * 8 // 8 bytes (int32 + uint32) + negSpanSize := len(fh.NegativeSpans) * 8 // 8 bytes (int32 + uint32) + posBucketSize := len(fh.PositiveBuckets) * 8 // 8 bytes (float64) + negBucketSize := len(fh.NegativeBuckets) * 8 // 8 bytes (float64) + + // Total size of the struct + + // fh is 8 bytes + // fh.CounterResetHint is 1 byte + // fh.Schema is 4 bytes + // fh.ZeroThreshold is 8 bytes + // fh.ZeroCount is 8 bytes + // fh.Count is 8 bytes + // fh.Sum is 8 bytes + // fh.PositiveSpans is 24 bytes + // fh.NegativeSpans is 24 bytes + // fh.PositiveBuckets is 24 bytes + // fh.NegativeBuckets is 24 bytes + structSize := 141 + + return structSize + posSpanSize + negSpanSize + posBucketSize + negBucketSize +} + // Compact eliminates empty buckets at the beginning and end of each span, then // merges spans that are consecutive or at most maxEmptyBuckets apart, and // finally splits spans that contain more consecutive empty buckets than diff --git a/model/histogram/float_histogram_test.go b/model/histogram/float_histogram_test.go index ae8ba3ea2e..7f5c29e3bf 100644 --- a/model/histogram/float_histogram_test.go +++ b/model/histogram/float_histogram_test.go @@ -2341,3 +2341,55 @@ func TestFloatHistogramEquals(t *testing.T) { notEquals(h1, *hNegBucketNaN) equals(*hNegBucketNaN, *hNegBucketNaN) } + +func TestFloatHistogramSize(t *testing.T) { + cases := []struct { + name string + fh *FloatHistogram + expected int + }{ + { + "without spans and buckets", + &FloatHistogram{ // 8 bytes + CounterResetHint: 0, // 1 byte + Schema: 1, // 4 bytes + ZeroThreshold: 0.01, // 8 bytes + ZeroCount: 5.5, // 8 bytes + Count: 3493.3, // 8 bytes + Sum: 2349209.324, // 8 bytes + PositiveSpans: nil, // 24 bytes + PositiveBuckets: nil, // 24 bytes + NegativeSpans: nil, // 24 bytes + NegativeBuckets: nil, // 24 bytes + }, + 8 + 1 + 4 + 8 + 8 + 8 + 8 + 24 + 24 + 24 + 24, + }, + { + "complete struct", + &FloatHistogram{ // 8 bytes + CounterResetHint: 0, // 1 byte + Schema: 1, // 4 bytes + ZeroThreshold: 0.01, // 8 bytes + ZeroCount: 5.5, // 8 bytes + Count: 3493.3, // 8 bytes + Sum: 2349209.324, // 8 bytes + PositiveSpans: []Span{ // 24 bytes + {-2, 1}, // 2 * 4 bytes + {2, 3}, // 2 * 4 bytes + }, + PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, // 24 bytes + 4 * 8 bytes + NegativeSpans: []Span{ // 24 bytes + {3, 2}, // 2 * 4 bytes + {3, 2}}, // 2 * 4 bytes + NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, // 24 bytes + 4 * 8 bytes + }, + 8 + 1 + 4 + 8 + 8 + 8 + 8 + (24 + 2*4 + 2*4) + (24 + 2*4 + 2*4) + (24 + 4*8) + (24 + 4*8), + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.expected, c.fh.Size()) + }) + } +} diff --git a/promql/engine.go b/promql/engine.go index 161aa85acb..120964ca0e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1224,10 +1224,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) - ev.currentSamples += len(result) + vecNumSamples := result.TotalSamples() + ev.currentSamples += vecNumSamples // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also // needs to include the samples from the result here, as they're still in memory. - tempNumSamples += len(result) + tempNumSamples += vecNumSamples ev.samplesStats.UpdatePeak(ev.currentSamples) if ev.currentSamples > ev.maxSamples { @@ -1323,12 +1324,10 @@ func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSele Range: subq.Range, VectorSelector: vs, } - totalSamples := 0 for _, s := range mat { - totalSamples += len(s.Floats) + len(s.Histograms) vs.Series = append(vs.Series, NewStorageSeries(s)) } - return ms, totalSamples, ws + return ms, mat.TotalSamples(), ws } // eval evaluates the given expression as the given AST expression node requires. @@ -1470,7 +1469,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio it := storage.NewBuffer(selRange) var chkIter chunkenc.Iterator for i, s := range selVS.Series { - ev.currentSamples -= len(floats) + len(histograms) + ev.currentSamples -= len(floats) + totalHPointSize(histograms) if floats != nil { floats = floats[:0] } @@ -1514,7 +1513,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio // Make the function call. outVec, annos := call(inArgs, e.Args, enh) warnings.Merge(annos) - ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+len(histograms))) + ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms))) enh.Out = outVec[:0] if len(outVec) > 0 { @@ -1533,10 +1532,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio // Only buffer stepRange milliseconds from the second step on. it.ReduceDelta(stepRange) } - if len(ss.Floats)+len(ss.Histograms) > 0 { - if ev.currentSamples+len(ss.Floats)+len(ss.Histograms) <= ev.maxSamples { + histSamples := totalHPointSize(ss.Histograms) + if len(ss.Floats)+histSamples > 0 { + if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples { mat = append(mat, ss) - ev.currentSamples += len(ss.Floats) + len(ss.Histograms) + ev.currentSamples += len(ss.Floats) + histSamples } else { ev.error(ErrTooManySamples(env)) } @@ -1545,7 +1545,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio } ev.samplesStats.UpdatePeak(ev.currentSamples) - ev.currentSamples -= len(floats) + len(histograms) + ev.currentSamples -= len(floats) + totalHPointSize(histograms) putFPointSlice(floats) putHPointSlice(histograms) @@ -1692,14 +1692,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio ss.Floats = getFPointSlice(numSteps) } ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtStep(step, 1) } else { if ss.Histograms == nil { ss.Histograms = getHPointSlice(numSteps) } - ss.Histograms = append(ss.Histograms, HPoint{H: h, T: ts}) + point := HPoint{H: h, T: ts} + ss.Histograms = append(ss.Histograms, point) + histSize := point.histogramSize() + ev.currentSamples += histSize + ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize)) } - ev.samplesStats.IncrementSamplesAtStep(step, 1) - ev.currentSamples++ } else { ev.error(ErrTooManySamples(env)) } @@ -1807,13 +1811,15 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio T: ts, F: mat[i].Floats[0].F, }) + ev.currentSamples++ } else { - mat[i].Histograms = append(mat[i].Histograms, HPoint{ + point := HPoint{ T: ts, H: mat[i].Histograms[0].H, - }) + } + mat[i].Histograms = append(mat[i].Histograms, point) + ev.currentSamples += point.histogramSize() } - ev.currentSamples++ if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } @@ -1857,9 +1863,14 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec F: f, H: h, }) - + histSize := 0 + if h != nil { + histSize := h.Size() / 16 // 16 bytes per sample. + ev.currentSamples += histSize + } ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) + + ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, int64(1+histSize)) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } @@ -1981,10 +1992,10 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota } ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil) - totalLen := int64(len(ss.Floats)) + int64(len(ss.Histograms)) - ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalLen) + totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms)) + ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalSize) - if totalLen > 0 { + if totalSize > 0 { matrix = append(matrix, ss) } else { putFPointSlice(ss.Floats) @@ -2040,13 +2051,13 @@ func (ev *evaluator) matrixIterSlice( var drop int for drop = 0; histograms[drop].T < mint; drop++ { // nolint:revive } - ev.currentSamples -= drop copy(histograms, histograms[drop:]) histograms = histograms[:len(histograms)-drop] + ev.currentSamples -= totalHPointSize(histograms) // Only append points with timestamps after the last timestamp we have. mintHistograms = histograms[len(histograms)-1].T + 1 } else { - ev.currentSamples -= len(histograms) + ev.currentSamples -= totalHPointSize(histograms) if histograms != nil { histograms = histograms[:0] } @@ -2075,11 +2086,12 @@ loop: if ev.currentSamples >= ev.maxSamples { ev.error(ErrTooManySamples(env)) } - ev.currentSamples++ + point := HPoint{T: t, H: h} if histograms == nil { histograms = getHPointSlice(16) } - histograms = append(histograms, HPoint{T: t, H: h}) + histograms = append(histograms, point) + ev.currentSamples += point.histogramSize() } case chunkenc.ValFloat: t, f := buf.At() @@ -2110,8 +2122,9 @@ loop: if histograms == nil { histograms = getHPointSlice(16) } - histograms = append(histograms, HPoint{T: t, H: h}) - ev.currentSamples++ + point := HPoint{T: t, H: h} + histograms = append(histograms, point) + ev.currentSamples += point.histogramSize() } case chunkenc.ValFloat: t, f := it.At() diff --git a/promql/value.go b/promql/value.go index 68e37f37ee..5fa339ad57 100644 --- a/promql/value.go +++ b/promql/value.go @@ -168,6 +168,23 @@ func (p HPoint) MarshalJSON() ([]byte, error) { return json.Marshal([...]interface{}{float64(p.T) / 1000, h}) } +// histogramSize returns the size of the HPoint compared to the size of an FPoint. +// The total size is calculated considering the histogram timestamp (p.T - 8 bytes), +// and then a number of bytes in the histogram. +// This sum is divided by 16, as samples are 16 bytes. +func (p HPoint) histogramSize() int { + return (p.H.Size() + 8) / 16 +} + +// totalHPointSize returns the total number of samples in the given slice of HPoints. +func totalHPointSize(histograms []HPoint) int { + var total int + for _, h := range histograms { + total += h.histogramSize() + } + return total +} + // Sample is a single sample belonging to a metric. It represents either a float // sample or a histogram sample. If H is nil, it is a float sample. Otherwise, // it is a histogram sample. @@ -226,6 +243,21 @@ func (vec Vector) String() string { return strings.Join(entries, "\n") } +// TotalSamples returns the total number of samples in the series within a vector. +// Float samples have a weight of 1 in this number, while histogram samples have a higher +// weight according to their size compared with the size of a float sample. +// See HPoint.histogramSize for details. +func (vec Vector) TotalSamples() int { + numSamples := 0 + for _, sample := range vec { + numSamples++ + if sample.H != nil { + numSamples += sample.H.Size() / 16 + } + } + return numSamples +} + // ContainsSameLabelset checks if a vector has samples with the same labelset // Such a behavior is semantically undefined // https://github.com/prometheus/prometheus/issues/4562 @@ -264,10 +296,11 @@ func (m Matrix) String() string { } // TotalSamples returns the total number of samples in the series within a matrix. +// It takes into account the number of samples in the histograms using the histogramSize method. func (m Matrix) TotalSamples() int { numSamples := 0 for _, series := range m { - numSamples += len(series.Floats) + len(series.Histograms) + numSamples += len(series.Floats) + totalHPointSize(series.Histograms) } return numSamples }