Merge pull request #14816 from bboreham/improve-promql-tracing
Some checks are pending
buf.build / lint and publish (push) Waiting to run
CI / Go tests (push) Waiting to run
CI / More Go tests (push) Waiting to run
CI / Go tests with previous Go version (push) Waiting to run
CI / UI tests (push) Waiting to run
CI / Go tests on Windows (push) Waiting to run
CI / Mixins tests (push) Waiting to run
CI / Build Prometheus for common architectures (0) (push) Waiting to run
CI / Build Prometheus for common architectures (1) (push) Waiting to run
CI / Build Prometheus for common architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (0) (push) Waiting to run
CI / Build Prometheus for all architectures (1) (push) Waiting to run
CI / Build Prometheus for all architectures (10) (push) Waiting to run
CI / Build Prometheus for all architectures (11) (push) Waiting to run
CI / Build Prometheus for all architectures (2) (push) Waiting to run
CI / Build Prometheus for all architectures (3) (push) Waiting to run
CI / Build Prometheus for all architectures (4) (push) Waiting to run
CI / Build Prometheus for all architectures (5) (push) Waiting to run
CI / Build Prometheus for all architectures (6) (push) Waiting to run
CI / Build Prometheus for all architectures (7) (push) Waiting to run
CI / Build Prometheus for all architectures (8) (push) Waiting to run
CI / Build Prometheus for all architectures (9) (push) Waiting to run
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions
CI / Check generated parser (push) Waiting to run
CI / golangci-lint (push) Waiting to run
CI / fuzzing (push) Waiting to run
CI / codeql (push) Waiting to run
CI / Publish main branch artifacts (push) Blocked by required conditions
CI / Publish release artefacts (push) Blocked by required conditions
CI / Publish UI on npm Registry (push) Blocked by required conditions
Scorecards supply-chain security / Scorecards analysis (push) Waiting to run

Improve promql tracing
This commit is contained in:
Bryan Boreham 2024-09-04 14:32:22 +01:00 committed by GitHub
commit 485523eed2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 101 additions and 65 deletions

View file

@ -724,7 +724,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
startTimestamp: start, startTimestamp: start,
endTimestamp: start, endTimestamp: start,
interval: 1, interval: 1,
ctx: ctxInnerEval,
maxSamples: ng.maxSamplesPerQuery, maxSamples: ng.maxSamplesPerQuery,
logger: ng.logger, logger: ng.logger,
lookbackDelta: s.LookbackDelta, lookbackDelta: s.LookbackDelta,
@ -734,7 +733,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
} }
query.sampleStats.InitStepTracking(start, start, 1) query.sampleStats.InitStepTracking(start, start, 1)
val, warnings, err := evaluator.Eval(s.Expr) val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr)
evalSpanTimer.Finish() evalSpanTimer.Finish()
@ -783,7 +782,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
startTimestamp: timeMilliseconds(s.Start), startTimestamp: timeMilliseconds(s.Start),
endTimestamp: timeMilliseconds(s.End), endTimestamp: timeMilliseconds(s.End),
interval: durationMilliseconds(s.Interval), interval: durationMilliseconds(s.Interval),
ctx: ctxInnerEval,
maxSamples: ng.maxSamplesPerQuery, maxSamples: ng.maxSamplesPerQuery,
logger: ng.logger, logger: ng.logger,
lookbackDelta: s.LookbackDelta, lookbackDelta: s.LookbackDelta,
@ -792,7 +790,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
enableDelayedNameRemoval: ng.enableDelayedNameRemoval, enableDelayedNameRemoval: ng.enableDelayedNameRemoval,
} }
query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval)
val, warnings, err := evaluator.Eval(s.Expr) val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr)
evalSpanTimer.Finish() evalSpanTimer.Finish()
@ -1004,6 +1002,8 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
if e.Series != nil { if e.Series != nil {
return nil, nil return nil, nil
} }
span := trace.SpanFromContext(ctx)
span.AddEvent("expand start", trace.WithAttributes(attribute.String("selector", e.String())))
series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet) series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet)
if e.SkipHistogramBuckets { if e.SkipHistogramBuckets {
for i := range series { for i := range series {
@ -1011,6 +1011,7 @@ func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations
} }
} }
e.Series = series e.Series = series
span.AddEvent("expand end", trace.WithAttributes(attribute.Int("num_series", len(series))))
return ws, err return ws, err
} }
return nil, nil return nil, nil
@ -1040,8 +1041,6 @@ func (e errWithWarnings) Error() string { return e.err.Error() }
// querier and reports errors. On timeout or cancellation of its context it // querier and reports errors. On timeout or cancellation of its context it
// terminates. // terminates.
type evaluator struct { type evaluator struct {
ctx context.Context
startTimestamp int64 // Start time in milliseconds. startTimestamp int64 // Start time in milliseconds.
endTimestamp int64 // End time in milliseconds. endTimestamp int64 // End time in milliseconds.
interval int64 // Interval in milliseconds. interval int64 // Interval in milliseconds.
@ -1090,10 +1089,10 @@ func (ev *evaluator) recover(expr parser.Expr, ws *annotations.Annotations, errp
} }
} }
func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) { func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) {
defer ev.recover(expr, &ws, &err) defer ev.recover(expr, &ws, &err)
v, ws = ev.eval(expr) v, ws = ev.eval(ctx, expr)
if ev.enableDelayedNameRemoval { if ev.enableDelayedNameRemoval {
ev.cleanupMetricLabels(v) ev.cleanupMetricLabels(v)
} }
@ -1144,7 +1143,7 @@ func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) {
// function call results. // function call results.
// The prepSeries function (if provided) can be used to prepare the helper // The prepSeries function (if provided) can be used to prepare the helper
// for each series, then passed to each call funcCall. // for each series, then passed to each call funcCall.
func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) { func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) {
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
matrixes := make([]Matrix, len(exprs)) matrixes := make([]Matrix, len(exprs))
origMatrixes := make([]Matrix, len(exprs)) origMatrixes := make([]Matrix, len(exprs))
@ -1155,7 +1154,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// Functions will take string arguments from the expressions, not the values. // Functions will take string arguments from the expressions, not the values.
if e != nil && e.Type() != parser.ValueTypeString { if e != nil && e.Type() != parser.ValueTypeString {
// ev.currentSamples will be updated to the correct value within the ev.eval call. // ev.currentSamples will be updated to the correct value within the ev.eval call.
val, ws := ev.eval(e) val, ws := ev.eval(ctx, e)
warnings.Merge(ws) warnings.Merge(ws)
matrixes[i] = val.(Matrix) matrixes[i] = val.(Matrix)
@ -1207,7 +1206,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
} }
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
// Reset number of samples in memory after each timestamp. // Reset number of samples in memory after each timestamp.
@ -1317,7 +1316,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
return mat, warnings return mat, warnings
} }
func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) {
// Keep a copy of the original point slice so that it can be returned to the pool. // Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := slices.Clone(inputMatrix) origMatrix := slices.Clone(inputMatrix)
defer func() { defer func() {
@ -1397,7 +1396,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
} }
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
// Reset number of samples in memory after each timestamp. // Reset number of samples in memory after each timestamp.
@ -1448,11 +1447,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping
// evalSubquery evaluates given SubqueryExpr and returns an equivalent // evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) {
samplesStats := ev.samplesStats samplesStats := ev.samplesStats
// Avoid double counting samples when running a subquery, those samples will be counted in later stage. // Avoid double counting samples when running a subquery, those samples will be counted in later stage.
ev.samplesStats = ev.samplesStats.NewChild() ev.samplesStats = ev.samplesStats.NewChild()
val, ws := ev.eval(subq) val, ws := ev.eval(ctx, subq)
// But do incorporate the peak from the subquery // But do incorporate the peak from the subquery
samplesStats.UpdatePeakFromSubquery(ev.samplesStats) samplesStats.UpdatePeakFromSubquery(ev.samplesStats)
ev.samplesStats = samplesStats ev.samplesStats = samplesStats
@ -1479,18 +1478,20 @@ func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSele
} }
// eval evaluates the given expression as the given AST expression node requires. // eval evaluates the given expression as the given AST expression node requires.
func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotations) { func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, annotations.Annotations) {
// This is the top-level evaluation method. // This is the top-level evaluation method.
// Thus, we check for timeout/cancellation here. // Thus, we check for timeout/cancellation here.
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1
// Create a new span to help investigate inner evaluation performances. // Create a new span to help investigate inner evaluation performances.
ctxWithSpan, span := otel.Tracer("").Start(ev.ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String()) ctx, span := otel.Tracer("").Start(ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String())
ev.ctx = ctxWithSpan
defer span.End() defer span.End()
if ss, ok := expr.(interface{ ShortString() string }); ok {
span.SetAttributes(attribute.String("operation", ss.ShortString()))
}
switch e := expr.(type) { switch e := expr.(type) {
case *parser.AggregateExpr: case *parser.AggregateExpr:
@ -1511,7 +1512,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
sortedGrouping = append(sortedGrouping, valueLabel.Val) sortedGrouping = append(sortedGrouping, valueLabel.Val)
slices.Sort(sortedGrouping) slices.Sort(sortedGrouping)
} }
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh)
}, e.Expr) }, e.Expr)
} }
@ -1521,16 +1522,16 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// param is the number k for topk/bottomk, or q for quantile. // param is the number k for topk/bottomk, or q for quantile.
var fParam float64 var fParam float64
if param != nil { if param != nil {
val, ws := ev.eval(param) val, ws := ev.eval(ctx, param)
warnings.Merge(ws) warnings.Merge(ws)
fParam = val.(Matrix)[0].Floats[0].F fParam = val.(Matrix)[0].Floats[0].F
} }
// Now fetch the data to be aggregated. // Now fetch the data to be aggregated.
val, ws := ev.eval(e.Expr) val, ws := ev.eval(ctx, e.Expr)
warnings.Merge(ws) warnings.Merge(ws)
inputMatrix := val.(Matrix) inputMatrix := val.(Matrix)
result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam) result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fParam)
warnings.Merge(ws) warnings.Merge(ws)
ev.currentSamples = originalNumSamples + result.TotalSamples() ev.currentSamples = originalNumSamples + result.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples) ev.samplesStats.UpdatePeak(ev.currentSamples)
@ -1548,7 +1549,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
unwrapParenExpr(&arg) unwrapParenExpr(&arg)
vs, ok := arg.(*parser.VectorSelector) vs, ok := arg.(*parser.VectorSelector)
if ok { if ok {
return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e) return ev.rangeEvalTimestampFunctionOverVectorSelector(ctx, vs, call, e)
} }
} }
@ -1572,7 +1573,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
matrixArgIndex = i matrixArgIndex = i
matrixArg = true matrixArg = true
// Replacing parser.SubqueryExpr with parser.MatrixSelector. // Replacing parser.SubqueryExpr with parser.MatrixSelector.
val, totalSamples, ws := ev.evalSubquery(subq) val, totalSamples, ws := ev.evalSubquery(ctx, subq)
e.Args[i] = val e.Args[i] = val
warnings.Merge(ws) warnings.Merge(ws)
defer func() { defer func() {
@ -1587,14 +1588,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
// Special handling for functions that work on series not samples. // Special handling for functions that work on series not samples.
switch e.Func.Name { switch e.Func.Name {
case "label_replace": case "label_replace":
return ev.evalLabelReplace(e.Args) return ev.evalLabelReplace(ctx, e.Args)
case "label_join": case "label_join":
return ev.evalLabelJoin(e.Args) return ev.evalLabelJoin(ctx, e.Args)
} }
if !matrixArg { if !matrixArg {
// Does not have a matrix argument. // Does not have a matrix argument.
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, annos := call(v, e.Args, enh) vec, annos := call(v, e.Args, enh)
return vec, warnings.Merge(annos) return vec, warnings.Merge(annos)
}, e.Args...) }, e.Args...)
@ -1606,7 +1607,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
otherInArgs := make([]Vector, len(e.Args)) otherInArgs := make([]Vector, len(e.Args))
for i, e := range e.Args { for i, e := range e.Args {
if i != matrixArgIndex { if i != matrixArgIndex {
val, ws := ev.eval(e) val, ws := ev.eval(ctx, e)
otherArgs[i] = val.(Matrix) otherArgs[i] = val.(Matrix)
otherInArgs[i] = Vector{Sample{}} otherInArgs[i] = Vector{Sample{}}
inArgs[i] = otherInArgs[i] inArgs[i] = otherInArgs[i]
@ -1620,7 +1621,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
sel := arg.(*parser.MatrixSelector) sel := arg.(*parser.MatrixSelector)
selVS := sel.VectorSelector.(*parser.VectorSelector) selVS := sel.VectorSelector.(*parser.VectorSelector)
ws, err := checkAndExpandSeriesSet(ev.ctx, sel) ws, err := checkAndExpandSeriesSet(ctx, sel)
warnings.Merge(ws) warnings.Merge(ws)
if err != nil { if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings}) ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings})
@ -1650,7 +1651,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
dropName := e.Func.Name != "last_over_time" dropName := e.Func.Name != "last_over_time"
for i, s := range selVS.Series { for i, s := range selVS.Series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
ev.currentSamples -= len(floats) + totalHPointSize(histograms) ev.currentSamples -= len(floats) + totalHPointSize(histograms)
@ -1796,10 +1797,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
return mat, warnings return mat, warnings
case *parser.ParenExpr: case *parser.ParenExpr:
return ev.eval(e.Expr) return ev.eval(ctx, e.Expr)
case *parser.UnaryExpr: case *parser.UnaryExpr:
val, ws := ev.eval(e.Expr) val, ws := ev.eval(ctx, e.Expr)
mat := val.(Matrix) mat := val.(Matrix)
if e.Op == parser.SUB { if e.Op == parser.SUB {
for i := range mat { for i := range mat {
@ -1820,7 +1821,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
case *parser.BinaryExpr: case *parser.BinaryExpr:
switch lt, rt := e.LHS.Type(), e.RHS.Type(); { switch lt, rt := e.LHS.Type(), e.RHS.Type(); {
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar: case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F) val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F)
return append(enh.Out, Sample{F: val}), nil return append(enh.Out, Sample{F: val}), nil
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
@ -1833,47 +1834,49 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
} }
switch e.Op { switch e.Op {
case parser.LAND: case parser.LAND:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
case parser.LOR: case parser.LOR:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
case parser.LUNLESS: case parser.LUNLESS:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
default: default:
return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, err := ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh) vec, err := ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh)
return vec, handleVectorBinopError(err, e) return vec, handleVectorBinopError(err, e)
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
} }
case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar: case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, err := ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh) vec, err := ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh)
return vec, handleVectorBinopError(err, e) return vec, handleVectorBinopError(err, e)
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector: case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
vec, err := ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh) vec, err := ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh)
return vec, handleVectorBinopError(err, e) return vec, handleVectorBinopError(err, e)
}, e.LHS, e.RHS) }, e.LHS, e.RHS)
} }
case *parser.NumberLiteral: case *parser.NumberLiteral:
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { span.SetAttributes(attribute.Float64("value", e.Val))
return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil
}) })
case *parser.StringLiteral: case *parser.StringLiteral:
span.SetAttributes(attribute.String("value", e.Val))
return String{V: e.Val, T: ev.startTimestamp}, nil return String{V: e.Val, T: ev.startTimestamp}, nil
case *parser.VectorSelector: case *parser.VectorSelector:
ws, err := checkAndExpandSeriesSet(ev.ctx, e) ws, err := checkAndExpandSeriesSet(ctx, e)
if err != nil { if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
} }
@ -1882,7 +1885,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
var chkIter chunkenc.Iterator var chkIter chunkenc.Iterator
for i, s := range e.Series { for i, s := range e.Series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
chkIter = s.Iterator(chkIter) chkIter = s.Iterator(chkIter)
@ -1933,14 +1936,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
if ev.startTimestamp != ev.endTimestamp { if ev.startTimestamp != ev.endTimestamp {
panic(errors.New("cannot do range evaluation of matrix selector")) panic(errors.New("cannot do range evaluation of matrix selector"))
} }
return ev.matrixSelector(e) return ev.matrixSelector(ctx, e)
case *parser.SubqueryExpr: case *parser.SubqueryExpr:
offsetMillis := durationMilliseconds(e.Offset) offsetMillis := durationMilliseconds(e.Offset)
rangeMillis := durationMilliseconds(e.Range) rangeMillis := durationMilliseconds(e.Range)
newEv := &evaluator{ newEv := &evaluator{
endTimestamp: ev.endTimestamp - offsetMillis, endTimestamp: ev.endTimestamp - offsetMillis,
ctx: ev.ctx,
currentSamples: ev.currentSamples, currentSamples: ev.currentSamples,
maxSamples: ev.maxSamples, maxSamples: ev.maxSamples,
logger: ev.logger, logger: ev.logger,
@ -1970,7 +1972,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
setOffsetForAtModifier(newEv.startTimestamp, e.Expr) setOffsetForAtModifier(newEv.startTimestamp, e.Expr)
} }
res, ws := newEv.eval(e.Expr) res, ws := newEv.eval(ctx, e.Expr)
ev.currentSamples = newEv.currentSamples ev.currentSamples = newEv.currentSamples
ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples) ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples)
@ -1978,14 +1980,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
case *parser.StepInvariantExpr: case *parser.StepInvariantExpr:
switch ce := e.Expr.(type) { switch ce := e.Expr.(type) {
case *parser.StringLiteral, *parser.NumberLiteral: case *parser.StringLiteral, *parser.NumberLiteral:
return ev.eval(ce) return ev.eval(ctx, ce)
} }
newEv := &evaluator{ newEv := &evaluator{
startTimestamp: ev.startTimestamp, startTimestamp: ev.startTimestamp,
endTimestamp: ev.startTimestamp, // Always a single evaluation. endTimestamp: ev.startTimestamp, // Always a single evaluation.
interval: ev.interval, interval: ev.interval,
ctx: ev.ctx,
currentSamples: ev.currentSamples, currentSamples: ev.currentSamples,
maxSamples: ev.maxSamples, maxSamples: ev.maxSamples,
logger: ev.logger, logger: ev.logger,
@ -1994,7 +1995,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
enableDelayedNameRemoval: ev.enableDelayedNameRemoval, enableDelayedNameRemoval: ev.enableDelayedNameRemoval,
} }
res, ws := newEv.eval(e.Expr) res, ws := newEv.eval(ctx, e.Expr)
ev.currentSamples = newEv.currentSamples ev.currentSamples = newEv.currentSamples
ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
@ -2070,8 +2071,8 @@ func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) {
return getFPointSlice(numSteps) return getFPointSlice(numSteps)
} }
func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(ctx context.Context, vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) {
ws, err := checkAndExpandSeriesSet(ev.ctx, vs) ws, err := checkAndExpandSeriesSet(ctx, vs)
if err != nil { if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
} }
@ -2082,7 +2083,7 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec
seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)) seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta))
} }
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) {
if vs.Timestamp != nil { if vs.Timestamp != nil {
// This is a special case for "timestamp()" when the @ modifier is used, to ensure that // This is a special case for "timestamp()" when the @ modifier is used, to ensure that
// we return a point for each time step in this case. // we return a point for each time step in this case.
@ -2218,7 +2219,7 @@ func putMatrixSelectorHPointSlice(p []HPoint) {
} }
// matrixSelector evaluates a *parser.MatrixSelector expression. // matrixSelector evaluates a *parser.MatrixSelector expression.
func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) { func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSelector) (Matrix, annotations.Annotations) {
var ( var (
vs = node.VectorSelector.(*parser.VectorSelector) vs = node.VectorSelector.(*parser.VectorSelector)
@ -2229,7 +2230,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota
it = storage.NewBuffer(durationMilliseconds(node.Range)) it = storage.NewBuffer(durationMilliseconds(node.Range))
) )
ws, err := checkAndExpandSeriesSet(ev.ctx, node) ws, err := checkAndExpandSeriesSet(ctx, node)
if err != nil { if err != nil {
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
} }
@ -2237,7 +2238,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota
var chkIter chunkenc.Iterator var chkIter chunkenc.Iterator
series := vs.Series series := vs.Series
for i, s := range series { for i, s := range series {
if err := contextDone(ev.ctx, "expression evaluation"); err != nil { if err := contextDone(ctx, "expression evaluation"); err != nil {
ev.error(err) ev.error(err)
} }
chkIter = s.Iterator(chkIter) chkIter = s.Iterator(chkIter)

View file

@ -14,6 +14,7 @@
package promql package promql
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"math" "math"
@ -1463,7 +1464,7 @@ func funcChanges(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelp
} }
// label_replace function operates only on series; does not look at timestamps or values. // label_replace function operates only on series; does not look at timestamps or values.
func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, annotations.Annotations) { func (ev *evaluator) evalLabelReplace(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) {
var ( var (
dst = stringFromArg(args[1]) dst = stringFromArg(args[1])
repl = stringFromArg(args[2]) repl = stringFromArg(args[2])
@ -1479,7 +1480,7 @@ func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, an
panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst)) panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst))
} }
val, ws := ev.eval(args[0]) val, ws := ev.eval(ctx, args[0])
matrix := val.(Matrix) matrix := val.(Matrix)
lb := labels.NewBuilder(labels.EmptyLabels()) lb := labels.NewBuilder(labels.EmptyLabels())
@ -1520,7 +1521,7 @@ func funcVector(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelpe
} }
// label_join function operates only on series; does not look at timestamps or values. // label_join function operates only on series; does not look at timestamps or values.
func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annotations.Annotations) { func (ev *evaluator) evalLabelJoin(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) {
var ( var (
dst = stringFromArg(args[1]) dst = stringFromArg(args[1])
sep = stringFromArg(args[2]) sep = stringFromArg(args[2])
@ -1537,7 +1538,7 @@ func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annot
panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst)) panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst))
} }
val, ws := ev.eval(args[0]) val, ws := ev.eval(ctx, args[0])
matrix := val.(Matrix) matrix := val.(Matrix)
srcVals := make([]string, len(srcLabels)) srcVals := make([]string, len(srcLabels))
lb := labels.NewBuilder(labels.EmptyLabels()) lb := labels.NewBuilder(labels.EmptyLabels())

View file

@ -72,6 +72,11 @@ func (node *AggregateExpr) String() string {
return aggrString return aggrString
} }
func (node *AggregateExpr) ShortString() string {
aggrString := node.getAggOpStr()
return aggrString
}
func (node *AggregateExpr) getAggOpStr() string { func (node *AggregateExpr) getAggOpStr() string {
aggrString := node.Op.String() aggrString := node.Op.String()
@ -95,14 +100,20 @@ func joinLabels(ss []string) string {
return strings.Join(ss, ", ") return strings.Join(ss, ", ")
} }
func (node *BinaryExpr) String() string { func (node *BinaryExpr) returnBool() string {
returnBool := ""
if node.ReturnBool { if node.ReturnBool {
returnBool = " bool" return " bool"
} }
return ""
}
func (node *BinaryExpr) String() string {
matching := node.getMatchingStr() matching := node.getMatchingStr()
return fmt.Sprintf("%s %s%s%s %s", node.LHS, node.Op, returnBool, matching, node.RHS) return fmt.Sprintf("%s %s%s%s %s", node.LHS, node.Op, node.returnBool(), matching, node.RHS)
}
func (node *BinaryExpr) ShortString() string {
return fmt.Sprintf("%s%s%s", node.Op, node.returnBool(), node.getMatchingStr())
} }
func (node *BinaryExpr) getMatchingStr() string { func (node *BinaryExpr) getMatchingStr() string {
@ -130,9 +141,13 @@ func (node *Call) String() string {
return fmt.Sprintf("%s(%s)", node.Func.Name, node.Args) return fmt.Sprintf("%s(%s)", node.Func.Name, node.Args)
} }
func (node *MatrixSelector) String() string { func (node *Call) ShortString() string {
return node.Func.Name
}
func (node *MatrixSelector) atOffset() (string, string) {
// Copy the Vector selector before changing the offset // Copy the Vector selector before changing the offset
vecSelector := *node.VectorSelector.(*VectorSelector) vecSelector := node.VectorSelector.(*VectorSelector)
offset := "" offset := ""
switch { switch {
case vecSelector.OriginalOffset > time.Duration(0): case vecSelector.OriginalOffset > time.Duration(0):
@ -149,7 +164,13 @@ func (node *MatrixSelector) String() string {
case vecSelector.StartOrEnd == END: case vecSelector.StartOrEnd == END:
at = " @ end()" at = " @ end()"
} }
return at, offset
}
func (node *MatrixSelector) String() string {
at, offset := node.atOffset()
// Copy the Vector selector before changing the offset
vecSelector := *node.VectorSelector.(*VectorSelector)
// Do not print the @ and offset twice. // Do not print the @ and offset twice.
offsetVal, atVal, preproc := vecSelector.OriginalOffset, vecSelector.Timestamp, vecSelector.StartOrEnd offsetVal, atVal, preproc := vecSelector.OriginalOffset, vecSelector.Timestamp, vecSelector.StartOrEnd
vecSelector.OriginalOffset = 0 vecSelector.OriginalOffset = 0
@ -163,10 +184,19 @@ func (node *MatrixSelector) String() string {
return str return str
} }
func (node *MatrixSelector) ShortString() string {
at, offset := node.atOffset()
return fmt.Sprintf("[%s]%s%s", model.Duration(node.Range), at, offset)
}
func (node *SubqueryExpr) String() string { func (node *SubqueryExpr) String() string {
return fmt.Sprintf("%s%s", node.Expr.String(), node.getSubqueryTimeSuffix()) return fmt.Sprintf("%s%s", node.Expr.String(), node.getSubqueryTimeSuffix())
} }
func (node *SubqueryExpr) ShortString() string {
return node.getSubqueryTimeSuffix()
}
// getSubqueryTimeSuffix returns the '[<range>:<step>] @ <timestamp> offset <offset>' suffix of the subquery. // getSubqueryTimeSuffix returns the '[<range>:<step>] @ <timestamp> offset <offset>' suffix of the subquery.
func (node *SubqueryExpr) getSubqueryTimeSuffix() string { func (node *SubqueryExpr) getSubqueryTimeSuffix() string {
step := "" step := ""
@ -208,6 +238,10 @@ func (node *UnaryExpr) String() string {
return fmt.Sprintf("%s%s", node.Op, node.Expr) return fmt.Sprintf("%s%s", node.Op, node.Expr)
} }
func (node *UnaryExpr) ShortString() string {
return node.Op.String()
}
func (node *VectorSelector) String() string { func (node *VectorSelector) String() string {
var labelStrings []string var labelStrings []string
if len(node.LabelMatchers) > 1 { if len(node.LabelMatchers) > 1 {