promql: refactor: split out aggregations over range

The new function `rangeEvalAgg` is mostly a copy of `rangeEval`, but
without `initSeries` which we don't need and inlining the callback to
`aggregation()`.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-02-27 06:38:49 +00:00
parent e5f667537c
commit 5f10d17cef

View file

@ -1291,6 +1291,172 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
return mat, warnings
}
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, 2)
origMatrixes := make([]Matrix, 2)
originalNumSamples := ev.currentSamples
var warnings annotations.Annotations
for i, e := range []parser.Expr{aggExpr.Param, aggExpr.Expr} {
// Functions will take string arguments from the expressions, not the values.
if e != nil && e.Type() != parser.ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call.
val, ws := ev.eval(e)
warnings.Merge(ws)
matrixes[i] = val.(Matrix)
// Keep a copy of the original point slices so that they
// can be returned to the pool.
origMatrixes[i] = make(Matrix, len(matrixes[i]))
copy(origMatrixes[i], matrixes[i])
}
}
vectors := make([]Vector, 2) // Input vectors for the function.
args := make([]parser.Value, 2) // Argument to function.
biggestLen := len(matrixes[1])
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
type seriesAndTimestamp struct {
Series
ts int64
}
seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash.
tempNumSamples := ev.currentSamples
seriesHelpers := make([][]EvalSeriesHelper, 2)
bufHelpers := make([][]EvalSeriesHelper, 2)
// Prepare a function to initialise series helpers with the grouping key.
buf := make([]byte, 0, 1024)
seriesHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1]))
bufHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1]))
for si, series := range matrixes[1] {
seriesHelpers[1][si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf)
}
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
ev.error(err)
}
// Reset number of samples in memory after each timestamp.
ev.currentSamples = tempNumSamples
// Gather input vectors for this timestamp.
for i := range []parser.Expr{aggExpr.Param, aggExpr.Expr} {
vectors[i] = vectors[i][:0]
bufHelpers[i] = bufHelpers[i][:0]
for si, series := range matrixes[i] {
switch {
case len(series.Floats) > 0 && series.Floats[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
// Move input vectors forward so we don't have to re-scan the same
// past points at the next step.
matrixes[i][si].Floats = series.Floats[1:]
case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
matrixes[i][si].Histograms = series.Histograms[1:]
default:
continue
}
if seriesHelpers[i] != nil {
bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
}
ev.currentSamples++
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
}
args[i] = vectors[i]
ev.samplesStats.UpdatePeak(ev.currentSamples)
}
// Make the function call.
enh.Ts = ts
var param float64
if aggExpr.Param != nil {
param = args[0].(Vector)[0].F
}
result, ws := ev.aggregation(aggExpr, sortedGrouping, param, args[1].(Vector), bufHelpers[1], enh)
enh.Out = result[:0] // Reuse result vector.
warnings.Merge(ws)
vecNumSamples := result.TotalSamples()
ev.currentSamples += vecNumSamples
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
// needs to include the samples from the result here, as they're still in memory.
tempNumSamples += vecNumSamples
ev.samplesStats.UpdatePeak(ev.currentSamples)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
if result.ContainsSameLabelset() {
ev.errorf("vector cannot contain metrics with the same labelset")
}
mat := make(Matrix, len(result))
for i, s := range result {
if s.H == nil {
mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}}
} else {
mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}}
}
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
// Add samples in output vector to output series.
for _, sample := range result {
h := sample.Metric.Hash()
ss, ok := seriess[h]
if ok {
if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate.
ev.errorf("vector cannot contain metrics with the same labelset")
}
ss.ts = ts
} else {
ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts}
}
if sample.H == nil {
if ss.Floats == nil {
ss.Floats = getFPointSlice(numSteps)
}
ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F})
} else {
if ss.Histograms == nil {
ss.Histograms = getHPointSlice(numSteps)
}
ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H})
}
seriess[h] = ss
}
}
// Reuse the original point slices.
for _, m := range origMatrixes {
for _, s := range m {
putFPointSlice(s.Floats)
putHPointSlice(s.Histograms)
}
}
// Assemble the output matrix. By the time we get here we know we don't have too many samples.
mat := make(Matrix, 0, len(seriess))
for _, ss := range seriess {
mat = append(mat, ss.Series)
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
@ -1343,12 +1509,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
sortedGrouping := e.Grouping
slices.Sort(sortedGrouping)
// Prepare a function to initialise series helpers with the grouping key.
buf := make([]byte, 0, 1024)
initSeries := func(series labels.Labels, h *EvalSeriesHelper) {
h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf)
}
unwrapParenExpr(&e.Param)
param := unwrapStepInvariantExpr(e.Param)
unwrapParenExpr(&param)
@ -1367,13 +1527,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
}, e.Expr)
}
return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
var param float64
if e.Param != nil {
param = v[0].(Vector)[0].F
}
return ev.aggregation(e, sortedGrouping, param, v[1].(Vector), sh[1], enh)
}, e.Param, e.Expr)
return ev.rangeEvalAgg(e, sortedGrouping)
case *parser.Call:
call := FunctionCalls[e.Func.Name]