promql: refactor: move collection of results into aggregation()

We don't need to check for duplicates as aggregation cannot generate them.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-02-27 08:19:01 +00:00
parent bd9bdccb22
commit 59548b8a0b

View file

@ -1292,7 +1292,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
}
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
originalNumSamples := ev.currentSamples
var warnings annotations.Annotations
@ -1315,11 +1314,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
var vector Vector // Input vectors for the function.
biggestLen := len(inputMatrix)
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
type seriesAndTimestamp struct {
Series
ts int64
}
seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
// Initialise series helpers with the grouping key.
@ -1367,7 +1362,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// Make the function call.
enh.Ts = ts
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh)
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess)
enh.Out = result[:0] // Reuse result vector.
warnings.Merge(ws)
@ -1386,9 +1381,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
mat := make(Matrix, len(result))
for i, s := range result {
if s.H == nil {
@ -1401,32 +1393,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
// Add samples in output vector to output series.
for _, sample := range result {
h := sample.Metric.Hash()
ss, ok := seriess[h]
if ok {
if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
ev.errorf("vector cannot contain metrics with the same labelset")
}
ss.ts = ts
} else {
ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
}
if sample.H == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
}
seriess[h] = ss
}
}
// Reuse the original point slice.
@ -1437,7 +1403,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make(Matrix, 0, len(seriess))
for _, ss := range seriess {
mat = append(mat, ss.Series)
mat = append(mat, ss)
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
@ -2778,7 +2744,7 @@ type groupedAggregation struct {
// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
// must be sorted.
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Vector, annotations.Annotations) {
op := e.Op
without := e.Without
var annos annotations.Annotations
@ -3055,7 +3021,36 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par
H: aggr.histogramValue,
})
}
return enh.Out, annos
ts := enh.Ts
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
return enh.Out, annos
}
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
// Add samples in output vector to output series.
for _, sample := range enh.Out {
h := sample.Metric.Hash()
ss, ok := seriess[h]
if !ok {
ss = Series{Metric: sample.Metric}
}
if sample.H == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
}
seriess[h] = ss
}
return nil, annos
}
// aggregationK evaluates count_values on vec.