diff --git a/promql/engine.go b/promql/engine.go
index 9e0a6b17c..770550dac 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -1279,29 +1279,19 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
 	return mat, warnings
 }
 
-func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) {
-	originalNumSamples := ev.currentSamples
+func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) {
+	// Keep a copy of the original point slice so that it can be returned to the pool.
+	origMatrix := slices.Clone(inputMatrix)
+	defer func() {
+		for _, s := range origMatrix {
+			putFPointSlice(s.Floats)
+			putHPointSlice(s.Histograms)
+		}
+	}()
+
 	var warnings annotations.Annotations
 
-	// param is the number k for topk/bottomk, or q for quantile.
-	var param float64
-	if aggExpr.Param != nil {
-		val, ws := ev.eval(aggExpr.Param)
-		warnings.Merge(ws)
-		param = val.(Matrix)[0].Floats[0].F
-	}
-	// Now fetch the data to be aggregated.
-	// ev.currentSamples will be updated to the correct value within the ev.eval call.
-	val, ws := ev.eval(aggExpr.Expr)
-	warnings.Merge(ws)
-	inputMatrix := val.(Matrix)
-
-	// Keep a copy of the original point slice so that it can be returned to the pool.
-	origMatrix := inputMatrix
-
-	biggestLen := len(inputMatrix)
 	enh := &EvalNodeHelper{}
-	seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
 	tempNumSamples := ev.currentSamples
 
 	// Create a mapping from input series to output groups.
@@ -1325,6 +1315,8 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
 		seriesToResult[si] = index
 	}
 
+	seriess := make(map[uint64]Series, len(inputMatrix)) // Output series by series hash.
+
 	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
 		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
 			ev.error(err)
@@ -1340,8 +1332,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
 
 		// If this could be an instant query, shortcut so as not to change sort order.
 		if ev.endTimestamp == ev.startTimestamp {
-			ev.currentSamples = originalNumSamples + result.TotalSamples()
-			ev.samplesStats.UpdatePeak(ev.currentSamples)
 			return result, warnings
 		}
 		if ev.currentSamples > ev.maxSamples {
@@ -1349,18 +1339,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
 		}
 	}
 
-	// Reuse the original point slice.
-	for _, s := range origMatrix {
-		putFPointSlice(s.Floats)
-		putHPointSlice(s.Histograms)
-	}
 	// Assemble the output matrix. By the time we get here we know we don't have too many samples.
 	mat := make(Matrix, 0, len(seriess))
 	for _, ss := range seriess {
 		mat = append(mat, ss)
 	}
-	ev.currentSamples = originalNumSamples + mat.TotalSamples()
-	ev.samplesStats.UpdatePeak(ev.currentSamples)
 
 	return mat, warnings
 }
@@ -1434,7 +1417,26 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
 			}, e.Expr)
 		}
 
-		return ev.rangeEvalAgg(e, sortedGrouping)
+		var warnings annotations.Annotations
+		originalNumSamples := ev.currentSamples
+		// param is the number k for topk/bottomk, or q for quantile.
+		var fParam float64
+		if param != nil {
+			val, ws := ev.eval(param)
+			warnings.Merge(ws)
+			fParam = val.(Matrix)[0].Floats[0].F
+		}
+		// Now fetch the data to be aggregated.
+		val, ws := ev.eval(e.Expr)
+		warnings.Merge(ws)
+		inputMatrix := val.(Matrix)
+
+		result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam)
+		warnings.Merge(ws)
+		ev.currentSamples = originalNumSamples + result.TotalSamples()
+		ev.samplesStats.UpdatePeak(ev.currentSamples)
+
+		return result, warnings
 
 	case *parser.Call:
 		call := FunctionCalls[e.Func.Name]
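
Note on the restructuring above: rangeEvalAgg no longer evaluates its own parameter and input expression, nor updates ev.currentSamples and ev.samplesStats; the caller in eval() now does both and passes the pre-evaluated inputMatrix and the scalar param down. Inside rangeEvalAgg, the manual "return slices to the pool" loop after the range loop is replaced by a defer over slices.Clone(inputMatrix), so the pooled FPoint/HPoint buffers are released on every return path, including the early instant-query shortcut that previously returned before the cleanup ran; the clone of the slice header also keeps the cleanup correct if the aggregation reorders or overwrites inputMatrix entries in place. The snippet below is a minimal, self-contained sketch of that clone-and-defer pattern, not Prometheus code; pointPool, series, and aggregate are hypothetical stand-ins for the real point pools and the Matrix/Series types.

```go
// Minimal sketch (assumed names, not Prometheus code) of the clone-and-defer
// pool-return pattern adopted in rangeEvalAgg above.
package main

import (
	"fmt"
	"slices"
	"sync"
)

// pointPool stands in for the engine's internal point-slice pools.
var pointPool = sync.Pool{New: func() any { s := make([]float64, 0, 16); return &s }}

type series struct {
	points []float64 // stand-in for Series.Floats
}

// aggregate receives an already-evaluated input, mirroring the new rangeEvalAgg
// signature, and guarantees the pooled buffers are returned on every exit path.
func aggregate(input []series) float64 {
	// Clone copies only the slice header; the point buffers are shared. Even if
	// input were reordered or truncated in place, orig still references them all.
	orig := slices.Clone(input)
	defer func() {
		for _, s := range orig {
			buf := s.points[:0]
			pointPool.Put(&buf) // return the backing array to the pool
		}
	}()

	var sum float64
	for _, s := range input {
		for _, v := range s.points {
			sum += v
		}
	}
	return sum // deferred cleanup runs here and on any early return
}

func main() {
	buf := *(pointPool.Get().(*[]float64))
	buf = append(buf, 1, 2, 3)
	fmt.Println(aggregate([]series{{points: buf}})) // prints 6
}
```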