promql(histograms): Change sample total calculation for histograms

Signed-off-by: Marc Tuduri <marctc@protonmail.com>
This commit is contained in:
Marc Tuduri 2023-07-28 10:49:36 +02:00
parent 16af86734f
commit 8fededf6ad
No known key found for this signature in database
GPG key ID: 761973D5AE312AF4
4 changed files with 154 additions and 29 deletions

View file

@ -338,6 +338,33 @@ func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool {
return true return true
} }
// Size returns the size of the whole fields histogram in bytes.
// NOTE: this is only valid for 64 bit architectures.
func (fh *FloatHistogram) Size() int {
// Size of each slice separately
posSpanSize := len(fh.PositiveSpans) * 8 // 8 bytes (int32 + uint32)
negSpanSize := len(fh.NegativeSpans) * 8 // 8 bytes (int32 + uint32)
posBucketSize := len(fh.PositiveBuckets) * 8 // 8 bytes (float64)
negBucketSize := len(fh.NegativeBuckets) * 8 // 8 bytes (float64)
// Total size of the struct
// fh is 8 bytes
// fh.CounterResetHint is 1 byte
// fh.Schema is 4 bytes
// fh.ZeroThreshold is 8 bytes
// fh.ZeroCount is 8 bytes
// fh.Count is 8 bytes
// fh.Sum is 8 bytes
// fh.PositiveSpans is 24 bytes
// fh.NegativeSpans is 24 bytes
// fh.PositiveBuckets is 24 bytes
// fh.NegativeBuckets is 24 bytes
structSize := 141
return structSize + posSpanSize + negSpanSize + posBucketSize + negBucketSize
}
// Compact eliminates empty buckets at the beginning and end of each span, then // Compact eliminates empty buckets at the beginning and end of each span, then
// merges spans that are consecutive or at most maxEmptyBuckets apart, and // merges spans that are consecutive or at most maxEmptyBuckets apart, and
// finally splits spans that contain more consecutive empty buckets than // finally splits spans that contain more consecutive empty buckets than

View file

@ -2341,3 +2341,55 @@ func TestFloatHistogramEquals(t *testing.T) {
notEquals(h1, *hNegBucketNaN) notEquals(h1, *hNegBucketNaN)
equals(*hNegBucketNaN, *hNegBucketNaN) equals(*hNegBucketNaN, *hNegBucketNaN)
} }
func TestFloatHistogramSize(t *testing.T) {
cases := []struct {
name string
fh *FloatHistogram
expected int
}{
{
"without spans and buckets",
&FloatHistogram{ // 8 bytes
CounterResetHint: 0, // 1 byte
Schema: 1, // 4 bytes
ZeroThreshold: 0.01, // 8 bytes
ZeroCount: 5.5, // 8 bytes
Count: 3493.3, // 8 bytes
Sum: 2349209.324, // 8 bytes
PositiveSpans: nil, // 24 bytes
PositiveBuckets: nil, // 24 bytes
NegativeSpans: nil, // 24 bytes
NegativeBuckets: nil, // 24 bytes
},
8 + 1 + 4 + 8 + 8 + 8 + 8 + 24 + 24 + 24 + 24,
},
{
"complete struct",
&FloatHistogram{ // 8 bytes
CounterResetHint: 0, // 1 byte
Schema: 1, // 4 bytes
ZeroThreshold: 0.01, // 8 bytes
ZeroCount: 5.5, // 8 bytes
Count: 3493.3, // 8 bytes
Sum: 2349209.324, // 8 bytes
PositiveSpans: []Span{ // 24 bytes
{-2, 1}, // 2 * 4 bytes
{2, 3}, // 2 * 4 bytes
},
PositiveBuckets: []float64{1, 3.3, 4.2, 0.1}, // 24 bytes + 4 * 8 bytes
NegativeSpans: []Span{ // 24 bytes
{3, 2}, // 2 * 4 bytes
{3, 2}}, // 2 * 4 bytes
NegativeBuckets: []float64{3.1, 3, 1.234e5, 1000}, // 24 bytes + 4 * 8 bytes
},
8 + 1 + 4 + 8 + 8 + 8 + 8 + (24 + 2*4 + 2*4) + (24 + 2*4 + 2*4) + (24 + 4*8) + (24 + 4*8),
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.expected, c.fh.Size())
})
}
}

View file

@ -1224,10 +1224,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
enh.Out = result[:0] // Reuse result vector. enh.Out = result[:0] // Reuse result vector.
warnings.Merge(ws) warnings.Merge(ws)
ev.currentSamples += len(result) vecNumSamples := result.TotalSamples()
ev.currentSamples += vecNumSamples
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
// needs to include the samples from the result here, as they're still in memory. // needs to include the samples from the result here, as they're still in memory.
tempNumSamples += len(result) tempNumSamples += vecNumSamples
ev.samplesStats.UpdatePeak(ev.currentSamples) ev.samplesStats.UpdatePeak(ev.currentSamples)
if ev.currentSamples > ev.maxSamples { if ev.currentSamples > ev.maxSamples {
@ -1323,12 +1324,10 @@ func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSele
Range: subq.Range, Range: subq.Range,
VectorSelector: vs, VectorSelector: vs,
} }
totalSamples := 0
for _, s := range mat { for _, s := range mat {
totalSamples += len(s.Floats) + len(s.Histograms)
vs.Series = append(vs.Series, NewStorageSeries(s)) vs.Series = append(vs.Series, NewStorageSeries(s))
} }
return ms, totalSamples, ws return ms, mat.TotalSamples(), ws
} }
// eval evaluates the given expression as the given AST expression node requires. // eval evaluates the given expression as the given AST expression node requires.
@ -1470,7 +1469,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
it := storage.NewBuffer(selRange) it := storage.NewBuffer(selRange)
var chkIter chunkenc.Iterator var chkIter chunkenc.Iterator
for i, s := range selVS.Series { for i, s := range selVS.Series {
ev.currentSamples -= len(floats) + len(histograms) ev.currentSamples -= len(floats) + totalHPointSize(histograms)
if floats != nil { if floats != nil {
floats = floats[:0] floats = floats[:0]
} }
@ -1514,7 +1513,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// Make the function call. // Make the function call.
outVec, annos := call(inArgs, e.Args, enh) outVec, annos := call(inArgs, e.Args, enh)
warnings.Merge(annos) warnings.Merge(annos)
ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+len(histograms))) ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms)))
enh.Out = outVec[:0] enh.Out = outVec[:0]
if len(outVec) > 0 { if len(outVec) > 0 {
@ -1533,10 +1532,11 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// Only buffer stepRange milliseconds from the second step on. // Only buffer stepRange milliseconds from the second step on.
it.ReduceDelta(stepRange) it.ReduceDelta(stepRange)
} }
if len(ss.Floats)+len(ss.Histograms) > 0 { histSamples := totalHPointSize(ss.Histograms)
if ev.currentSamples+len(ss.Floats)+len(ss.Histograms) <= ev.maxSamples { if len(ss.Floats)+histSamples > 0 {
if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples {
mat = append(mat, ss) mat = append(mat, ss)
ev.currentSamples += len(ss.Floats) + len(ss.Histograms) ev.currentSamples += len(ss.Floats) + histSamples
} else { } else {
ev.error(ErrTooManySamples(env)) ev.error(ErrTooManySamples(env))
} }
@ -1545,7 +1545,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
} }
ev.samplesStats.UpdatePeak(ev.currentSamples) ev.samplesStats.UpdatePeak(ev.currentSamples)
ev.currentSamples -= len(floats) + len(histograms) ev.currentSamples -= len(floats) + totalHPointSize(histograms)
putFPointSlice(floats) putFPointSlice(floats)
putHPointSlice(histograms) putHPointSlice(histograms)
@ -1692,14 +1692,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
ss.Floats = getFPointSlice(numSteps) ss.Floats = getFPointSlice(numSteps)
} }
ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) ss.Floats = append(ss.Floats, FPoint{F: f, T: ts})
ev.currentSamples++
ev.samplesStats.IncrementSamplesAtStep(step, 1)
} else { } else {
if ss.Histograms == nil { if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps) ss.Histograms = getHPointSlice(numSteps)
} }
ss.Histograms = append(ss.Histograms, HPoint{H: h, T: ts}) point := HPoint{H: h, T: ts}
ss.Histograms = append(ss.Histograms, point)
histSize := point.histogramSize()
ev.currentSamples += histSize
ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize))
} }
ev.samplesStats.IncrementSamplesAtStep(step, 1)
ev.currentSamples++
} else { } else {
ev.error(ErrTooManySamples(env)) ev.error(ErrTooManySamples(env))
} }
@ -1807,13 +1811,15 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
T: ts, T: ts,
F: mat[i].Floats[0].F, F: mat[i].Floats[0].F,
}) })
ev.currentSamples++
} else { } else {
mat[i].Histograms = append(mat[i].Histograms, HPoint{ point := HPoint{
T: ts, T: ts,
H: mat[i].Histograms[0].H, H: mat[i].Histograms[0].H,
}) }
mat[i].Histograms = append(mat[i].Histograms, point)
ev.currentSamples += point.histogramSize()
} }
ev.currentSamples++
if ev.currentSamples > ev.maxSamples { if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env)) ev.error(ErrTooManySamples(env))
} }
@ -1857,9 +1863,14 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec
F: f, F: f,
H: h, H: h,
}) })
histSize := 0
if h != nil {
histSize := h.Size() / 16 // 16 bytes per sample.
ev.currentSamples += histSize
}
ev.currentSamples++ ev.currentSamples++
ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1)
ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, int64(1+histSize))
if ev.currentSamples > ev.maxSamples { if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env)) ev.error(ErrTooManySamples(env))
} }
@ -1981,10 +1992,10 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota
} }
ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil) ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil)
totalLen := int64(len(ss.Floats)) + int64(len(ss.Histograms)) totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms))
ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalLen) ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalSize)
if totalLen > 0 { if totalSize > 0 {
matrix = append(matrix, ss) matrix = append(matrix, ss)
} else { } else {
putFPointSlice(ss.Floats) putFPointSlice(ss.Floats)
@ -2040,13 +2051,13 @@ func (ev *evaluator) matrixIterSlice(
var drop int var drop int
for drop = 0; histograms[drop].T < mint; drop++ { // nolint:revive for drop = 0; histograms[drop].T < mint; drop++ { // nolint:revive
} }
ev.currentSamples -= drop
copy(histograms, histograms[drop:]) copy(histograms, histograms[drop:])
histograms = histograms[:len(histograms)-drop] histograms = histograms[:len(histograms)-drop]
ev.currentSamples -= totalHPointSize(histograms)
// Only append points with timestamps after the last timestamp we have. // Only append points with timestamps after the last timestamp we have.
mintHistograms = histograms[len(histograms)-1].T + 1 mintHistograms = histograms[len(histograms)-1].T + 1
} else { } else {
ev.currentSamples -= len(histograms) ev.currentSamples -= totalHPointSize(histograms)
if histograms != nil { if histograms != nil {
histograms = histograms[:0] histograms = histograms[:0]
} }
@ -2075,11 +2086,12 @@ loop:
if ev.currentSamples >= ev.maxSamples { if ev.currentSamples >= ev.maxSamples {
ev.error(ErrTooManySamples(env)) ev.error(ErrTooManySamples(env))
} }
ev.currentSamples++ point := HPoint{T: t, H: h}
if histograms == nil { if histograms == nil {
histograms = getHPointSlice(16) histograms = getHPointSlice(16)
} }
histograms = append(histograms, HPoint{T: t, H: h}) histograms = append(histograms, point)
ev.currentSamples += point.histogramSize()
} }
case chunkenc.ValFloat: case chunkenc.ValFloat:
t, f := buf.At() t, f := buf.At()
@ -2110,8 +2122,9 @@ loop:
if histograms == nil { if histograms == nil {
histograms = getHPointSlice(16) histograms = getHPointSlice(16)
} }
histograms = append(histograms, HPoint{T: t, H: h}) point := HPoint{T: t, H: h}
ev.currentSamples++ histograms = append(histograms, point)
ev.currentSamples += point.histogramSize()
} }
case chunkenc.ValFloat: case chunkenc.ValFloat:
t, f := it.At() t, f := it.At()

View file

@ -168,6 +168,23 @@ func (p HPoint) MarshalJSON() ([]byte, error) {
return json.Marshal([...]interface{}{float64(p.T) / 1000, h}) return json.Marshal([...]interface{}{float64(p.T) / 1000, h})
} }
// histogramSize returns the size of the HPoint compared to the size of an FPoint.
// The total size is calculated considering the histogram timestamp (p.T - 8 bytes),
// and then a number of bytes in the histogram.
// This sum is divided by 16, as samples are 16 bytes.
func (p HPoint) histogramSize() int {
return (p.H.Size() + 8) / 16
}
// totalHPointSize returns the total number of samples in the given slice of HPoints.
func totalHPointSize(histograms []HPoint) int {
var total int
for _, h := range histograms {
total += h.histogramSize()
}
return total
}
// Sample is a single sample belonging to a metric. It represents either a float // Sample is a single sample belonging to a metric. It represents either a float
// sample or a histogram sample. If H is nil, it is a float sample. Otherwise, // sample or a histogram sample. If H is nil, it is a float sample. Otherwise,
// it is a histogram sample. // it is a histogram sample.
@ -226,6 +243,21 @@ func (vec Vector) String() string {
return strings.Join(entries, "\n") return strings.Join(entries, "\n")
} }
// TotalSamples returns the total number of samples in the series within a vector.
// Float samples have a weight of 1 in this number, while histogram samples have a higher
// weight according to their size compared with the size of a float sample.
// See HPoint.histogramSize for details.
func (vec Vector) TotalSamples() int {
numSamples := 0
for _, sample := range vec {
numSamples++
if sample.H != nil {
numSamples += sample.H.Size() / 16
}
}
return numSamples
}
// ContainsSameLabelset checks if a vector has samples with the same labelset // ContainsSameLabelset checks if a vector has samples with the same labelset
// Such a behavior is semantically undefined // Such a behavior is semantically undefined
// https://github.com/prometheus/prometheus/issues/4562 // https://github.com/prometheus/prometheus/issues/4562
@ -264,10 +296,11 @@ func (m Matrix) String() string {
} }
// TotalSamples returns the total number of samples in the series within a matrix. // TotalSamples returns the total number of samples in the series within a matrix.
// It takes into account the number of samples in the histograms using the histogramSize method.
func (m Matrix) TotalSamples() int { func (m Matrix) TotalSamples() int {
numSamples := 0 numSamples := 0
for _, series := range m { for _, series := range m {
numSamples += len(series.Floats) + len(series.Histograms) numSamples += len(series.Floats) + totalHPointSize(series.Histograms)
} }
return numSamples return numSamples
} }