From 8e04ab6dd4eb3fc3a2c67f974f07f8ac2b852fa8 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 16 Jan 2024 17:49:43 +0000 Subject: [PATCH 01/24] promql: refactor: extract generateGroupingLabels function Signed-off-by: Bryan Boreham --- promql/engine.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 2f7dcb222e..56a7774c68 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2691,19 +2691,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par group, ok := result[groupingKey] // Add a new group if it doesn't exist. if !ok { - var m labels.Labels - enh.resetBuilder(metric) - switch { - case without: - enh.lb.Del(grouping...) - enh.lb.Del(labels.MetricName) - m = enh.lb.Labels() - case len(grouping) > 0: - enh.lb.Keep(grouping...) - m = enh.lb.Labels() - default: - m = labels.EmptyLabels() - } + m := generateGroupingLabels(enh, metric, without, grouping) newAgg := &groupedAggregation{ labels: m, floatValue: s.F, @@ -2969,6 +2957,21 @@ func generateGroupingKey(metric labels.Labels, grouping []string, without bool, return metric.HashForLabels(buf, grouping...) } +func generateGroupingLabels(enh *EvalNodeHelper, metric labels.Labels, without bool, grouping []string) labels.Labels { + enh.resetBuilder(metric) + switch { + case without: + enh.lb.Del(grouping...) + enh.lb.Del(labels.MetricName) + return enh.lb.Labels() + case len(grouping) > 0: + enh.lb.Keep(grouping...) + return enh.lb.Labels() + default: + return labels.EmptyLabels() + } +} + // btos returns 1 if b is true, 0 otherwise. func btos(b bool) float64 { if b { From 29244fb84173313479e5c0603c793f66d9959e46 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 16 Jan 2024 16:22:34 +0000 Subject: [PATCH 02/24] promql: refactor: extract count_values implementation The existing aggregation function is very long and covers very different cases. 
`aggregationCountValues` is just for `count_values`, which differs from other aggregations in that it outputs as many series per group as there are values in the input. Remove the top-level switch on string parameter type; use the same `Op` check there as elswehere. Pull checking parameters out to caller, where it is only executed once. Signed-off-by: Bryan Boreham --- promql/engine.go | 99 +++++++++++++++++++++++++++++------------------- 1 file changed, 59 insertions(+), 40 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 56a7774c68..9074912d66 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1352,9 +1352,18 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio unwrapParenExpr(&e.Param) param := unwrapStepInvariantExpr(e.Param) unwrapParenExpr(¶m) - if s, ok := param.(*parser.StringLiteral); ok { - return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - return ev.aggregation(e, sortedGrouping, s.Val, v[0].(Vector), sh[0], enh) + + if e.Op == parser.COUNT_VALUES { + valueLabel := param.(*parser.StringLiteral) + if !model.LabelName(valueLabel.Val).IsValid() { + ev.errorf("invalid label name %q", valueLabel) + } + if !e.Without { + sortedGrouping = append(sortedGrouping, valueLabel.Val) + slices.Sort(sortedGrouping) + } + return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) }, e.Expr) } @@ -2649,44 +2658,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par if op == parser.QUANTILE { q = param.(float64) } - var valueLabel string - var recomputeGroupingKey bool - if op == parser.COUNT_VALUES { - valueLabel = param.(string) - if !model.LabelName(valueLabel).IsValid() { - ev.errorf("invalid label name %q", valueLabel) - } - if !without { 
- // We're changing the grouping labels so we have to ensure they're still sorted - // and we have to flag to recompute the grouping key. Considering the count_values() - // operator is less frequently used than other aggregations, we're fine having to - // re-compute the grouping key on each step for this case. - grouping = append(grouping, valueLabel) - slices.Sort(grouping) - recomputeGroupingKey = true - } - } - var buf []byte for si, s := range vec { metric := s.Metric - - if op == parser.COUNT_VALUES { - enh.resetBuilder(metric) - enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) - metric = enh.lb.Labels() - - // We've changed the metric so we have to recompute the grouping key. - recomputeGroupingKey = true - } - - // We can use the pre-computed grouping key unless grouping labels have changed. - var groupingKey uint64 - if !recomputeGroupingKey { - groupingKey = seriesHelper[si].groupingKey - } else { - groupingKey, buf = generateGroupingKey(metric, grouping, without, buf) - } + groupingKey := seriesHelper[si].groupingKey group, ok := result[groupingKey] // Add a new group if it doesn't exist. @@ -2807,7 +2782,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par group.floatValue = s.F } - case parser.COUNT, parser.COUNT_VALUES: + case parser.COUNT: group.groupCount++ case parser.STDVAR, parser.STDDEV: @@ -2879,7 +2854,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par aggr.floatValue = aggr.floatMean } - case parser.COUNT, parser.COUNT_VALUES: + case parser.COUNT: aggr.floatValue = float64(aggr.groupCount) case parser.STDVAR: @@ -2942,6 +2917,50 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par return enh.Out, annos } +// aggregationK evaluates count_values on vec. +// Outputs as many series per group as there are values in the input. 
+func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) { + result := map[uint64]*groupedAggregation{} + orderedResult := []*groupedAggregation{} + + var buf []byte + for _, s := range vec { + enh.resetBuilder(s.Metric) + enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) + metric := enh.lb.Labels() + + // Considering the count_values() + // operator is less frequently used than other aggregations, we're fine having to + // re-compute the grouping key on each step for this case. + var groupingKey uint64 + groupingKey, buf = generateGroupingKey(metric, grouping, e.Without, buf) + + group, ok := result[groupingKey] + // Add a new group if it doesn't exist. + if !ok { + newAgg := &groupedAggregation{ + labels: generateGroupingLabels(enh, metric, e.Without, grouping), + groupCount: 1, + } + + result[groupingKey] = newAgg + orderedResult = append(orderedResult, newAgg) + continue + } + + group.groupCount++ + } + + // Construct the result Vector from the aggregated groups. + for _, aggr := range orderedResult { + enh.Out = append(enh.Out, Sample{ + Metric: aggr.labels, + F: float64(aggr.groupCount), + }) + } + return enh.Out, nil +} + // groupingKey builds and returns the grouping key for the given metric and // grouping labels. func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) { From e5f667537c94dbd1e5ad70d766c0c27ea5cb8830 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 17 Jan 2024 10:57:02 +0000 Subject: [PATCH 03/24] promql: refactor: initialize aggregation before storing in map This seems more consistent to me. 
Signed-off-by: Bryan Boreham --- promql/engine.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 9074912d66..0a2c5ff3fc 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2686,9 +2686,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par newAgg.groupCount = 0 } - result[groupingKey] = newAgg - orderedResult = append(orderedResult, newAgg) - inputVecLen := int64(len(vec)) resultSize := k switch { @@ -2699,22 +2696,25 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par } switch op { case parser.STDVAR, parser.STDDEV: - result[groupingKey].floatValue = 0 + newAgg.floatValue = 0 case parser.TOPK, parser.QUANTILE: - result[groupingKey].heap = make(vectorByValueHeap, 1, resultSize) - result[groupingKey].heap[0] = Sample{ + newAgg.heap = make(vectorByValueHeap, 1, resultSize) + newAgg.heap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.BOTTOMK: - result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 1, resultSize) - result[groupingKey].reverseHeap[0] = Sample{ + newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, resultSize) + newAgg.reverseHeap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.GROUP: - result[groupingKey].floatValue = 1 + newAgg.floatValue = 1 } + + result[groupingKey] = newAgg + orderedResult = append(orderedResult, newAgg) continue } From 5f10d17cef1a042ea7c9cd99026a502f7ae7c80e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 27 Feb 2024 06:38:49 +0000 Subject: [PATCH 04/24] promql: refactor: split out aggregations over range The new function `rangeEvalAgg` is mostly a copy of `rangeEval`, but without `initSeries` which we don't need and inlining the callback to `aggregation()`. 
Signed-off-by: Bryan Boreham --- promql/engine.go | 180 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 167 insertions(+), 13 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 0a2c5ff3fc..91c5e9e249 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1291,6 +1291,172 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) return mat, warnings } +func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) { + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + matrixes := make([]Matrix, 2) + origMatrixes := make([]Matrix, 2) + originalNumSamples := ev.currentSamples + + var warnings annotations.Annotations + for i, e := range []parser.Expr{aggExpr.Param, aggExpr.Expr} { + // Functions will take string arguments from the expressions, not the values. + if e != nil && e.Type() != parser.ValueTypeString { + // ev.currentSamples will be updated to the correct value within the ev.eval call. + val, ws := ev.eval(e) + warnings.Merge(ws) + matrixes[i] = val.(Matrix) + + // Keep a copy of the original point slices so that they + // can be returned to the pool. + origMatrixes[i] = make(Matrix, len(matrixes[i])) + copy(origMatrixes[i], matrixes[i]) + } + } + + vectors := make([]Vector, 2) // Input vectors for the function. + args := make([]parser.Value, 2) // Argument to function. + biggestLen := len(matrixes[1]) + enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} + type seriesAndTimestamp struct { + Series + ts int64 + } + seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. + tempNumSamples := ev.currentSamples + + seriesHelpers := make([][]EvalSeriesHelper, 2) + bufHelpers := make([][]EvalSeriesHelper, 2) + // Prepare a function to initialise series helpers with the grouping key. 
+ buf := make([]byte, 0, 1024) + + seriesHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1])) + bufHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1])) + + for si, series := range matrixes[1] { + seriesHelpers[1][si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + } + + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + ev.error(err) + } + // Reset number of samples in memory after each timestamp. + ev.currentSamples = tempNumSamples + // Gather input vectors for this timestamp. + for i := range []parser.Expr{aggExpr.Param, aggExpr.Expr} { + vectors[i] = vectors[i][:0] + bufHelpers[i] = bufHelpers[i][:0] + + for si, series := range matrixes[i] { + switch { + case len(series.Floats) > 0 && series.Floats[0].T == ts: + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. + matrixes[i][si].Floats = series.Floats[1:] + case len(series.Histograms) > 0 && series.Histograms[0].T == ts: + vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) + matrixes[i][si].Histograms = series.Histograms[1:] + default: + continue + } + if seriesHelpers[i] != nil { + bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si]) + } + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + } + args[i] = vectors[i] + ev.samplesStats.UpdatePeak(ev.currentSamples) + } + + // Make the function call. + enh.Ts = ts + var param float64 + if aggExpr.Param != nil { + param = args[0].(Vector)[0].F + } + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, args[1].(Vector), bufHelpers[1], enh) + + enh.Out = result[:0] // Reuse result vector. 
+ warnings.Merge(ws) + + vecNumSamples := result.TotalSamples() + ev.currentSamples += vecNumSamples + // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also + // needs to include the samples from the result here, as they're still in memory. + tempNumSamples += vecNumSamples + ev.samplesStats.UpdatePeak(ev.currentSamples) + + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + ev.samplesStats.UpdatePeak(ev.currentSamples) + + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + if result.ContainsSameLabelset() { + ev.errorf("vector cannot contain metrics with the same labelset") + } + mat := make(Matrix, len(result)) + for i, s := range result { + if s.H == nil { + mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}} + } else { + mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}} + } + } + ev.currentSamples = originalNumSamples + mat.TotalSamples() + ev.samplesStats.UpdatePeak(ev.currentSamples) + return mat, warnings + } + + // Add samples in output vector to output series. + for _, sample := range result { + h := sample.Metric.Hash() + ss, ok := seriess[h] + if ok { + if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. + ev.errorf("vector cannot contain metrics with the same labelset") + } + ss.ts = ts + } else { + ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} + } + if sample.H == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) + } + seriess[h] = ss + } + } + + // Reuse the original point slices. 
+ for _, m := range origMatrixes { + for _, s := range m { + putFPointSlice(s.Floats) + putHPointSlice(s.Histograms) + } + } + // Assemble the output matrix. By the time we get here we know we don't have too many samples. + mat := make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + mat = append(mat, ss.Series) + } + ev.currentSamples = originalNumSamples + mat.TotalSamples() + ev.samplesStats.UpdatePeak(ev.currentSamples) + return mat, warnings +} + // evalSubquery evaluates given SubqueryExpr and returns an equivalent // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { @@ -1343,12 +1509,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio sortedGrouping := e.Grouping slices.Sort(sortedGrouping) - // Prepare a function to initialise series helpers with the grouping key. - buf := make([]byte, 0, 1024) - initSeries := func(series labels.Labels, h *EvalSeriesHelper) { - h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf) - } - unwrapParenExpr(&e.Param) param := unwrapStepInvariantExpr(e.Param) unwrapParenExpr(¶m) @@ -1367,13 +1527,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio }, e.Expr) } - return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - var param float64 - if e.Param != nil { - param = v[0].(Vector)[0].F - } - return ev.aggregation(e, sortedGrouping, param, v[1].(Vector), sh[1], enh) - }, e.Param, e.Expr) + return ev.rangeEvalAgg(e, sortedGrouping) case *parser.Call: call := FunctionCalls[e.Func.Name] From bd9bdccb22bb0ba99dd2da29de6d1d96d47fa0d0 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 27 Feb 2024 07:00:35 +0000 Subject: [PATCH 05/24] promql: refactor: simplify internal data structures 
Signed-off-by: Bryan Boreham --- promql/engine.go | 83 ++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 91c5e9e249..821a0e3698 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1293,29 +1293,27 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) { numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 - matrixes := make([]Matrix, 2) - origMatrixes := make([]Matrix, 2) originalNumSamples := ev.currentSamples - var warnings annotations.Annotations - for i, e := range []parser.Expr{aggExpr.Param, aggExpr.Expr} { - // Functions will take string arguments from the expressions, not the values. - if e != nil && e.Type() != parser.ValueTypeString { - // ev.currentSamples will be updated to the correct value within the ev.eval call. - val, ws := ev.eval(e) - warnings.Merge(ws) - matrixes[i] = val.(Matrix) - // Keep a copy of the original point slices so that they - // can be returned to the pool. - origMatrixes[i] = make(Matrix, len(matrixes[i])) - copy(origMatrixes[i], matrixes[i]) - } + // param is the number k for topk/bottomk. + var param float64 + if aggExpr.Param != nil { + val, ws := ev.eval(aggExpr.Param) + warnings.Merge(ws) + param = val.(Matrix)[0].Floats[0].F } + // Now fetch the data to be aggregated. + // ev.currentSamples will be updated to the correct value within the ev.eval call. + val, ws := ev.eval(aggExpr.Expr) + warnings.Merge(ws) + inputMatrix := val.(Matrix) - vectors := make([]Vector, 2) // Input vectors for the function. - args := make([]parser.Value, 2) // Argument to function. - biggestLen := len(matrixes[1]) + // Keep a copy of the original point slice so that it can be returned to the pool. 
+ origMatrix := inputMatrix + + var vector Vector // Input vectors for the function. + biggestLen := len(inputMatrix) enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} type seriesAndTimestamp struct { Series @@ -1324,16 +1322,14 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples - seriesHelpers := make([][]EvalSeriesHelper, 2) - bufHelpers := make([][]EvalSeriesHelper, 2) - // Prepare a function to initialise series helpers with the grouping key. + // Initialise series helpers with the grouping key. buf := make([]byte, 0, 1024) - seriesHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1])) - bufHelpers[1] = make([]EvalSeriesHelper, len(matrixes[1])) + seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) + bufHelper := make([]EvalSeriesHelper, len(inputMatrix)) - for si, series := range matrixes[1] { - seriesHelpers[1][si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + for si, series := range inputMatrix { + seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { @@ -1343,42 +1339,35 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Reset number of samples in memory after each timestamp. ev.currentSamples = tempNumSamples // Gather input vectors for this timestamp. 
- for i := range []parser.Expr{aggExpr.Param, aggExpr.Expr} { - vectors[i] = vectors[i][:0] - bufHelpers[i] = bufHelpers[i][:0] + { + vector = vector[:0] + bufHelper = bufHelper[:0] - for si, series := range matrixes[i] { + for si, series := range inputMatrix { switch { case len(series.Floats) > 0 && series.Floats[0].T == ts: - vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) + vector = append(vector, Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) // Move input vectors forward so we don't have to re-scan the same // past points at the next step. - matrixes[i][si].Floats = series.Floats[1:] + inputMatrix[si].Floats = series.Floats[1:] case len(series.Histograms) > 0 && series.Histograms[0].T == ts: - vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) - matrixes[i][si].Histograms = series.Histograms[1:] + vector = append(vector, Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) + inputMatrix[si].Histograms = series.Histograms[1:] default: continue } - if seriesHelpers[i] != nil { - bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si]) - } + bufHelper = append(bufHelper, seriesHelper[si]) ev.currentSamples++ if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } - args[i] = vectors[i] ev.samplesStats.UpdatePeak(ev.currentSamples) } // Make the function call. enh.Ts = ts - var param float64 - if aggExpr.Param != nil { - param = args[0].(Vector)[0].F - } - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, args[1].(Vector), bufHelpers[1], enh) + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh) enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) @@ -1440,12 +1429,10 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping } } - // Reuse the original point slices. 
- for _, m := range origMatrixes { - for _, s := range m { - putFPointSlice(s.Floats) - putHPointSlice(s.Histograms) - } + // Reuse the original point slice. + for _, s := range origMatrix { + putFPointSlice(s.Floats) + putHPointSlice(s.Histograms) } // Assemble the output matrix. By the time we get here we know we don't have too many samples. mat := make(Matrix, 0, len(seriess)) From 59548b8a0b8f7f4e35d9b1fcd43c26f6d7cd10f5 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 27 Feb 2024 08:19:01 +0000 Subject: [PATCH 06/24] promql: refactor: move collection of results into aggregation() We don't need to check for duplicates as aggregation cannot generate them. Signed-off-by: Bryan Boreham --- promql/engine.go | 73 ++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 821a0e3698..b73364bc87 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1292,7 +1292,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) { - numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 originalNumSamples := ev.currentSamples var warnings annotations.Annotations @@ -1315,11 +1314,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping var vector Vector // Input vectors for the function. biggestLen := len(inputMatrix) enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} - type seriesAndTimestamp struct { - Series - ts int64 - } - seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. + seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples // Initialise series helpers with the grouping key. 
@@ -1367,7 +1362,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh) + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess) enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) @@ -1386,9 +1381,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { - if result.ContainsSameLabelset() { - ev.errorf("vector cannot contain metrics with the same labelset") - } mat := make(Matrix, len(result)) for i, s := range result { if s.H == nil { @@ -1401,32 +1393,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } - - // Add samples in output vector to output series. - for _, sample := range result { - h := sample.Metric.Hash() - ss, ok := seriess[h] - if ok { - if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. - ev.errorf("vector cannot contain metrics with the same labelset") - } - ss.ts = ts - } else { - ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} - } - if sample.H == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) - } - seriess[h] = ss - } } // Reuse the original point slice. @@ -1437,7 +1403,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Assemble the output matrix. By the time we get here we know we don't have too many samples. 
mat := make(Matrix, 0, len(seriess)) for _, ss := range seriess { - mat = append(mat, ss.Series) + mat = append(mat, ss) } ev.currentSamples = originalNumSamples + mat.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -2778,7 +2744,7 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Vector, annotations.Annotations) { op := e.Op without := e.Without var annos annotations.Annotations @@ -3055,7 +3021,36 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par H: aggr.histogramValue, }) } - return enh.Out, annos + + ts := enh.Ts + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + return enh.Out, annos + } + + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + // Add samples in output vector to output series. + for _, sample := range enh.Out { + h := sample.Metric.Hash() + ss, ok := seriess[h] + if !ok { + ss = Series{Metric: sample.Metric} + } + if sample.H == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) + } + seriess[h] = ss + } + + return nil, annos } // aggregationK evaluates count_values on vec. 
From 3851b74db1923ac414a7a0b88a81c9b663efd1ab Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 29 Feb 2024 23:21:46 +0000 Subject: [PATCH 07/24] promql: aggregations: skip result vector in range queries Adjust test to match the lower count, since samples in the vector are no longer counted. Signed-off-by: Bryan Boreham --- promql/engine.go | 89 +++++++++++++++++-------------------------- promql/engine_test.go | 2 +- 2 files changed, 35 insertions(+), 56 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index b73364bc87..5757604b7a 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1367,18 +1367,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) - vecNumSamples := result.TotalSamples() - ev.currentSamples += vecNumSamples - // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also - // needs to include the samples from the result here, as they're still in memory. - tempNumSamples += vecNumSamples - ev.samplesStats.UpdatePeak(ev.currentSamples) - - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { mat := make(Matrix, len(result)) @@ -1393,6 +1381,9 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } } // Reuse the original point slice. @@ -2946,7 +2937,33 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par } } - // Construct the result Vector from the aggregated groups. + // Construct the result from the aggregated groups. 
+ numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) { + // If this could be an instant query, build a slice so the result is in consistent order. + if ev.endTimestamp == ev.startTimestamp { + enh.Out = append(enh.Out, Sample{Metric: lbls, F: f, H: h}) + } else { + // Otherwise the results are added into seriess elements. + hash := lbls.Hash() + ss, ok := seriess[hash] + if !ok { + ss = Series{Metric: lbls} + } + if h == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: enh.Ts, F: f}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: enh.Ts, H: h}) + } + seriess[hash] = ss + } + } for _, aggr := range orderedResult { switch op { case parser.AVG: @@ -2976,10 +2993,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par sort.Sort(sort.Reverse(aggr.heap)) } for _, v := range aggr.heap { - enh.Out = append(enh.Out, Sample{ - Metric: v.Metric, - F: v.F, - }) + add(v.Metric, v.F, nil) } continue // Bypass default append. @@ -2989,10 +3003,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par sort.Sort(sort.Reverse(aggr.reverseHeap)) } for _, v := range aggr.reverseHeap { - enh.Out = append(enh.Out, Sample{ - Metric: v.Metric, - F: v.F, - }) + add(v.Metric, v.F, nil) } continue // Bypass default append. @@ -3015,42 +3026,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par // For other aggregations, we already have the right value. } - enh.Out = append(enh.Out, Sample{ - Metric: aggr.labels, - F: aggr.floatValue, - H: aggr.histogramValue, - }) + add(aggr.labels, aggr.floatValue, aggr.histogramValue) } - ts := enh.Ts - // If this could be an instant query, shortcut so as not to change sort order. 
- if ev.endTimestamp == ev.startTimestamp { - return enh.Out, annos - } - - numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 - // Add samples in output vector to output series. - for _, sample := range enh.Out { - h := sample.Metric.Hash() - ss, ok := seriess[h] - if !ok { - ss = Series{Metric: sample.Metric} - } - if sample.H == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) - } - seriess[h] = ss - } - - return nil, annos + return enh.Out, annos } // aggregationK evaluates count_values on vec. diff --git a/promql/engine_test.go b/promql/engine_test.go index 13731efd45..0202c15ae1 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -966,7 +966,7 @@ load 10s { Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))", Start: time.Unix(201, 0), - PeakSamples: 8, + PeakSamples: 7, TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 12, From c9b6c4c55ae89f9ab4b4f232247eae67d4062c40 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 29 Feb 2024 23:39:29 +0000 Subject: [PATCH 08/24] promql: aggregations: output directly to matrix for instant queries Signed-off-by: Bryan Boreham --- promql/engine.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 5757604b7a..2e5f620e32 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1313,7 +1313,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping var vector Vector // Input vectors for the function. 
biggestLen := len(inputMatrix) - enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)} + enh := &EvalNodeHelper{} seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples @@ -1364,22 +1364,13 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping enh.Ts = ts result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess) - enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { - mat := make(Matrix, len(result)) - for i, s := range result { - if s.H == nil { - mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}} - } else { - mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}} - } - } - ev.currentSamples = originalNumSamples + mat.TotalSamples() + ev.currentSamples = originalNumSamples + result.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) - return mat, warnings + return result, warnings } if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) @@ -2735,7 +2726,7 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. 
-func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Vector, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op without := e.Without var annos annotations.Annotations @@ -2749,7 +2740,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par } k = int64(f) if k < 1 { - return Vector{}, annos + return nil, annos } } var q float64 @@ -2939,10 +2930,19 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par // Construct the result from the aggregated groups. numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + var mat Matrix + if ev.endTimestamp == ev.startTimestamp { + mat = make(Matrix, 0, len(orderedResult)) + } + add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) { - // If this could be an instant query, build a slice so the result is in consistent order. + // If this could be an instant query, add directly to the matrix so the result is in consistent order. if ev.endTimestamp == ev.startTimestamp { - enh.Out = append(enh.Out, Sample{Metric: lbls, F: f, H: h}) + if h == nil { + mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}}) + } else { + mat = append(mat, Series{Metric: lbls, Histograms: []HPoint{{T: enh.Ts, H: h}}}) + } } else { // Otherwise the results are added into seriess elements. hash := lbls.Hash() @@ -3029,7 +3029,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par add(aggr.labels, aggr.floatValue, aggr.histogramValue) } - return enh.Out, annos + return mat, annos } // aggregationK evaluates count_values on vec. 
From b3bda7df4b7d1e5e47edb0bb3b7ea9d3057fefdc Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 1 Mar 2024 16:02:54 +0000 Subject: [PATCH 09/24] promql: aggregations: skip copying input to a Vector We can work directly from the inputMatrix on each timestep. Signed-off-by: Bryan Boreham --- promql/engine.go | 55 ++++++++++++++++++++---------------------------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 2e5f620e32..5fb1d849df 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1311,7 +1311,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Keep a copy of the original point slice so that it can be returned to the pool. origMatrix := inputMatrix - var vector Vector // Input vectors for the function. biggestLen := len(inputMatrix) enh := &EvalNodeHelper{} seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. @@ -1321,7 +1320,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping buf := make([]byte, 0, 1024) seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) - bufHelper := make([]EvalSeriesHelper, len(inputMatrix)) for si, series := range inputMatrix { seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) @@ -1333,36 +1331,10 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping } // Reset number of samples in memory after each timestamp. ev.currentSamples = tempNumSamples - // Gather input vectors for this timestamp. - { - vector = vector[:0] - bufHelper = bufHelper[:0] - - for si, series := range inputMatrix { - switch { - case len(series.Floats) > 0 && series.Floats[0].T == ts: - vector = append(vector, Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts}) - // Move input vectors forward so we don't have to re-scan the same - // past points at the next step. 
- inputMatrix[si].Floats = series.Floats[1:] - case len(series.Histograms) > 0 && series.Histograms[0].T == ts: - vector = append(vector, Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts}) - inputMatrix[si].Histograms = series.Histograms[1:] - default: - continue - } - bufHelper = append(bufHelper, seriesHelper[si]) - ev.currentSamples++ - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - } - ev.samplesStats.UpdatePeak(ev.currentSamples) - } // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, vector, bufHelper, enh, seriess) + result, ws := ev.aggregation(aggExpr, sortedGrouping, param, inputMatrix, seriesHelper, enh, seriess) warnings.Merge(ws) @@ -2726,7 +2698,7 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op without := e.Without var annos annotations.Annotations @@ -2748,7 +2720,26 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par q = param.(float64) } - for si, s := range vec { + for si, series := range inputMatrix { + var s Sample + + switch { + case len(series.Floats) > 0 && series.Floats[0].T == enh.Ts: + s = Sample{Metric: series.Metric, F: series.Floats[0].F, T: enh.Ts} + // Move input vectors forward so we don't have to re-scan the same + // past points at the next step. 
+ inputMatrix[si].Floats = series.Floats[1:] + case len(series.Histograms) > 0 && series.Histograms[0].T == enh.Ts: + s = Sample{Metric: series.Metric, H: series.Histograms[0].H, T: enh.Ts} + inputMatrix[si].Histograms = series.Histograms[1:] + default: + continue + } + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + metric := s.Metric groupingKey := seriesHelper[si].groupingKey @@ -2775,7 +2766,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par newAgg.groupCount = 0 } - inputVecLen := int64(len(vec)) + inputVecLen := int64(len(inputMatrix)) resultSize := k switch { case k > inputVecLen: From cb6c4b3092ce7e42a2fe08011c5ad283ff17c64e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 1 Mar 2024 16:01:20 +0000 Subject: [PATCH 10/24] promql: simplify k/q parameter to topk/bottomk/quantile Pass it as a float64 not as interface{}. Make k a simple int, since that is the parameter to make(). Pull invalid quantile warning out of the loop. Signed-off-by: Bryan Boreham --- promql/engine.go | 40 ++++++++++++++++------------------------ 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 5fb1d849df..35dc52942c 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1295,7 +1295,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping originalNumSamples := ev.currentSamples var warnings annotations.Annotations - // param is the number k for topk/bottomk. + // param is the number k for topk/bottomk, or q for quantile. var param float64 if aggExpr.Param != nil { val, ws := ev.eval(aggExpr.Param) @@ -2698,26 +2698,29 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. 
-func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, param interface{}, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q float64, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op without := e.Without var annos annotations.Annotations result := map[uint64]*groupedAggregation{} orderedResult := []*groupedAggregation{} - var k int64 + k := 1 if op == parser.TOPK || op == parser.BOTTOMK { - f := param.(float64) - if !convertibleToInt64(f) { - ev.errorf("Scalar value %v overflows int64", f) + if !convertibleToInt64(q) { + ev.errorf("Scalar value %v overflows int64", q) + } + k = int(q) + if k > len(inputMatrix) { + k = len(inputMatrix) } - k = int64(f) if k < 1 { return nil, annos } } - var q float64 if op == parser.QUANTILE { - q = param.(float64) + if math.IsNaN(q) || q < 0 || q > 1 { + annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) + } } for si, series := range inputMatrix { @@ -2766,25 +2769,17 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par newAgg.groupCount = 0 } - inputVecLen := int64(len(inputMatrix)) - resultSize := k - switch { - case k > inputVecLen: - resultSize = inputVecLen - case k == 0: - resultSize = 1 - } switch op { case parser.STDVAR, parser.STDDEV: newAgg.floatValue = 0 case parser.TOPK, parser.QUANTILE: - newAgg.heap = make(vectorByValueHeap, 1, resultSize) + newAgg.heap = make(vectorByValueHeap, 1, k) newAgg.heap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.BOTTOMK: - newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, resultSize) + newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, k) newAgg.reverseHeap[0] = Sample{ F: s.F, Metric: s.Metric, @@ -2876,7 +2871,7 @@ func (ev 
*evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par case parser.TOPK: // We build a heap of up to k elements, with the smallest element at heap[0]. switch { - case int64(len(group.heap)) < k: + case len(group.heap) < k: heap.Push(&group.heap, &Sample{ F: s.F, Metric: s.Metric, @@ -2895,7 +2890,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par case parser.BOTTOMK: // We build a heap of up to k elements, with the biggest element at heap[0]. switch { - case int64(len(group.reverseHeap)) < k: + case len(group.reverseHeap) < k: heap.Push(&group.reverseHeap, &Sample{ F: s.F, Metric: s.Metric, @@ -2999,9 +2994,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, par continue // Bypass default append. case parser.QUANTILE: - if math.IsNaN(q) || q < 0 || q > 1 { - annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) - } aggr.floatValue = quantile(q, aggr.heap) case parser.SUM: From 53a3138eeb54eeb7331c300cebd2d0122c8ba796 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 2 Mar 2024 12:52:35 +0000 Subject: [PATCH 11/24] promql aggregations: pre-generate mapping from inputs to outputs So we don't have to re-create it on every time step. Signed-off-by: Bryan Boreham --- promql/engine.go | 77 ++++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 35dc52942c..140c0a0e49 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1067,8 +1067,6 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Anno // EvalSeriesHelper stores extra information about a series. type EvalSeriesHelper struct { - // The grouping key used by aggregation. - groupingKey uint64 // Used to map left-hand to right-hand in binary operations. 
signature string } @@ -1316,13 +1314,25 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples - // Initialise series helpers with the grouping key. + // Create a mapping from input series to output groups. buf := make([]byte, 0, 1024) - - seriesHelper := make([]EvalSeriesHelper, len(inputMatrix)) + groupToResultIndex := make(map[uint64]int) + seriesToResult := make([]int, len(inputMatrix)) + orderedResult := make([]*groupedAggregation, 0, 16) for si, series := range inputMatrix { - seriesHelper[si].groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + var groupingKey uint64 + groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) + index, ok := groupToResultIndex[groupingKey] + // Add a new group if it doesn't exist. + if !ok { + m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) + newAgg := &groupedAggregation{labels: m} + index = len(orderedResult) + groupToResultIndex[groupingKey] = index + orderedResult = append(orderedResult, newAgg) + } + seriesToResult[si] = index } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { @@ -1334,7 +1344,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, sortedGrouping, param, inputMatrix, seriesHelper, enh, seriess) + result, ws := ev.aggregation(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) warnings.Merge(ws) @@ -2698,12 +2708,10 @@ type groupedAggregation struct { // aggregation evaluates an aggregation operation on a Vector. The provided grouping labels // must be sorted. 
-func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q float64, inputMatrix Matrix, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op - without := e.Without var annos annotations.Annotations - result := map[uint64]*groupedAggregation{} - orderedResult := []*groupedAggregation{} + seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. k := 1 if op == parser.TOPK || op == parser.BOTTOMK { if !convertibleToInt64(q) { @@ -2743,53 +2751,47 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q f ev.error(ErrTooManySamples(env)) } - metric := s.Metric - groupingKey := seriesHelper[si].groupingKey - - group, ok := result[groupingKey] - // Add a new group if it doesn't exist. - if !ok { - m := generateGroupingLabels(enh, metric, without, grouping) - newAgg := &groupedAggregation{ - labels: m, + group := orderedResult[seriesToResult[si]] + // Initialize this group if it's the first time we've seen it. 
+ if !seen[seriesToResult[si]] { + *group = groupedAggregation{ + labels: group.labels, floatValue: s.F, floatMean: s.F, groupCount: 1, } switch { case s.H == nil: - newAgg.hasFloat = true + group.hasFloat = true case op == parser.SUM: - newAgg.histogramValue = s.H.Copy() - newAgg.hasHistogram = true + group.histogramValue = s.H.Copy() + group.hasHistogram = true case op == parser.AVG: - newAgg.histogramMean = s.H.Copy() - newAgg.hasHistogram = true + group.histogramMean = s.H.Copy() + group.hasHistogram = true case op == parser.STDVAR || op == parser.STDDEV: - newAgg.groupCount = 0 + group.groupCount = 0 } switch op { case parser.STDVAR, parser.STDDEV: - newAgg.floatValue = 0 + group.floatValue = 0 case parser.TOPK, parser.QUANTILE: - newAgg.heap = make(vectorByValueHeap, 1, k) - newAgg.heap[0] = Sample{ + group.heap = make(vectorByValueHeap, 1, k) + group.heap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.BOTTOMK: - newAgg.reverseHeap = make(vectorByReverseValueHeap, 1, k) - newAgg.reverseHeap[0] = Sample{ + group.reverseHeap = make(vectorByReverseValueHeap, 1, k) + group.reverseHeap[0] = Sample{ F: s.F, Metric: s.Metric, } case parser.GROUP: - newAgg.floatValue = 1 + group.floatValue = 1 } - - result[groupingKey] = newAgg - orderedResult = append(orderedResult, newAgg) + seen[seriesToResult[si]] = true continue } @@ -2950,7 +2952,10 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, grouping []string, q f seriess[hash] = ss } } - for _, aggr := range orderedResult { + for ri, aggr := range orderedResult { + if !seen[ri] { + continue + } switch op { case parser.AVG: if aggr.hasFloat && aggr.hasHistogram { From eb41e770b79fe3824a342efbab4fc169364706ef Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 3 Mar 2024 16:52:43 +0000 Subject: [PATCH 12/24] promql: refactor: extract function addToSeries Signed-off-by: Bryan Boreham --- promql/engine.go | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff 
--git a/promql/engine.go b/promql/engine.go index 140c0a0e49..f915c3480e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1257,17 +1257,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) } else { ss = seriesAndTimestamp{Series{Metric: sample.Metric}, ts} } - if sample.H == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: ts, F: sample.F}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: sample.H}) - } + addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps) seriess[h] = ss } } @@ -2938,17 +2928,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix if !ok { ss = Series{Metric: lbls} } - if h == nil { - if ss.Floats == nil { - ss.Floats = getFPointSlice(numSteps) - } - ss.Floats = append(ss.Floats, FPoint{T: enh.Ts, F: f}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: enh.Ts, H: h}) - } + addToSeries(&ss, enh.Ts, f, h, numSteps) seriess[hash] = ss } } @@ -3064,6 +3044,20 @@ func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping [] return enh.Out, nil } +func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) { + if h == nil { + if ss.Floats == nil { + ss.Floats = getFPointSlice(numSteps) + } + ss.Floats = append(ss.Floats, FPoint{T: ts, F: f}) + } else { + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) + } +} + // groupingKey builds and returns the grouping key for the given metric and // grouping labels. 
func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) { From 602eb69edfe87f90e8a1a2844168c946f83aa3f6 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 5 Apr 2024 11:37:55 +0100 Subject: [PATCH 13/24] promql: refactor: extract function nextSample With sub-function nextValues which we shall use shortly. Signed-off-by: Bryan Boreham --- promql/engine.go | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index f915c3480e..9e0a6b17c4 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2721,25 +2721,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } } - for si, series := range inputMatrix { - var s Sample - - switch { - case len(series.Floats) > 0 && series.Floats[0].T == enh.Ts: - s = Sample{Metric: series.Metric, F: series.Floats[0].F, T: enh.Ts} - // Move input vectors forward so we don't have to re-scan the same - // past points at the next step. - inputMatrix[si].Floats = series.Floats[1:] - case len(series.Histograms) > 0 && series.Histograms[0].T == enh.Ts: - s = Sample{Metric: series.Metric, H: series.Histograms[0].H, T: enh.Ts} - inputMatrix[si].Histograms = series.Histograms[1:] - default: + for si := range inputMatrix { + s, ok := ev.nextSample(enh.Ts, inputMatrix, si) + if !ok { continue } - ev.currentSamples++ - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } group := orderedResult[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. 
@@ -3058,6 +3044,29 @@ func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, n } } +func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogram.FloatHistogram, b bool) { + switch { + case len(series.Floats) > 0 && series.Floats[0].T == ts: + f = series.Floats[0].F + series.Floats = series.Floats[1:] // Move input vectors forward + case len(series.Histograms) > 0 && series.Histograms[0].T == ts: + h = series.Histograms[0].H + series.Histograms = series.Histograms[1:] + default: + return f, h, false + } + return f, h, true +} + +func (ev *evaluator) nextSample(ts int64, inputMatrix Matrix, si int) (Sample, bool) { + f, h, ok := ev.nextValues(ts, &inputMatrix[si]) + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + return Sample{Metric: inputMatrix[si].Metric, F: f, H: h, T: ts}, ok +} + // groupingKey builds and returns the grouping key for the given metric and // grouping labels. func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) { From 74eed67ef6c41ea252e3208f7db6ac52e9b941d8 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 5 Apr 2024 11:56:04 +0100 Subject: [PATCH 14/24] promql: refactor: pull fetching input data out of rangeEvalAgg This is a cleaner split of responsibilities. We now check the sample count after calling rangeEvalAgg. Changed re-use of samples to use `Clone` and `defer`. 
Signed-off-by: Bryan Boreham --- promql/engine.go | 62 +++++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 9e0a6b17c4..770550dac1 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1279,29 +1279,19 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) return mat, warnings } -func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string) (Matrix, annotations.Annotations) { - originalNumSamples := ev.currentSamples +func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { + // Keep a copy of the original point slice so that it can be returned to the pool. + origMatrix := slices.Clone(inputMatrix) + defer func() { + for _, s := range origMatrix { + putFPointSlice(s.Floats) + putHPointSlice(s.Histograms) + } + }() + var warnings annotations.Annotations - // param is the number k for topk/bottomk, or q for quantile. - var param float64 - if aggExpr.Param != nil { - val, ws := ev.eval(aggExpr.Param) - warnings.Merge(ws) - param = val.(Matrix)[0].Floats[0].F - } - // Now fetch the data to be aggregated. - // ev.currentSamples will be updated to the correct value within the ev.eval call. - val, ws := ev.eval(aggExpr.Expr) - warnings.Merge(ws) - inputMatrix := val.(Matrix) - - // Keep a copy of the original point slice so that it can be returned to the pool. - origMatrix := inputMatrix - - biggestLen := len(inputMatrix) enh := &EvalNodeHelper{} - seriess := make(map[uint64]Series, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples // Create a mapping from input series to output groups. 
@@ -1325,6 +1315,8 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriesToResult[si] = index } + seriess := make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { ev.error(err) @@ -1340,8 +1332,6 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { - ev.currentSamples = originalNumSamples + result.TotalSamples() - ev.samplesStats.UpdatePeak(ev.currentSamples) return result, warnings } if ev.currentSamples > ev.maxSamples { @@ -1349,18 +1339,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping } } - // Reuse the original point slice. - for _, s := range origMatrix { - putFPointSlice(s.Floats) - putHPointSlice(s.Histograms) - } // Assemble the output matrix. By the time we get here we know we don't have too many samples. mat := make(Matrix, 0, len(seriess)) for _, ss := range seriess { mat = append(mat, ss) } - ev.currentSamples = originalNumSamples + mat.TotalSamples() - ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } @@ -1434,7 +1417,26 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio }, e.Expr) } - return ev.rangeEvalAgg(e, sortedGrouping) + var warnings annotations.Annotations + originalNumSamples := ev.currentSamples + // param is the number k for topk/bottomk, or q for quantile. + var fParam float64 + if param != nil { + val, ws := ev.eval(param) + warnings.Merge(ws) + fParam = val.(Matrix)[0].Floats[0].F + } + // Now fetch the data to be aggregated. 
+ val, ws := ev.eval(e.Expr) + warnings.Merge(ws) + inputMatrix := val.(Matrix) + + result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam) + warnings.Merge(ws) + ev.currentSamples = originalNumSamples + result.TotalSamples() + ev.samplesStats.UpdatePeak(ev.currentSamples) + + return result, warnings case *parser.Call: call := FunctionCalls[e.Func.Name] From 2f03acbafc08dd83cc8a5c3d94b2f071bb34f809 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 5 Apr 2024 12:22:44 +0100 Subject: [PATCH 15/24] promql: refactor: split topk/bottomk from sum/avg/etc They aggregate results in different ways. topk/bottomk don't consider histograms so can simplify data collection. Signed-off-by: Bryan Boreham --- promql/engine.go | 294 ++++++++++++++++++++++++++++------------------- 1 file changed, 174 insertions(+), 120 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 770550dac1..22428e12c7 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1299,6 +1299,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping groupToResultIndex := make(map[uint64]int) seriesToResult := make([]int, len(inputMatrix)) orderedResult := make([]*groupedAggregation, 0, 16) + var result Matrix for si, series := range inputMatrix { var groupingKey uint64 @@ -1306,8 +1307,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping index, ok := groupToResultIndex[groupingKey] // Add a new group if it doesn't exist. 
if !ok { - m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) - newAgg := &groupedAggregation{labels: m} + if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK { + m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) + result = append(result, Series{Metric: m}) + } + newAgg := &groupedAggregation{} index = len(orderedResult) groupToResultIndex[groupingKey] = index orderedResult = append(orderedResult, newAgg) @@ -1315,7 +1319,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriesToResult[si] = index } - seriess := make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + var seriess map[uint64]Series + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if err := contextDone(ev.ctx, "expression evaluation"); err != nil { @@ -1326,25 +1334,44 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping // Make the function call. enh.Ts = ts - result, ws := ev.aggregation(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) + var ws annotations.Annotations + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + result, ws = ev.aggregationK(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) + // If this could be an instant query, shortcut so as not to change sort order. + if ev.endTimestamp == ev.startTimestamp { + return result, ws + } + default: + ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, orderedResult, enh) + } warnings.Merge(ws) - // If this could be an instant query, shortcut so as not to change sort order. 
- if ev.endTimestamp == ev.startTimestamp { - return result, warnings - } if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } // Assemble the output matrix. By the time we get here we know we don't have too many samples. - mat := make(Matrix, 0, len(seriess)) - for _, ss := range seriess { - mat = append(mat, ss) + switch aggExpr.Op { + case parser.TOPK, parser.BOTTOMK: + result = make(Matrix, 0, len(seriess)) + for _, ss := range seriess { + result = append(result, ss) + } + default: + // Remove empty result rows. + dst := 0 + for _, series := range result { + if len(series.Floats) > 0 || len(series.Histograms) > 0 { + result[dst] = series + dst++ + } + } + result = result[:dst] } - return mat, warnings + return result, warnings } // evalSubquery evaluates given SubqueryExpr and returns an equivalent @@ -2698,25 +2725,14 @@ type groupedAggregation struct { reverseHeap vectorByReverseValueHeap } -// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels -// must be sorted. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +// aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. +// These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`. +// outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix. +// seriesToResult maps inputMatrix indexes to outputMatrix indexes. 
+func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { op := e.Op var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. - k := 1 - if op == parser.TOPK || op == parser.BOTTOMK { - if !convertibleToInt64(q) { - ev.errorf("Scalar value %v overflows int64", q) - } - k = int(q) - if k > len(inputMatrix) { - k = len(inputMatrix) - } - if k < 1 { - return nil, annos - } - } if op == parser.QUANTILE { if math.IsNaN(q) || q < 0 || q > 1 { annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) @@ -2733,7 +2749,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // Initialize this group if it's the first time we've seen it. if !seen[seriesToResult[si]] { *group = groupedAggregation{ - labels: group.labels, floatValue: s.F, floatMean: s.F, groupCount: 1, @@ -2754,18 +2769,12 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix switch op { case parser.STDVAR, parser.STDDEV: group.floatValue = 0 - case parser.TOPK, parser.QUANTILE: - group.heap = make(vectorByValueHeap, 1, k) + case parser.QUANTILE: + group.heap = make(vectorByValueHeap, 1) group.heap[0] = Sample{ F: s.F, Metric: s.Metric, } - case parser.BOTTOMK: - group.reverseHeap = make(vectorByReverseValueHeap, 1, k) - group.reverseHeap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } case parser.GROUP: group.floatValue = 1 } @@ -2848,44 +2857,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.floatValue += delta * (s.F - group.floatMean) } - case parser.TOPK: - // We build a heap of up to k elements, with the smallest element at heap[0]. 
- switch { - case len(group.heap) < k: - heap.Push(&group.heap, &Sample{ - F: s.F, - Metric: s.Metric, - }) - case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): - // This new element is bigger than the previous smallest element - overwrite that. - group.heap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } - if k > 1 { - heap.Fix(&group.heap, 0) // Maintain the heap invariant. - } - } - - case parser.BOTTOMK: - // We build a heap of up to k elements, with the biggest element at heap[0]. - switch { - case len(group.reverseHeap) < k: - heap.Push(&group.reverseHeap, &Sample{ - F: s.F, - Metric: s.Metric, - }) - case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)): - // This new element is smaller than the previous biggest element - overwrite that. - group.reverseHeap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } - if k > 1 { - heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. - } - } - case parser.QUANTILE: group.heap = append(group.heap, s) @@ -2894,32 +2865,9 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } } - // Construct the result from the aggregated groups. + // Construct the output matrix from the aggregated groups. numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 - var mat Matrix - if ev.endTimestamp == ev.startTimestamp { - mat = make(Matrix, 0, len(orderedResult)) - } - add := func(lbls labels.Labels, f float64, h *histogram.FloatHistogram) { - // If this could be an instant query, add directly to the matrix so the result is in consistent order. - if ev.endTimestamp == ev.startTimestamp { - if h == nil { - mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}}) - } else { - mat = append(mat, Series{Metric: lbls, Histograms: []HPoint{{T: enh.Ts, H: h}}}) - } - } else { - // Otherwise the results are added into seriess elements. 
- hash := lbls.Hash() - ss, ok := seriess[hash] - if !ok { - ss = Series{Metric: lbls} - } - addToSeries(&ss, enh.Ts, f, h, numSteps) - seriess[hash] = ss - } - } for ri, aggr := range orderedResult { if !seen[ri] { continue @@ -2946,26 +2894,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix case parser.STDDEV: aggr.floatValue = math.Sqrt(aggr.floatValue / float64(aggr.groupCount)) - case parser.TOPK: - // The heap keeps the lowest value on top, so reverse it. - if len(aggr.heap) > 1 { - sort.Sort(sort.Reverse(aggr.heap)) - } - for _, v := range aggr.heap { - add(v.Metric, v.F, nil) - } - continue // Bypass default append. - - case parser.BOTTOMK: - // The heap keeps the highest value on top, so reverse it. - if len(aggr.reverseHeap) > 1 { - sort.Sort(sort.Reverse(aggr.reverseHeap)) - } - for _, v := range aggr.reverseHeap { - add(v.Metric, v.F, nil) - } - continue // Bypass default append. - case parser.QUANTILE: aggr.floatValue = quantile(q, aggr.heap) @@ -2982,7 +2910,133 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // For other aggregations, we already have the right value. } - add(aggr.labels, aggr.floatValue, aggr.histogramValue) + ss := &outputMatrix[ri] + addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, numSteps) + } + + return annos +} + +// aggregationK evaluates topk or bottomk at one timestep on inputMatrix. +// Output that has the same labels as the input, but just k of them per group. +// seriesToResult maps inputMatrix indexes to groups indexes. +// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk. +// For a range query, aggregates output in the seriess map. 
+func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { + op := e.Op + var annos annotations.Annotations + seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. + if !convertibleToInt64(q) { + ev.errorf("Scalar value %v overflows int64", q) + } + k := int(q) + if k > len(inputMatrix) { + k = len(inputMatrix) + } + if k < 1 { + return nil, annos + } + + for si := range inputMatrix { + s, ok := ev.nextSample(enh.Ts, inputMatrix, si) + if !ok { + continue + } + + group := orderedResult[seriesToResult[si]] + // Initialize this group if it's the first time we've seen it. + if !seen[seriesToResult[si]] { + *group = groupedAggregation{} + + switch op { + case parser.TOPK: + group.heap = make(vectorByValueHeap, 1, k) + group.heap[0] = s + case parser.BOTTOMK: + group.reverseHeap = make(vectorByReverseValueHeap, 1, k) + group.reverseHeap[0] = s + } + seen[seriesToResult[si]] = true + continue + } + + switch op { + case parser.TOPK: + // We build a heap of up to k elements, with the smallest element at heap[0]. + switch { + case len(group.heap) < k: + heap.Push(&group.heap, &s) + case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): + // This new element is bigger than the previous smallest element - overwrite that. + group.heap[0] = s + if k > 1 { + heap.Fix(&group.heap, 0) // Maintain the heap invariant. + } + } + + case parser.BOTTOMK: + // We build a heap of up to k elements, with the biggest element at heap[0]. + switch { + case len(group.reverseHeap) < k: + heap.Push(&group.reverseHeap, &s) + case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)): + // This new element is smaller than the previous biggest element - overwrite that. 
+ group.reverseHeap[0] = s + if k > 1 { + heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. + } + } + + default: + panic(fmt.Errorf("expected aggregation operator but got %q", op)) + } + } + + // Construct the result from the aggregated groups. + numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 + var mat Matrix + if ev.endTimestamp == ev.startTimestamp { + mat = make(Matrix, 0, len(orderedResult)) + } + + add := func(lbls labels.Labels, f float64) { + // If this could be an instant query, add directly to the matrix so the result is in consistent order. + if ev.endTimestamp == ev.startTimestamp { + mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}}) + } else { + // Otherwise the results are added into seriess elements. + hash := lbls.Hash() + ss, ok := seriess[hash] + if !ok { + ss = Series{Metric: lbls} + } + addToSeries(&ss, enh.Ts, f, nil, numSteps) + seriess[hash] = ss + } + } + for ri, aggr := range orderedResult { + if !seen[ri] { + continue + } + switch op { + case parser.TOPK: + // The heap keeps the lowest value on top, so reverse it. + if len(aggr.heap) > 1 { + sort.Sort(sort.Reverse(aggr.heap)) + } + for _, v := range aggr.heap { + add(v.Metric, v.F) + } + + case parser.BOTTOMK: + // The heap keeps the highest value on top, so reverse it. + if len(aggr.reverseHeap) > 1 { + sort.Sort(sort.Reverse(aggr.reverseHeap)) + } + for _, v := range aggr.reverseHeap { + add(v.Metric, v.F) + } + } } return mat, annos From 526ce4ee7ad8cd2802590596801f2be8821faa63 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 4 Mar 2024 21:05:00 +0000 Subject: [PATCH 16/24] promql: simplify data collection in aggregations We don't need a Sample, just the float and histogram values. 
Signed-off-by: Bryan Boreham --- promql/engine.go | 49 +++++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 22428e12c7..a38cdf218e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2740,7 +2740,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } for si := range inputMatrix { - s, ok := ev.nextSample(enh.Ts, inputMatrix, si) + f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) if !ok { continue } @@ -2749,18 +2749,18 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // Initialize this group if it's the first time we've seen it. if !seen[seriesToResult[si]] { *group = groupedAggregation{ - floatValue: s.F, - floatMean: s.F, + floatValue: f, + floatMean: f, groupCount: 1, } switch { - case s.H == nil: + case h == nil: group.hasFloat = true case op == parser.SUM: - group.histogramValue = s.H.Copy() + group.histogramValue = h.Copy() group.hasHistogram = true case op == parser.AVG: - group.histogramMean = s.H.Copy() + group.histogramMean = h.Copy() group.hasHistogram = true case op == parser.STDVAR || op == parser.STDDEV: group.groupCount = 0 @@ -2771,10 +2771,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.floatValue = 0 case parser.QUANTILE: group.heap = make(vectorByValueHeap, 1) - group.heap[0] = Sample{ - F: s.F, - Metric: s.Metric, - } + group.heap[0] = Sample{F: f} case parser.GROUP: group.floatValue = 1 } @@ -2784,25 +2781,25 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix switch op { case parser.SUM: - if s.H != nil { + if h != nil { group.hasHistogram = true if group.histogramValue != nil { - group.histogramValue.Add(s.H) + group.histogramValue.Add(h) } // Otherwise the aggregation contained floats // previously and will be invalid anyway. No // point in copying the histogram in that case. 
} else { group.hasFloat = true - group.floatValue += s.F + group.floatValue += f } case parser.AVG: group.groupCount++ - if s.H != nil { + if h != nil { group.hasHistogram = true if group.histogramMean != nil { - left := s.H.Copy().Div(float64(group.groupCount)) + left := h.Copy().Div(float64(group.groupCount)) right := group.histogramMean.Copy().Div(float64(group.groupCount)) toAdd := left.Sub(right) group.histogramMean.Add(toAdd) @@ -2813,13 +2810,13 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } else { group.hasFloat = true if math.IsInf(group.floatMean, 0) { - if math.IsInf(s.F, 0) && (group.floatMean > 0) == (s.F > 0) { + if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) { // The `floatMean` and `s.F` values are `Inf` of the same sign. They // can't be subtracted, but the value of `floatMean` is correct // already. break } - if !math.IsInf(s.F, 0) && !math.IsNaN(s.F) { + if !math.IsInf(f, 0) && !math.IsNaN(f) { // At this stage, the mean is an infinite. If the added // value is neither an Inf or a Nan, we can keep that mean // value. @@ -2830,35 +2827,35 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix } } // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. - group.floatMean += s.F/float64(group.groupCount) - group.floatMean/float64(group.groupCount) + group.floatMean += f/float64(group.groupCount) - group.floatMean/float64(group.groupCount) } case parser.GROUP: // Do nothing. Required to avoid the panic in `default:` below. 
case parser.MAX: - if group.floatValue < s.F || math.IsNaN(group.floatValue) { - group.floatValue = s.F + if group.floatValue < f || math.IsNaN(group.floatValue) { + group.floatValue = f } case parser.MIN: - if group.floatValue > s.F || math.IsNaN(group.floatValue) { - group.floatValue = s.F + if group.floatValue > f || math.IsNaN(group.floatValue) { + group.floatValue = f } case parser.COUNT: group.groupCount++ case parser.STDVAR, parser.STDDEV: - if s.H == nil { // Ignore native histograms. + if h == nil { // Ignore native histograms. group.groupCount++ - delta := s.F - group.floatMean + delta := f - group.floatMean group.floatMean += delta / float64(group.groupCount) - group.floatValue += delta * (s.F - group.floatMean) + group.floatValue += delta * (f - group.floatMean) } case parser.QUANTILE: - group.heap = append(group.heap, s) + group.heap = append(group.heap, Sample{F: f}) default: panic(fmt.Errorf("expected aggregation operator but got %q", op)) From 4584f67e1706b5fa15bc4436c158fec701ae9402 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 9 Mar 2024 11:31:46 +0000 Subject: [PATCH 17/24] promql: inline nextSample function Move Sample out of loop to reduce allocations, otherwise it escapes to the heap. Signed-off-by: Bryan Boreham --- promql/engine.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index a38cdf218e..592114db2b 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2921,6 +2921,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // For a range query, aggregates output in the seriess map. 
func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op + var s Sample var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. if !convertibleToInt64(q) { @@ -2935,10 +2936,11 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatri } for si := range inputMatrix { - s, ok := ev.nextSample(enh.Ts, inputMatrix, si) + f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) if !ok { continue } + s = Sample{Metric: inputMatrix[si].Metric, F: f} group := orderedResult[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. @@ -3111,15 +3113,6 @@ func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogr return f, h, true } -func (ev *evaluator) nextSample(ts int64, inputMatrix Matrix, si int) (Sample, bool) { - f, h, ok := ev.nextValues(ts, &inputMatrix[si]) - ev.currentSamples++ - if ev.currentSamples > ev.maxSamples { - ev.error(ErrTooManySamples(env)) - } - return Sample{Metric: inputMatrix[si].Metric, F: f, H: h, T: ts}, ok -} - // groupingKey builds and returns the grouping key for the given metric and // grouping labels. 
func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) { From 185290a0d2f5908665a7d59710850ce2fdf59cf2 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 8 Mar 2024 17:53:19 +0000 Subject: [PATCH 18/24] promql: pull checking of q and k out of loop Signed-off-by: Bryan Boreham --- promql/engine.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 592114db2b..0dd33a7f94 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1319,10 +1319,25 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping seriesToResult[si] = index } + var k int var seriess map[uint64]Series switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK: + if !convertibleToInt64(param) { + ev.errorf("Scalar value %v overflows int64", param) + } + k = int(param) + if k > len(inputMatrix) { + k = len(inputMatrix) + } + if k < 1 { + return nil, warnings + } seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + case parser.QUANTILE: + if math.IsNaN(param) || param < 0 || param > 1 { + warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange())) + } } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { @@ -1337,7 +1352,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping var ws annotations.Annotations switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK: - result, ws = ev.aggregationK(aggExpr, param, inputMatrix, seriesToResult, orderedResult, enh, seriess) + result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, orderedResult, enh, seriess) // If this could be an instant query, shortcut so as not to change sort order. 
if ev.endTimestamp == ev.startTimestamp { return result, ws @@ -2733,11 +2748,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix op := e.Op var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. - if op == parser.QUANTILE { - if math.IsNaN(q) || q < 0 || q > 1 { - annos.Add(annotations.NewInvalidQuantileWarning(q, e.Param.PositionRange())) - } - } for si := range inputMatrix { f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) @@ -2919,21 +2929,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // seriesToResult maps inputMatrix indexes to groups indexes. // For an instant query, returns a Matrix in descending order for topk or ascending for bottomk. // For a range query, aggregates output in the seriess map. -func (ev *evaluator) aggregationK(e *parser.AggregateExpr, q float64, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op var s Sample var annos annotations.Annotations seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. 
- if !convertibleToInt64(q) { - ev.errorf("Scalar value %v overflows int64", q) - } - k := int(q) - if k > len(inputMatrix) { - k = len(inputMatrix) - } - if k < 1 { - return nil, annos - } for si := range inputMatrix { f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) From 2cf3c9de8f3fc0d88b42c13b072c7d3516de37e9 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 5 Apr 2024 14:39:29 +0100 Subject: [PATCH 19/24] promql: store labels per-group only for count_values This saves memory in other kinds of aggregation. We don't need `orderedResult` in `aggregationCountValues`; the ordering is not guaranteed. Signed-off-by: Bryan Boreham --- promql/engine.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 0dd33a7f94..b81617a1d1 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2730,7 +2730,6 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram type groupedAggregation struct { hasFloat bool // Has at least 1 float64 sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated. - labels labels.Labels floatValue float64 histogramValue *histogram.FloatHistogram floatMean float64 @@ -3044,8 +3043,11 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma // aggregationK evaluates count_values on vec. // Outputs as many series per group as there are values in the input. 
func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) { - result := map[uint64]*groupedAggregation{} - orderedResult := []*groupedAggregation{} + type groupCount struct { + labels labels.Labels + count int + } + result := map[uint64]*groupCount{} var buf []byte for _, s := range vec { @@ -3062,24 +3064,21 @@ func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping [] group, ok := result[groupingKey] // Add a new group if it doesn't exist. if !ok { - newAgg := &groupedAggregation{ - labels: generateGroupingLabels(enh, metric, e.Without, grouping), - groupCount: 1, + result[groupingKey] = &groupCount{ + labels: generateGroupingLabels(enh, metric, e.Without, grouping), + count: 1, } - - result[groupingKey] = newAgg - orderedResult = append(orderedResult, newAgg) continue } - group.groupCount++ + group.count++ } // Construct the result Vector from the aggregated groups. - for _, aggr := range orderedResult { + for _, aggr := range result { enh.Out = append(enh.Out, Sample{ Metric: aggr.labels, - F: float64(aggr.groupCount), + F: float64(aggr.count), }) } return enh.Out, nil From 5e3914a27cfbd32f4d54177ae896ccb65fe9cc4e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 9 Mar 2024 10:23:31 +0000 Subject: [PATCH 20/24] promql: remove histogramMean from groupedAggregation Re-use histogramValue since we don't need them separately. Tidy up initialization. 
Signed-off-by: Bryan Boreham --- promql/engine.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index b81617a1d1..4230bff878 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2733,7 +2733,6 @@ type groupedAggregation struct { floatValue float64 histogramValue *histogram.FloatHistogram floatMean float64 - histogramMean *histogram.FloatHistogram groupCount int heap vectorByValueHeap reverseHeap vectorByReverseValueHeap @@ -2762,20 +2761,14 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix floatMean: f, groupCount: 1, } - switch { - case h == nil: - group.hasFloat = true - case op == parser.SUM: - group.histogramValue = h.Copy() - group.hasHistogram = true - case op == parser.AVG: - group.histogramMean = h.Copy() - group.hasHistogram = true - case op == parser.STDVAR || op == parser.STDDEV: - group.groupCount = 0 - } - switch op { + case parser.SUM, parser.AVG: + if h == nil { + group.hasFloat = true + } else { + group.histogramValue = h.Copy() + group.hasHistogram = true + } case parser.STDVAR, parser.STDDEV: group.floatValue = 0 case parser.QUANTILE: @@ -2807,11 +2800,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group.groupCount++ if h != nil { group.hasHistogram = true - if group.histogramMean != nil { + if group.histogramValue != nil { left := h.Copy().Div(float64(group.groupCount)) - right := group.histogramMean.Copy().Div(float64(group.groupCount)) + right := group.histogramValue.Copy().Div(float64(group.groupCount)) toAdd := left.Sub(right) - group.histogramMean.Add(toAdd) + group.histogramValue.Add(toAdd) } // Otherwise the aggregation contained floats // previously and will be invalid anyway. 
No @@ -2886,7 +2879,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix continue } if aggr.hasHistogram { - aggr.histogramValue = aggr.histogramMean.Compact(0) + aggr.histogramValue = aggr.histogramValue.Compact(0) } else { aggr.floatValue = aggr.floatMean } From cfbeb6681bfb2a7d51e3ce3f6ef891097547bf10 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 9 Mar 2024 10:51:16 +0000 Subject: [PATCH 21/24] promql: re-use one heap for topk and bottomk Slightly ugly casting saves memory. Signed-off-by: Bryan Boreham --- promql/engine.go | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 4230bff878..2819b36d02 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2735,7 +2735,6 @@ type groupedAggregation struct { floatMean float64 groupCount int heap vectorByValueHeap - reverseHeap vectorByReverseValueHeap } // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. @@ -2937,16 +2936,10 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma group := orderedResult[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. if !seen[seriesToResult[si]] { - *group = groupedAggregation{} - - switch op { - case parser.TOPK: - group.heap = make(vectorByValueHeap, 1, k) - group.heap[0] = s - case parser.BOTTOMK: - group.reverseHeap = make(vectorByReverseValueHeap, 1, k) - group.reverseHeap[0] = s + *group = groupedAggregation{ + heap: make(vectorByValueHeap, 1, k), } + group.heap[0] = s seen[seriesToResult[si]] = true continue } @@ -2968,13 +2961,13 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma case parser.BOTTOMK: // We build a heap of up to k elements, with the biggest element at heap[0]. 
switch { - case len(group.reverseHeap) < k: - heap.Push(&group.reverseHeap, &s) - case group.reverseHeap[0].F > s.F || (math.IsNaN(group.reverseHeap[0].F) && !math.IsNaN(s.F)): + case len(group.heap) < k: + heap.Push((*vectorByReverseValueHeap)(&group.heap), &s) + case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): // This new element is smaller than the previous biggest element - overwrite that. - group.reverseHeap[0] = s + group.heap[0] = s if k > 1 { - heap.Fix(&group.reverseHeap, 0) // Maintain the heap invariant. + heap.Fix((*vectorByReverseValueHeap)(&group.heap), 0) // Maintain the heap invariant. } } @@ -3021,10 +3014,10 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma case parser.BOTTOMK: // The heap keeps the highest value on top, so reverse it. - if len(aggr.reverseHeap) > 1 { - sort.Sort(sort.Reverse(aggr.reverseHeap)) + if len(aggr.heap) > 1 { + sort.Sort(sort.Reverse((*vectorByReverseValueHeap)(&aggr.heap))) } - for _, v := range aggr.reverseHeap { + for _, v := range aggr.heap { add(v.Metric, v.F) } } From 7499d90913725e1d95f231a8584be34a10fafbce Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 9 Mar 2024 11:06:46 +0000 Subject: [PATCH 22/24] promql: remove pointer to aggregation groups Just allocate in one slice. 
Signed-off-by: Bryan Boreham --- promql/engine.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 2819b36d02..bd4c7ed467 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1298,9 +1298,9 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping buf := make([]byte, 0, 1024) groupToResultIndex := make(map[uint64]int) seriesToResult := make([]int, len(inputMatrix)) - orderedResult := make([]*groupedAggregation, 0, 16) var result Matrix + groupCount := 0 for si, series := range inputMatrix { var groupingKey uint64 groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) @@ -1311,13 +1311,13 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) result = append(result, Series{Metric: m}) } - newAgg := &groupedAggregation{} - index = len(orderedResult) + index = groupCount groupToResultIndex[groupingKey] = index - orderedResult = append(orderedResult, newAgg) + groupCount++ } seriesToResult[si] = index } + groups := make([]groupedAggregation, groupCount) var k int var seriess map[uint64]Series @@ -1352,13 +1352,13 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping var ws annotations.Annotations switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK: - result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, orderedResult, enh, seriess) + result, ws = ev.aggregationK(aggExpr, k, inputMatrix, seriesToResult, groups, enh, seriess) // If this could be an instant query, shortcut so as not to change sort order. 
if ev.endTimestamp == ev.startTimestamp { return result, ws } default: - ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, orderedResult, enh) + ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, groups, enh) } warnings.Merge(ws) @@ -2741,10 +2741,10 @@ type groupedAggregation struct { // These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`. // outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix. // seriesToResult maps inputMatrix indexes to outputMatrix indexes. -func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { +func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { op := e.Op var annos annotations.Annotations - seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. + seen := make([]bool, len(groups)) // Which output groups were seen in the input at this timestamp. for si := range inputMatrix { f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) @@ -2752,7 +2752,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix continue } - group := orderedResult[seriesToResult[si]] + group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. if !seen[seriesToResult[si]] { *group = groupedAggregation{ @@ -2866,7 +2866,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // Construct the output matrix from the aggregated groups. 
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 - for ri, aggr := range orderedResult { + for ri, aggr := range groups { if !seen[ri] { continue } @@ -2920,11 +2920,11 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // seriesToResult maps inputMatrix indexes to groups indexes. // For an instant query, returns a Matrix in descending order for topk or ascending for bottomk. // For a range query, aggregates output in the seriess map. -func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, orderedResult []*groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op var s Sample var annos annotations.Annotations - seen := make([]bool, len(orderedResult)) // Which output groups were seen in the input at this timestamp. + seen := make([]bool, len(groups)) // Which output groups were seen in the input at this timestamp. for si := range inputMatrix { f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) @@ -2933,7 +2933,7 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma } s = Sample{Metric: inputMatrix[si].Metric, F: f} - group := orderedResult[seriesToResult[si]] + group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. 
if !seen[seriesToResult[si]] { *group = groupedAggregation{ @@ -2980,7 +2980,7 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 var mat Matrix if ev.endTimestamp == ev.startTimestamp { - mat = make(Matrix, 0, len(orderedResult)) + mat = make(Matrix, 0, len(groups)) } add := func(lbls labels.Labels, f float64) { @@ -2998,7 +2998,7 @@ seriess[hash] = ss } } - for ri, aggr := range orderedResult { + for ri, aggr := range groups { if !seen[ri] { continue } From 0ac927515b193abdc65575f8a71a1138a1debf80 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 9 Mar 2024 11:24:32 +0000 Subject: [PATCH 23/24] promql: move group-seen into group struct Save allocating an auxiliary array. Signed-off-by: Bryan Boreham --- promql/engine.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index bd4c7ed467..c23964ed82 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -2728,6 +2728,7 @@ func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram } type groupedAggregation struct { + seen bool // Was this output group seen in the input at this timestamp. hasFloat bool // Has at least 1 float64 sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated. floatValue float64 @@ -2744,7 +2745,9 @@ type groupedAggregation struct { func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { op := e.Op var annos annotations.Annotations - seen := make([]bool, len(groups)) // Which output groups were seen in the input at this timestamp.
+ for i := range groups { + groups[i].seen = false + } for si := range inputMatrix { f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) @@ -2754,8 +2757,9 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. - if !seen[seriesToResult[si]] { + if !group.seen { *group = groupedAggregation{ + seen: true, floatValue: f, floatMean: f, groupCount: 1, @@ -2776,7 +2780,6 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix case parser.GROUP: group.floatValue = 1 } - seen[seriesToResult[si]] = true continue } @@ -2867,7 +2870,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 for ri, aggr := range groups { - if !seen[ri] { + if !aggr.seen { continue } switch op { @@ -2924,7 +2927,9 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma op := e.Op var s Sample var annos annotations.Annotations - seen := make([]bool, len(groups)) // Which output groups were seen in the input at this timestamp. + for i := range groups { + groups[i].seen = false + } for si := range inputMatrix { f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) @@ -2935,12 +2940,12 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. 
- if !seen[seriesToResult[si]] { + if !group.seen { *group = groupedAggregation{ + seen: true, heap: make(vectorByValueHeap, 1, k), } group.heap[0] = s - seen[seriesToResult[si]] = true continue } @@ -2998,8 +3003,8 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, inputMatrix Ma seriess[hash] = ss } } - for ri, aggr := range groups { - if !seen[ri] { + for _, aggr := range groups { + if !aggr.seen { continue } switch op { From 12961c6a373fe9d19f97a7bff9cac6587896eec2 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 5 Apr 2024 15:40:07 +0100 Subject: [PATCH 24/24] promql: refactor: eliminate one 'else' Signed-off-by: Bryan Boreham --- promql/engine.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index c23964ed82..b8a8ea0959 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -3081,12 +3081,12 @@ func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, n ss.Floats = getFPointSlice(numSteps) } ss.Floats = append(ss.Floats, FPoint{T: ts, F: f}) - } else { - if ss.Histograms == nil { - ss.Histograms = getHPointSlice(numSteps) - } - ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) + return } + if ss.Histograms == nil { + ss.Histograms = getHPointSlice(numSteps) + } + ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) } func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogram.FloatHistogram, b bool) {