promql: aggregations: skip result vector in range queries

Adjust test to match the lower count, since samples in the vector
are no longer counted.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-02-29 23:21:46 +00:00
parent 59548b8a0b
commit 3851b74db1
2 changed files with 35 additions and 56 deletions

View file

@ -1367,18 +1367,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
enh.Out = result[:0] // Reuse result vector.
warnings.Merge(ws)
vecNumSamples := result.TotalSamples()
ev.currentSamples += vecNumSamples
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
// needs to include the samples from the result here, as they're still in memory.
tempNumSamples += vecNumSamples
ev.samplesStats.UpdatePeak(ev.currentSamples)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
mat := make(Matrix, len(result))
@ -1393,6 +1381,9 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
// Reuse the original point slice.
@ -2946,7 +2937,33 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
}
}
// Construct the result Vector from the aggregated groups.
// Construct the result from the aggregated groups.
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) {
// If this could be an instant query, build a slice so the result is in consistent order.
if ev.endTimestamp == ev.startTimestamp {
enh.Out = append(enh.Out, Sample{Metric: lbls, F: f, H: h})
} else {
// Otherwise the results are added into seriess elements.
hash := lbls.Hash()
ss, ok := seriess[hash]
if !ok {
ss = Series{Metric: lbls}
}
if h == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: enh.Ts, F: f})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: enh.Ts, H: h})
}
seriess[hash] = ss
}
}
for _, aggr := range orderedResult {
switch op {
case parser.AVG:
@ -2976,10 +2993,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
sort.Sort(sort.Reverse(aggr.heap))
}
for _, v := range aggr.heap {
enh.Out = append(enh.Out, Sample{
Metric: v.Metric,
F: v.F,
})
add(v.Metric, v.F, nil)
}
continue // Bypass default append.
@ -2989,10 +3003,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
sort.Sort(sort.Reverse(aggr.reverseHeap))
}
for _, v := range aggr.reverseHeap {
enh.Out = append(enh.Out, Sample{
Metric: v.Metric,
F: v.F,
})
add(v.Metric, v.F, nil)
}
continue // Bypass default append.
@ -3015,42 +3026,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
// For other aggregations, we already have the right value.
}
enh.Out = append(enh.Out, Sample{
Metric: aggr.labels,
F: aggr.floatValue,
H: aggr.histogramValue,
})
add(aggr.labels, aggr.floatValue, aggr.histogramValue)
}
ts := enh.Ts
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
return enh.Out, annos
}
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
// Add samples in output vector to output series.
for _, sample := range enh.Out {
h := sample.Metric.Hash()
ss, ok := seriess[h]
if !ok {
ss = Series{Metric: sample.Metric}
}
if sample.H == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
}
seriess[h] = ss
}
return nil, annos
return enh.Out, annos
}
// aggregationK evaluates count_values on vec.

View file

@ -966,7 +966,7 @@ load 10s
{
Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
Start: time.Unix(201, 0),
PeakSamples: 8,
PeakSamples: 7,
TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,