From 6719071a0f12d2186b86edb61d3d3c477a932030 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Fri, 19 Mar 2021 17:52:29 +0100
Subject: [PATCH] Optimize aggregations in PromQL engine (#8594)

* Optimize aggregations in PromQL engine

Signed-off-by: Marco Pracucci
---
 promql/bench_test.go |   3 +
 promql/engine.go     | 135 +++++++++++++++++++++++++++++++++----------
 2 files changed, 109 insertions(+), 29 deletions(-)

diff --git a/promql/bench_test.go b/promql/bench_test.go
index 03111767e5..aac2be7165 100644
--- a/promql/bench_test.go
+++ b/promql/bench_test.go
@@ -156,6 +156,9 @@ func BenchmarkRangeQuery(b *testing.B) {
 		{
 			expr: "sum by (le)(h_X)",
 		},
+		{
+			expr: "count_values('value', h_X)",
+		},
 		// Combinations.
 		{
 			expr: "rate(a_X[1m]) + rate(b_X[1m])",
diff --git a/promql/engine.go b/promql/engine.go
index f963f454b3..5c9caebada 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -902,6 +902,12 @@ func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws storage.Warnings
 	return v, ws, nil
 }
 
+// EvalSeriesHelper stores extra information about a series.
+type EvalSeriesHelper struct {
+	// The grouping key used by aggregation.
+	groupingKey uint64
+}
+
 // EvalNodeHelper stores extra information and caches for evaluating a single node across steps.
 type EvalNodeHelper struct {
 	// Evaluation timestamp.
@@ -962,10 +968,12 @@ func (enh *EvalNodeHelper) signatureFunc(on bool, names ...string) func(labels.L
 }
 
 // rangeEval evaluates the given expressions, and then for each step calls
-// the given function with the values computed for each expression at that
-// step. The return value is the combination into time series of all the
+// the given funcCall with the values computed for each expression at that
+// step. The return value is the combination into time series of all the
 // function call results.
-func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
+// The prepSeries function (if provided) can be used to prepare the helper
+// for each series, which is then passed to each call of funcCall.
+func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, storage.Warnings), exprs ...parser.Expr) (Matrix, storage.Warnings) {
 	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
 	matrixes := make([]Matrix, len(exprs))
 	origMatrixes := make([]Matrix, len(exprs))
@@ -1001,6 +1009,30 @@ func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (V
 	enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen)}
 	seriess := make(map[uint64]Series, biggestLen) // Output series by series hash.
 	tempNumSamples := ev.currentSamples
+
+	var (
+		seriesHelpers [][]EvalSeriesHelper
+		bufHelpers    [][]EvalSeriesHelper // Buffer updated on each step.
+	)
+
+	// If the series preparation function is provided, we should run it for
+	// every single series in the matrix.
+	if prepSeries != nil {
+		seriesHelpers = make([][]EvalSeriesHelper, len(exprs))
+		bufHelpers = make([][]EvalSeriesHelper, len(exprs))
+
+		for i := range exprs {
+			seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
+			bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
+
+			for si, series := range matrixes[i] {
+				h := seriesHelpers[i][si]
+				prepSeries(series.Metric, &h)
+				seriesHelpers[i][si] = h
+			}
+		}
+	}
+
 	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
 		if err := contextDone(ev.ctx, "expression evaluation"); err != nil {
 			ev.error(err)
@@ -1010,11 +1042,20 @@ func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (V
 		// Gather input vectors for this timestamp.
 		for i := range exprs {
 			vectors[i] = vectors[i][:0]
+
+			if prepSeries != nil {
+				bufHelpers[i] = bufHelpers[i][:0]
+			}
+
 			for si, series := range matrixes[i] {
 				for _, point := range series.Points {
 					if point.T == ts {
 						if ev.currentSamples < ev.maxSamples {
 							vectors[i] = append(vectors[i], Sample{Metric: series.Metric, Point: point})
+							if prepSeries != nil {
+								bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
+							}
+
 							// Move input vectors forward so we don't have to re-scan the same
 							// past points at the next step.
 							matrixes[i][si].Points = series.Points[1:]
@@ -1028,9 +1069,10 @@ func (ev *evaluator) rangeEval(funcCall func([]parser.Value, *EvalNodeHelper) (V
 			}
 			args[i] = vectors[i]
 		}
+
 		// Make the function call.
 		enh.Ts = ts
-		result, ws := funcCall(args, enh)
+		result, ws := funcCall(args, bufHelpers, enh)
 		if result.ContainsSameLabelset() {
 			ev.errorf("vector cannot contain metrics with the same labelset")
 		}
@@ -1132,18 +1174,29 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 
 	switch e := expr.(type) {
 	case *parser.AggregateExpr:
+		// Grouping labels must be sorted (expected by both generateGroupingKey() and aggregation()).
+		sortedGrouping := e.Grouping
+		sort.Strings(sortedGrouping)
+
+		// Prepare a function to initialise series helpers with the grouping key.
+		buf := make([]byte, 0, 1024)
+		initSeries := func(series labels.Labels, h *EvalSeriesHelper) {
+			h.groupingKey, buf = generateGroupingKey(series, sortedGrouping, e.Without, buf)
+		}
+
 		unwrapParenExpr(&e.Param)
 		if s, ok := unwrapStepInvariantExpr(e.Param).(*parser.StringLiteral); ok {
-			return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
-				return ev.aggregation(e.Op, e.Grouping, e.Without, s.Val, v[0].(Vector), enh), nil
+			return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+				return ev.aggregation(e.Op, sortedGrouping, e.Without, s.Val, v[0].(Vector), sh[0], enh), nil
 			}, e.Expr)
 		}
-		return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+
+		return ev.rangeEval(initSeries, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 			var param float64
 			if e.Param != nil {
 				param = v[0].(Vector)[0].V
 			}
-			return ev.aggregation(e.Op, e.Grouping, e.Without, param, v[1].(Vector), enh), nil
+			return ev.aggregation(e.Op, sortedGrouping, e.Without, param, v[1].(Vector), sh[1], enh), nil
 		}, e.Param, e.Expr)
 
 	case *parser.Call:
@@ -1156,7 +1209,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 			arg := unwrapStepInvariantExpr(e.Args[0])
 			vs, ok := arg.(*parser.VectorSelector)
 			if ok {
-				return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+				return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 					if vs.Timestamp != nil {
 						// This is a special case only for "timestamp" since the offset
 						// needs to be adjusted for every point.
@@ -1200,7 +1253,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 		}
 		if !matrixArg {
 			// Does not have a matrix argument.
-			return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 				return call(v, e.Args, enh), warnings
 			}, e.Args...)
 		}
@@ -1367,43 +1420,43 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 	case *parser.BinaryExpr:
 		switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
 		case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
-			return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 				val := scalarBinop(e.Op, v[0].(Vector)[0].Point.V, v[1].(Vector)[0].Point.V)
 				return append(enh.Out, Sample{Point: Point{V: val}}), nil
 			}, e.LHS, e.RHS)
 		case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector:
 			switch e.Op {
 			case parser.LAND:
-				return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+				return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 					return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
 				}, e.LHS, e.RHS)
 			case parser.LOR:
-				return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+				return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 					return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
 				}, e.LHS, e.RHS)
 			case parser.LUNLESS:
-				return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+				return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 					return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, enh), nil
 				}, e.LHS, e.RHS)
 			default:
-				return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+				return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 					return ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, enh), nil
 				}, e.LHS, e.RHS)
 			}
 		case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar:
-			return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 				return ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].Point.V}, false, e.ReturnBool, enh), nil
 			}, e.LHS, e.RHS)
 		case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector:
-			return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 				return ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].Point.V}, true, e.ReturnBool, enh), nil
 			}, e.LHS, e.RHS)
 		}
 
 	case *parser.NumberLiteral:
-		return ev.rangeEval(func(v []parser.Value, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+		return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
 			return append(enh.Out, Sample{Point: Point{V: e.Val}}), nil
 		})
 
@@ -2067,8 +2120,9 @@ type groupedAggregation struct {
 	reverseHeap vectorByReverseValueHeap
 }
 
-// aggregation evaluates an aggregation operation on a Vector.
-func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, enh *EvalNodeHelper) Vector {
+// aggregation evaluates an aggregation operation on a Vector. The provided grouping labels
+// must be sorted.
+func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without bool, param interface{}, vec Vector, seriesHelper []EvalSeriesHelper, enh *EvalNodeHelper) Vector {
 	result := map[uint64]*groupedAggregation{}
 	var k int64
@@ -2087,35 +2141,43 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
 		q = param.(float64)
 	}
 	var valueLabel string
+	var recomputeGroupingKey bool
 	if op == parser.COUNT_VALUES {
 		valueLabel = param.(string)
 		if !model.LabelName(valueLabel).IsValid() {
 			ev.errorf("invalid label name %q", valueLabel)
 		}
 		if !without {
+			// We're changing the grouping labels, so we have to ensure they're still sorted
+			// and flag that the grouping key must be recomputed. Since the count_values()
+			// operator is used less frequently than other aggregations, we're fine with
+			// recomputing the grouping key on each step for this case.
 			grouping = append(grouping, valueLabel)
+			sort.Strings(grouping)
+			recomputeGroupingKey = true
 		}
 	}
-	sort.Strings(grouping)
 	lb := labels.NewBuilder(nil)
-	buf := make([]byte, 0, 1024)
-	for _, s := range vec {
+	var buf []byte
+	for si, s := range vec {
 		metric := s.Metric
 		if op == parser.COUNT_VALUES {
 			lb.Reset(metric)
 			lb.Set(valueLabel, strconv.FormatFloat(s.V, 'f', -1, 64))
 			metric = lb.Labels()
+
+			// We've changed the metric so we have to recompute the grouping key.
+			recomputeGroupingKey = true
 		}
-		var (
-			groupingKey uint64
-		)
-		if without {
-			groupingKey, buf = metric.HashWithoutLabels(buf, grouping...)
+		// We can use the pre-computed grouping key unless grouping labels have changed.
+		var groupingKey uint64
+		if !recomputeGroupingKey {
+			groupingKey = seriesHelper[si].groupingKey
 		} else {
-			groupingKey, buf = metric.HashForLabels(buf, grouping...)
+			groupingKey, buf = generateGroupingKey(metric, grouping, without, buf)
 		}
 
 		group, ok := result[groupingKey]
@@ -2302,6 +2364,21 @@ func (ev *evaluator) aggregation(op parser.ItemType, grouping []string, without
 	return enh.Out
 }
 
+// generateGroupingKey builds and returns the grouping key for the given metric and
+// grouping labels.
+func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) {
+	if without {
+		return metric.HashWithoutLabels(buf, grouping...)
+	}
+
+	if len(grouping) == 0 {
+		// No need to generate any hash if there are no grouping labels.
+		return 0, buf
+	}
+
+	return metric.HashForLabels(buf, grouping...)
+}
+
 // btos returns 1 if b is true, 0 otherwise.
 func btos(b bool) float64 {
 	if b {
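
The change above boils down to hashing each series' grouping labels once, via prepSeries and EvalSeriesHelper, and reusing that cached key at every evaluation step instead of calling HashForLabels/HashWithoutLabels per step. Below is a minimal, self-contained sketch of that pattern, not part of the patch: it assumes the github.com/prometheus/prometheus/pkg/labels package that engine.go imported at the time, and the names seriesHelper and groupingKeyFor are illustrative stand-ins for EvalSeriesHelper and generateGroupingKey.

// grouping_key_sketch.go: precompute per-series grouping keys once, reuse them per step.
package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/prometheus/pkg/labels"
)

// seriesHelper stands in for EvalSeriesHelper: it caches the grouping key
// computed once per series.
type seriesHelper struct {
	groupingKey uint64
}

// groupingKeyFor mirrors generateGroupingKey above: the grouping label names
// must already be sorted, as HashForLabels/HashWithoutLabels expect.
func groupingKeyFor(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) {
	if without {
		return metric.HashWithoutLabels(buf, grouping...)
	}
	if len(grouping) == 0 {
		return 0, buf
	}
	return metric.HashForLabels(buf, grouping...)
}

func main() {
	series := []labels.Labels{
		labels.FromStrings("__name__", "http_requests_total", "job", "api", "instance", "a"),
		labels.FromStrings("__name__", "http_requests_total", "job", "api", "instance", "b"),
		labels.FromStrings("__name__", "http_requests_total", "job", "web", "instance", "c"),
	}

	grouping := []string{"job"}
	sort.Strings(grouping) // sorted once, up front, like sortedGrouping in eval()

	// "prepSeries" phase: hash the grouping labels of every series exactly once.
	buf := make([]byte, 0, 1024)
	helpers := make([]seriesHelper, len(series))
	for i, s := range series {
		helpers[i].groupingKey, buf = groupingKeyFor(s, grouping, false, buf)
	}

	// Per-step phase: reuse the cached keys to bucket samples into groups,
	// with no label hashing left in the hot loop.
	for step := 0; step < 3; step++ {
		groups := map[uint64]int{}
		for i := range series {
			groups[helpers[i].groupingKey]++
		}
		fmt.Printf("step %d: %d output groups\n", step, len(groups)) // 2 groups: job="api", job="web"
	}
}

count_values() is the one aggregation that cannot always reuse the cached key, because it rewrites the metric per sample (and, when not grouping with "without", also extends the grouping labels), which is why the patch falls back to recomputing the key in that case.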