From 73d66f97c151f69e09dcccc08f97d7ca82f1a1b0 Mon Sep 17 00:00:00 2001 From: darshanime Date: Sat, 4 Jan 2025 13:31:28 +0530 Subject: [PATCH] Refactor VectorAnd for invariant exprs Signed-off-by: darshanime --- promql/bench_test.go | 10 +++++ promql/engine.go | 50 +++++++++++++++++++---- promql/promql_test.go | 9 ++-- promql/promqltest/testdata/operators.test | 11 +++++ 4 files changed, 67 insertions(+), 13 deletions(-) diff --git a/promql/bench_test.go b/promql/bench_test.go index 943baceecb..38d7b64077 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -238,6 +238,16 @@ func rangeQueryCases() []benchCase { { expr: "timestamp(a_X)", }, + // Vector Ops. + { + expr: "rate(a_X[1m]) and topk(1, rate(a_X[1m] @ start()))", + }, + { + expr: "rate(a_X[1m] @ start()) or rate(a_X[1m])", + }, + { + expr: "rate(a_X[1m]) unless rate(a_X[1m] @ start())", + }, } // X in an expr will be replaced by different metric sizes. diff --git a/promql/engine.go b/promql/engine.go index bbd8410268..913349a097 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1146,6 +1146,9 @@ type EvalNodeHelper struct { // Additional options for the evaluation. enableDelayedNameRemoval bool + + // Slice of signature sets, indexed by index of exprs passed to rangeEval. + stepInvariantSigSet []map[string]struct{} } func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) { @@ -1195,7 +1198,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label biggestLen = len(matrixes[i]) } } - enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen), enableDelayedNameRemoval: ev.enableDelayedNameRemoval} + enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen), enableDelayedNameRemoval: ev.enableDelayedNameRemoval, stepInvariantSigSet: make([]map[string]struct{}, len(exprs))} type seriesAndTimestamp struct { Series ts int64 @@ -1214,13 +1217,17 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label seriesHelpers = make([][]EvalSeriesHelper, len(exprs)) bufHelpers = make([][]EvalSeriesHelper, len(exprs)) - for i := range exprs { + for i, e := range exprs { seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i])) bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i])) for si, series := range matrixes[i] { prepSeries(series.Metric, &seriesHelpers[i][si]) } + + if _, si := e.(*parser.StepInvariantExpr); si { + enh.stepInvariantSigSet[i] = map[string]struct{}{} + } } } @@ -2461,16 +2468,41 @@ func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, return nil // Short-circuit: AND with nothing is nothing. } - // The set of signatures for the right-hand side Vector. - rightSigs := map[string]struct{}{} - // Add all rhs samples to a map so we can easily find matches later. - for _, sh := range rhsh { - rightSigs[sh.signature] = struct{}{} + signatureSet := map[string]struct{}{} + switch { + // Check if both LHS and RHS are step invariant. + case enh.stepInvariantSigSet[0] != nil && enh.stepInvariantSigSet[1] != nil: + if lhsh[0].signature == rhsh[0].signature { + enh.Out = append(enh.Out, lhs...) + } + return enh.Out + // Check if LHS is step invariant. + case enh.stepInvariantSigSet[0] != nil: + for i, sh := range rhsh { + // If there's a matching entry in the left-hand side Vector, add the sample. + if sh.signature == lhsh[0].signature { + enh.Out = append(enh.Out, lhs[i]) + } + } + return enh.Out + // Check if RHS is step invariant. + case enh.stepInvariantSigSet[1] != nil: + // If so, check if we have already populated the signature set for rhs. + if len(enh.stepInvariantSigSet[1]) == 0 { + for _, sh := range rhsh { + enh.stepInvariantSigSet[1][sh.signature] = struct{}{} + } + } + signatureSet = enh.stepInvariantSigSet[1] + default: + for _, sh := range rhsh { + signatureSet[sh.signature] = struct{}{} + } } for i, ls := range lhs { - // If there's a matching entry in the right-hand side Vector, add the sample. - if _, ok := rightSigs[lhsh[i].signature]; ok { + // If there's a matching entry in the other Vector, add the sample. + if _, ok := signatureSet[lhsh[i].signature]; ok { enh.Out = append(enh.Out, ls) } } diff --git a/promql/promql_test.go b/promql/promql_test.go index 345ecab5ed..b30477a81b 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -41,10 +41,11 @@ func TestConcurrentRangeQueries(t *testing.T) { stor := teststorage.New(t) defer stor.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 50000000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 50000000, + Timeout: 100 * time.Second, + EnableAtModifier: true, } // Enable experimental functions testing parser.EnableExperimentalFunctions = true diff --git a/promql/promqltest/testdata/operators.test b/promql/promqltest/testdata/operators.test index 667989ca77..409bf75f27 100644 --- a/promql/promqltest/testdata/operators.test +++ b/promql/promqltest/testdata/operators.test @@ -14,6 +14,17 @@ load 5m vector_matching_a{l="y"} 0+2x50 vector_matching_b{l="x"} 0+4x25 +eval range from 0 to 30m step 5m vector_matching_a @ start() and vector_matching_b + vector_matching_a{l="x"} 0 0 0 0 0 0 0 + +eval range from 0 to 30m step 5m vector_matching_a @ end() and vector_matching_b + vector_matching_a{l="x"} 6 6 6 6 6 6 6 + +eval range from 0 to 30m step 5m vector_matching_a and vector_matching_b @ end() + vector_matching_a{l="x"} 0 1 2 3 4 5 6 + +eval range from 0 to 30m step 5m vector_matching_a @ start() and vector_matching_b @ end() + vector_matching_a{l="x"} 0 0 0 0 0 0 0 eval instant at 50m SUM(http_requests_total) BY (job) - COUNT(http_requests_total) BY (job) {job="api-server"} 996