Refactor VectorAnd for invariant exprs

Signed-off-by: darshanime <deathbullet@gmail.com>
This commit is contained in:
darshanime 2025-01-04 13:31:28 +05:30
parent 096e2aa7bd
commit 73d66f97c1
4 changed files with 67 additions and 13 deletions

View file

@ -238,6 +238,16 @@ func rangeQueryCases() []benchCase {
{ {
expr: "timestamp(a_X)", expr: "timestamp(a_X)",
}, },
// Vector Ops.
{
expr: "rate(a_X[1m]) and topk(1, rate(a_X[1m] @ start()))",
},
{
expr: "rate(a_X[1m] @ start()) or rate(a_X[1m])",
},
{
expr: "rate(a_X[1m]) unless rate(a_X[1m] @ start())",
},
} }
// X in an expr will be replaced by different metric sizes. // X in an expr will be replaced by different metric sizes.

View file

@ -1146,6 +1146,9 @@ type EvalNodeHelper struct {
// Additional options for the evaluation. // Additional options for the evaluation.
enableDelayedNameRemoval bool enableDelayedNameRemoval bool
// Slice of signature sets, indexed by index of exprs passed to rangeEval.
stepInvariantSigSet []map[string]struct{}
} }
func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) { func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) {
@ -1195,7 +1198,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label
biggestLen = len(matrixes[i]) biggestLen = len(matrixes[i])
} }
} }
enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen), enableDelayedNameRemoval: ev.enableDelayedNameRemoval} enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen), enableDelayedNameRemoval: ev.enableDelayedNameRemoval, stepInvariantSigSet: make([]map[string]struct{}, len(exprs))}
type seriesAndTimestamp struct { type seriesAndTimestamp struct {
Series Series
ts int64 ts int64
@ -1214,13 +1217,17 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label
seriesHelpers = make([][]EvalSeriesHelper, len(exprs)) seriesHelpers = make([][]EvalSeriesHelper, len(exprs))
bufHelpers = make([][]EvalSeriesHelper, len(exprs)) bufHelpers = make([][]EvalSeriesHelper, len(exprs))
for i := range exprs { for i, e := range exprs {
seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i])) seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i])) bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i]))
for si, series := range matrixes[i] { for si, series := range matrixes[i] {
prepSeries(series.Metric, &seriesHelpers[i][si]) prepSeries(series.Metric, &seriesHelpers[i][si])
} }
if _, si := e.(*parser.StepInvariantExpr); si {
enh.stepInvariantSigSet[i] = map[string]struct{}{}
}
} }
} }
@ -2461,16 +2468,41 @@ func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching,
return nil // Short-circuit: AND with nothing is nothing. return nil // Short-circuit: AND with nothing is nothing.
} }
// The set of signatures for the right-hand side Vector. signatureSet := map[string]struct{}{}
rightSigs := map[string]struct{}{} switch {
// Add all rhs samples to a map so we can easily find matches later. // Check if both LHS and RHS are step invariant.
for _, sh := range rhsh { case enh.stepInvariantSigSet[0] != nil && enh.stepInvariantSigSet[1] != nil:
rightSigs[sh.signature] = struct{}{} if lhsh[0].signature == rhsh[0].signature {
enh.Out = append(enh.Out, lhs...)
}
return enh.Out
// Check if LHS is step invariant.
case enh.stepInvariantSigSet[0] != nil:
for i, sh := range rhsh {
// If there's a matching entry in the left-hand side Vector, add the sample.
if sh.signature == lhsh[0].signature {
enh.Out = append(enh.Out, lhs[i])
}
}
return enh.Out
// Check if RHS is step invariant.
case enh.stepInvariantSigSet[1] != nil:
// If so, check if we have already populated the signature set for rhs.
if len(enh.stepInvariantSigSet[1]) == 0 {
for _, sh := range rhsh {
enh.stepInvariantSigSet[1][sh.signature] = struct{}{}
}
}
signatureSet = enh.stepInvariantSigSet[1]
default:
for _, sh := range rhsh {
signatureSet[sh.signature] = struct{}{}
}
} }
for i, ls := range lhs { for i, ls := range lhs {
// If there's a matching entry in the right-hand side Vector, add the sample. // If there's a matching entry in the other Vector, add the sample.
if _, ok := rightSigs[lhsh[i].signature]; ok { if _, ok := signatureSet[lhsh[i].signature]; ok {
enh.Out = append(enh.Out, ls) enh.Out = append(enh.Out, ls)
} }
} }

View file

@ -41,10 +41,11 @@ func TestConcurrentRangeQueries(t *testing.T) {
stor := teststorage.New(t) stor := teststorage.New(t)
defer stor.Close() defer stor.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxSamples: 50000000, MaxSamples: 50000000,
Timeout: 100 * time.Second, Timeout: 100 * time.Second,
EnableAtModifier: true,
} }
// Enable experimental functions testing // Enable experimental functions testing
parser.EnableExperimentalFunctions = true parser.EnableExperimentalFunctions = true

View file

@ -14,6 +14,17 @@ load 5m
vector_matching_a{l="y"} 0+2x50 vector_matching_a{l="y"} 0+2x50
vector_matching_b{l="x"} 0+4x25 vector_matching_b{l="x"} 0+4x25
eval range from 0 to 30m step 5m vector_matching_a @ start() and vector_matching_b
vector_matching_a{l="x"} 0 0 0 0 0 0 0
eval range from 0 to 30m step 5m vector_matching_a @ end() and vector_matching_b
vector_matching_a{l="x"} 6 6 6 6 6 6 6
eval range from 0 to 30m step 5m vector_matching_a and vector_matching_b @ end()
vector_matching_a{l="x"} 0 1 2 3 4 5 6
eval range from 0 to 30m step 5m vector_matching_a @ start() and vector_matching_b @ end()
vector_matching_a{l="x"} 0 0 0 0 0 0 0
eval instant at 50m SUM(http_requests_total) BY (job) - COUNT(http_requests_total) BY (job) eval instant at 50m SUM(http_requests_total) BY (job) - COUNT(http_requests_total) BY (job)
{job="api-server"} 996 {job="api-server"} 996