diff --git a/promql/engine.go b/promql/engine.go index 0a2c5ff3f..91c5e9e24 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1291,6 +1291,172 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) return mat, warnings } +func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) { + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + matrixes := make([]Matrix, 2) + origMatrixes := make([]Matrix, 2) + originalNumSamples := ev.currentSamples + + var warnings annotations.Annotations + for i, e := range []parser.Expr{aggExpr.Param, aggExpr.Expr} { + // Functions will take string arguments from the expressions, not the values. + if e != nil && e.Type() != parser.ValueTypeString { + // ev.currentSamples will be updated to the correct value within the ev.eval call. + val, ws := ev.eval(e) + warnings.Merge(ws) + matrixes[i] = val.(Matrix) + + // Keep a copy of the original point slices so that they + // can be returned to the pool. + origMatrixes[i] = make(Matrix, len(matrixes[i])) + copy(origMatrixes[i], matrixes[i]) + } + } + + vectors := make([]Vector, 2) // Input vectors for the function. + args := make([]parser.Value, 2) // Argument to function. + biggestLen := len(matrixes[1]) + enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} + type seriesAndTimestamp struct { + Series + ts int64 + } + seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. + tempNumSamples := ev.currentSamples + + seriesHelpers := make([][]EvalSeriesHelper, 2) + bufHelpers := make([][]EvalSeriesHelper, 2) + // Prepare a function to initialise series helpers with the grouping key. + buf := make([]byte, 0, 1024) + + seriesHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1])) + bufHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1])) + + for si, series := range matrixes[1] { + seriesHelpers[1][si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + } + + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + ev.error(err) + } + // Reset number of samples in memory after each timestamp. + ev.currentSamples = tempNumSamples + // Gather input vectors for this timestamp. + for i := range []parser.Expr{aggExpr.Param, aggExpr.Expr} { + vectors[i] = vectors[i][:0] + bufHelpers[i] = bufHelpers[i][:0] + + for si, series := range matrixes[i] { + switch { + case len(series.Floats) > 0 && series.Floats[0].T == ts: + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + matrixes[i][si].Floats = series.Floats[1:] + case len(series.Histograms) > 0 && series.Histograms[0].T == ts: + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) + matrixes[i][si].Histograms = series.Histograms[1:] + default: + continue + } + if seriesHelpers[i] != nil { + bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si]) + } + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + } + args[i] = vectors[i] + ev.samplesStats.UpdatePeak(ev.currentSamples) + } + + // Make the function call. + enh.Ts = ts + var param float64 + if aggExpr.Param != nil { + param = args[0].(Vector)[0].F + } + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, args[1].(Vector), bufHelpers[1], enh) + + enh.Out = result[:0] // Reuse result vector. + warnings.Merge(ws) + + vecNumSamples := result.TotalSamples() + ev.currentSamples += vecNumSamples + // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also + // needs to include the samples from the result here, as they're still in memory. + tempNumSamples += vecNumSamples + ev.samplesStats.UpdatePeak(ev.currentSamples) + + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + ev.samplesStats.UpdatePeak(ev.currentSamples) + + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + if result.ContainsSameLabelset() { + ev.errorf("vector cannot contain metrics with the same labelset") + } + mat := make(Matrix, len(result)) + for i, s := range result { + if s.H == nil { + mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}} + } else { + mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}} + } + } + ev.currentSamples = originalNumSamples + mat.TotalSamples() + ev.samplesStats.UpdatePeak(ev.currentSamples) + return mat, warnings + } + + // Add samples in output vector to output series. + for _, sample := range result { + h := sample.Metric.Hash() + ss, ok := seriess[h] + if ok { + if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. + ev.errorf("vector cannot contain metrics with the same labelset") + } + ss.ts = ts + } else { + ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} + } + if sample.H == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) + } + seriess[h] = ss + } + } + + // Reuse the original point slices. + for _, m := range origMatrixes { + for _, s := range m { + putFPointSlice(s.Floats) + putHPointSlice(s.Histograms) + } + } + // Assemble the output matrix. By the time we get here we know we don't have too many samples. + mat := make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + mat = append(mat, ss.Series) + } + ev.currentSamples = originalNumSamples + mat.TotalSamples() + ev.samplesStats.UpdatePeak(ev.currentSamples) + return mat, warnings +} + // evalSubquery evaluates given SubqueryExpr and returns an equivalent // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { @@ -1343,12 +1509,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio sortedGrouping := e.Grouping slices.Sort(sortedGrouping) - // Prepare a function to initialise series helpers with the grouping key. - buf := make([]byte, 0, 1024) - initSeries := func(series labels.Labels, h *EvalSeriesHelper) { - h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf) - } - unwrapParenExpr(&e.Param) param := unwrapStepInvariantExpr(e.Param) unwrapParenExpr(¶m) @@ -1367,13 +1527,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio }, e.Expr) } - return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - var param float64 - if e.Param != nil { - param = v[0].(Vector)[0].F - } - return ev.aggregation(e, sortedGrouping, param, v[1].(Vector), sh[1], enh) - }, e.Param, e.Expr) + return ev.rangeEvalAgg(e, sortedGrouping) case *parser.Call: call := FunctionCalls[e.Func.Name]