// Copyright 2013 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package promql import ( "bytes" "container/heap" "context" "errors" "fmt" "io" "log/slog" "math" "reflect" "runtime" "slices" "sort" "strconv" "strings" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/zeropool" ) const ( namespace = "prometheus" subsystem = "engine" queryTag = "query" env = "query execution" defaultLookbackDelta = 5 * time.Minute // The largest SampleValue that can be converted to an int64 without overflow. maxInt64 = 9223372036854774784 // The smallest SampleValue that can be converted to an int64 without underflow. minInt64 = -9223372036854775808 // Max initial size for the pooled points slices. // The getHPointSlice and getFPointSlice functions are called with an estimated size which often can be // over-estimated. maxPointsSliceSize = 5000 // The default buffer size for points used by the matrix selector. matrixSelectorSliceSize = 16 ) type engineMetrics struct { currentQueries prometheus.Gauge maxConcurrentQueries prometheus.Gauge queryLogEnabled prometheus.Gauge queryLogFailures prometheus.Counter queryQueueTime prometheus.Observer queryPrepareTime prometheus.Observer queryInnerEval prometheus.Observer queryResultSort prometheus.Observer querySamples prometheus.Counter } // convertibleToInt64 returns true if v does not over-/underflow an int64. func convertibleToInt64(v float64) bool { return v <= maxInt64 && v >= minInt64 } type ( // ErrQueryTimeout is returned if a query timed out during processing. ErrQueryTimeout string // ErrQueryCanceled is returned if a query was canceled during processing. ErrQueryCanceled string // ErrTooManySamples is returned if a query would load more than the maximum allowed samples into memory. ErrTooManySamples string // ErrStorage is returned if an error was encountered in the storage layer // during query handling. ErrStorage struct{ Err error } ) func (e ErrQueryTimeout) Error() string { return fmt.Sprintf("query timed out in %s", string(e)) } func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was canceled in %s", string(e)) } func (e ErrTooManySamples) Error() string { return fmt.Sprintf("query processing would load too many samples into memory in %s", string(e)) } func (e ErrStorage) Error() string { return e.Err.Error() } // QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked. type QueryEngine interface { NewInstantQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, ts time.Time) (Query, error) NewRangeQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) } // QueryLogger is an interface that can be used to log all the queries logged // by the engine. type QueryLogger interface { Error(msg string, args ...any) Info(msg string, args ...any) Debug(msg string, args ...any) Warn(msg string, args ...any) With(args ...any) Close() error } // A Query is derived from an a raw query string and can be run against an engine // it is associated with. type Query interface { // Exec processes the query. Can only be called once. Exec(ctx context.Context) *Result // Close recovers memory used by the query result. Close() // Statement returns the parsed statement of the query. Statement() parser.Statement // Stats returns statistics about the lifetime of the query. Stats() *stats.Statistics // Cancel signals that a running query execution should be aborted. Cancel() // String returns the original query string. String() string } type PrometheusQueryOpts struct { // Enables recording per-step statistics if the engine has it enabled as well. Disabled by default. enablePerStepStats bool // Lookback delta duration for this query. lookbackDelta time.Duration } var _ QueryOpts = &PrometheusQueryOpts{} func NewPrometheusQueryOpts(enablePerStepStats bool, lookbackDelta time.Duration) QueryOpts { return &PrometheusQueryOpts{ enablePerStepStats: enablePerStepStats, lookbackDelta: lookbackDelta, } } func (p *PrometheusQueryOpts) EnablePerStepStats() bool { return p.enablePerStepStats } func (p *PrometheusQueryOpts) LookbackDelta() time.Duration { return p.lookbackDelta } type QueryOpts interface { // Enables recording per-step statistics if the engine has it enabled as well. Disabled by default. EnablePerStepStats() bool // Lookback delta duration for this query. LookbackDelta() time.Duration } // query implements the Query interface. type query struct { // Underlying data provider. queryable storage.Queryable // The original query string. q string // Statement of the parsed query. stmt parser.Statement // Timer stats for the query execution. stats *stats.QueryTimers // Sample stats for the query execution. sampleStats *stats.QuerySamples // Result matrix for reuse. matrix Matrix // Cancellation function for the query. cancel func() // The engine against which the query is executed. ng *Engine } type QueryOrigin struct{} // Statement implements the Query interface. // Calling this after Exec may result in panic, // see https://github.com/prometheus/prometheus/issues/8949. func (q *query) Statement() parser.Statement { return q.stmt } // String implements the Query interface. func (q *query) String() string { return q.q } // Stats implements the Query interface. func (q *query) Stats() *stats.Statistics { return &stats.Statistics{ Timers: q.stats, Samples: q.sampleStats, } } // Cancel implements the Query interface. func (q *query) Cancel() { if q.cancel != nil { q.cancel() } } // Close implements the Query interface. func (q *query) Close() { for _, s := range q.matrix { putFPointSlice(s.Floats) putHPointSlice(s.Histograms) } } // Exec implements the Query interface. func (q *query) Exec(ctx context.Context) *Result { if span := trace.SpanFromContext(ctx); span != nil { span.SetAttributes(attribute.String(queryTag, q.stmt.String())) } // Exec query. res, warnings, err := q.ng.exec(ctx, q) return &Result{Err: err, Value: res, Warnings: warnings} } // contextDone returns an error if the context was canceled or timed out. func contextDone(ctx context.Context, env string) error { if err := ctx.Err(); err != nil { return contextErr(err, env) } return nil } func contextErr(err error, env string) error { switch { case errors.Is(err, context.Canceled): return ErrQueryCanceled(env) case errors.Is(err, context.DeadlineExceeded): return ErrQueryTimeout(env) default: return err } } // QueryTracker provides access to two features: // // 1) Tracking of active query. If PromQL engine crashes while executing any query, such query should be present // in the tracker on restart, hence logged. After the logging on restart, the tracker gets emptied. // // 2) Enforcement of the maximum number of concurrent queries. type QueryTracker interface { io.Closer // GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker. GetMaxConcurrent() int // Insert inserts query into query tracker. This call must block if maximum number of queries is already running. // If Insert doesn't return error then returned integer value should be used in subsequent Delete call. // Insert should return error if context is finished before query can proceed, and integer value returned in this case should be ignored by caller. Insert(ctx context.Context, query string) (int, error) // Delete removes query from activity tracker. InsertIndex is value returned by Insert call. Delete(insertIndex int) } // EngineOpts contains configuration options used when creating a new Engine. type EngineOpts struct { Logger *slog.Logger Reg prometheus.Registerer MaxSamples int Timeout time.Duration ActiveQueryTracker QueryTracker // LookbackDelta determines the time since the last sample after which a time // series is considered stale. LookbackDelta time.Duration // NoStepSubqueryIntervalFn is the default evaluation interval of // a subquery in milliseconds if no step in range vector was specified `[30m:]`. NoStepSubqueryIntervalFn func(rangeMillis int64) int64 // EnableAtModifier if true enables @ modifier. Disabled otherwise. This // is supposed to be enabled for regular PromQL (as of Prometheus v2.33) // but the option to disable it is still provided here for those using // the Engine outside of Prometheus. EnableAtModifier bool // EnableNegativeOffset if true enables negative (-) offset // values. Disabled otherwise. This is supposed to be enabled for // regular PromQL (as of Prometheus v2.33) but the option to disable it // is still provided here for those using the Engine outside of // Prometheus. EnableNegativeOffset bool // EnablePerStepStats if true allows for per-step stats to be computed on request. Disabled otherwise. EnablePerStepStats bool // EnableDelayedNameRemoval delays the removal of the __name__ label to the last step of the query evaluation. // This is useful in certain scenarios where the __name__ label must be preserved or where applying a // regex-matcher to the __name__ label may otherwise lead to duplicate labelset errors. EnableDelayedNameRemoval bool } // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { logger *slog.Logger metrics *engineMetrics timeout time.Duration maxSamplesPerQuery int activeQueryTracker QueryTracker queryLogger QueryLogger queryLoggerLock sync.RWMutex lookbackDelta time.Duration noStepSubqueryIntervalFn func(rangeMillis int64) int64 enableAtModifier bool enableNegativeOffset bool enablePerStepStats bool enableDelayedNameRemoval bool } // NewEngine returns a new engine. func NewEngine(opts EngineOpts) *Engine { if opts.Logger == nil { opts.Logger = promslog.NewNopLogger() } queryResultSummary := prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, Name: "query_duration_seconds", Help: "Query timings", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }, []string{"slice"}, ) metrics := &engineMetrics{ currentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queries", Help: "The current number of queries being executed or waiting.", }), queryLogEnabled: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "query_log_enabled", Help: "State of the query log.", }), queryLogFailures: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "query_log_failures_total", Help: "The number of query log failures.", }), maxConcurrentQueries: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, Name: "queries_concurrent_max", Help: "The max number of concurrent queries.", }), querySamples: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, Name: "query_samples_total", Help: "The total number of samples loaded by all queries.", }), queryQueueTime: queryResultSummary.WithLabelValues("queue_time"), queryPrepareTime: queryResultSummary.WithLabelValues("prepare_time"), queryInnerEval: queryResultSummary.WithLabelValues("inner_eval"), queryResultSort: queryResultSummary.WithLabelValues("result_sort"), } if t := opts.ActiveQueryTracker; t != nil { metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent())) } else { metrics.maxConcurrentQueries.Set(-1) } if opts.LookbackDelta == 0 { opts.LookbackDelta = defaultLookbackDelta if l := opts.Logger; l != nil { l.Debug("Lookback delta is zero, setting to default value", "value", defaultLookbackDelta) } } if opts.Reg != nil { opts.Reg.MustRegister( metrics.currentQueries, metrics.maxConcurrentQueries, metrics.queryLogEnabled, metrics.queryLogFailures, metrics.querySamples, queryResultSummary, ) } return &Engine{ timeout: opts.Timeout, logger: opts.Logger, metrics: metrics, maxSamplesPerQuery: opts.MaxSamples, activeQueryTracker: opts.ActiveQueryTracker, lookbackDelta: opts.LookbackDelta, noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, enableAtModifier: opts.EnableAtModifier, enableNegativeOffset: opts.EnableNegativeOffset, enablePerStepStats: opts.EnablePerStepStats, enableDelayedNameRemoval: opts.EnableDelayedNameRemoval, } } // Close closes ng. func (ng *Engine) Close() error { if ng == nil { return nil } if ng.activeQueryTracker != nil { return ng.activeQueryTracker.Close() } return nil } // SetQueryLogger sets the query logger. func (ng *Engine) SetQueryLogger(l QueryLogger) { ng.queryLoggerLock.Lock() defer ng.queryLoggerLock.Unlock() if ng.queryLogger != nil { // An error closing the old file descriptor should // not make reload fail; only log a warning. err := ng.queryLogger.Close() if err != nil { ng.logger.Warn("Error while closing the previous query log file", "err", err) } } ng.queryLogger = l if l != nil { ng.metrics.queryLogEnabled.Set(1) } else { ng.metrics.queryLogEnabled.Set(0) } } // NewInstantQuery returns an evaluation query for the given expression at the given time. func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, ts time.Time) (Query, error) { pExpr, qry := ng.newQuery(q, qs, opts, ts, ts, 0) finishQueue, err := ng.queueActive(ctx, qry) if err != nil { return nil, err } defer finishQueue() expr, err := parser.ParseExpr(qs) if err != nil { return nil, err } if err := ng.validateOpts(expr); err != nil { return nil, err } *pExpr = PreprocessExpr(expr, ts, ts) return qry, nil } // NewRangeQuery returns an evaluation query for the given time range and with // the resolution set by the interval. func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { pExpr, qry := ng.newQuery(q, qs, opts, start, end, interval) finishQueue, err := ng.queueActive(ctx, qry) if err != nil { return nil, err } defer finishQueue() expr, err := parser.ParseExpr(qs) if err != nil { return nil, err } if err := ng.validateOpts(expr); err != nil { return nil, err } if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, fmt.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } *pExpr = PreprocessExpr(expr, start, end) return qry, nil } func (ng *Engine) newQuery(q storage.Queryable, qs string, opts QueryOpts, start, end time.Time, interval time.Duration) (*parser.Expr, *query) { if opts == nil { opts = NewPrometheusQueryOpts(false, 0) } lookbackDelta := opts.LookbackDelta() if lookbackDelta <= 0 { lookbackDelta = ng.lookbackDelta } es := &parser.EvalStmt{ Start: start, End: end, Interval: interval, LookbackDelta: lookbackDelta, } qry := &query{ q: qs, stmt: es, ng: ng, stats: stats.NewQueryTimers(), sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats()), queryable: q, } return &es.Expr, qry } var ( ErrValidationAtModifierDisabled = errors.New("@ modifier is disabled") ErrValidationNegativeOffsetDisabled = errors.New("negative offset is disabled") ) func (ng *Engine) validateOpts(expr parser.Expr) error { if ng.enableAtModifier && ng.enableNegativeOffset { return nil } var atModifierUsed, negativeOffsetUsed bool var validationErr error parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { case *parser.VectorSelector: if n.Timestamp != nil || n.StartOrEnd == parser.START || n.StartOrEnd == parser.END { atModifierUsed = true } if n.OriginalOffset < 0 { negativeOffsetUsed = true } case *parser.MatrixSelector: vs := n.VectorSelector.(*parser.VectorSelector) if vs.Timestamp != nil || vs.StartOrEnd == parser.START || vs.StartOrEnd == parser.END { atModifierUsed = true } if vs.OriginalOffset < 0 { negativeOffsetUsed = true } case *parser.SubqueryExpr: if n.Timestamp != nil || n.StartOrEnd == parser.START || n.StartOrEnd == parser.END { atModifierUsed = true } if n.OriginalOffset < 0 { negativeOffsetUsed = true } } if atModifierUsed && !ng.enableAtModifier { validationErr = ErrValidationAtModifierDisabled return validationErr } if negativeOffsetUsed && !ng.enableNegativeOffset { validationErr = ErrValidationNegativeOffsetDisabled return validationErr } return nil }) return validationErr } // NewTestQuery injects special behaviour into Query for testing. func (ng *Engine) NewTestQuery(f func(context.Context) error) Query { qry := &query{ q: "test statement", stmt: parser.TestStmt(f), ng: ng, stats: stats.NewQueryTimers(), sampleStats: stats.NewQuerySamples(ng.enablePerStepStats), } return qry } // exec executes the query. // // At this point per query only one EvalStmt is evaluated. Alert and record // statements are not handled by the Engine. func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, ws annotations.Annotations, err error) { ng.metrics.currentQueries.Inc() defer func() { ng.metrics.currentQueries.Dec() ng.metrics.querySamples.Add(float64(q.sampleStats.TotalSamples)) }() ctx, cancel := context.WithTimeout(ctx, ng.timeout) q.cancel = cancel defer func() { ng.queryLoggerLock.RLock() if l := ng.queryLogger; l != nil { params := make(map[string]interface{}, 4) params["query"] = q.q if eq, ok := q.Statement().(*parser.EvalStmt); ok { params["start"] = formatDate(eq.Start) params["end"] = formatDate(eq.End) // The step provided by the user is in seconds. params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond)) } l.With("params", params) if err != nil { l.With("error", err) } l.With("stats", stats.NewQueryStats(q.Stats())) if span := trace.SpanFromContext(ctx); span != nil { l.With("spanID", span.SpanContext().SpanID()) } if origin := ctx.Value(QueryOrigin{}); origin != nil { for k, v := range origin.(map[string]interface{}) { l.With(k, v) } } l.Info("promql query logged") // TODO: @tjhop -- do we still need this metric/error log if logger doesn't return errors? // ng.metrics.queryLogFailures.Inc() // ng.logger.Error("can't log query", "err", err) } ng.queryLoggerLock.RUnlock() }() execSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.ExecTotalTime) defer execSpanTimer.Finish() finishQueue, err := ng.queueActive(ctx, q) if err != nil { return nil, nil, err } defer finishQueue() // Cancel when execution is done or an error was raised. defer q.cancel() evalSpanTimer, ctx := q.stats.GetSpanTimer(ctx, stats.EvalTotalTime) defer evalSpanTimer.Finish() // The base context might already be canceled on the first iteration (e.g. during shutdown). if err := contextDone(ctx, env); err != nil { return nil, nil, err } switch s := q.Statement().(type) { case *parser.EvalStmt: return ng.execEvalStmt(ctx, q, s) case parser.TestStmt: return nil, nil, s(ctx) } panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement())) } // Log query in active log. The active log guarantees that we don't run over // MaxConcurrent queries. func (ng *Engine) queueActive(ctx context.Context, q *query) (func(), error) { if ng.activeQueryTracker == nil { return func() {}, nil } queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime) queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q) queueSpanTimer.Finish() return func() { ng.activeQueryTracker.Delete(queryIndex) }, err } func timeMilliseconds(t time.Time) int64 { return t.UnixNano() / int64(time.Millisecond/time.Nanosecond) } func durationMilliseconds(d time.Duration) int64 { return int64(d / (time.Millisecond / time.Nanosecond)) } // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, annotations.Annotations, error) { prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) mint, maxt := FindMinMaxTime(s) querier, err := query.queryable.Querier(mint, maxt) if err != nil { prepareSpanTimer.Finish() return nil, nil, err } defer querier.Close() ng.populateSeries(ctxPrepare, querier, s) prepareSpanTimer.Finish() // Modify the offset of vector and matrix selectors for the @ modifier // w.r.t. the start time since only 1 evaluation will be done on them. setOffsetForAtModifier(timeMilliseconds(s.Start), s.Expr) evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval) // Instant evaluation. This is executed as a range evaluation with one step. if s.Start == s.End && s.Interval == 0 { start := timeMilliseconds(s.Start) evaluator := &evaluator{ startTimestamp: start, endTimestamp: start, interval: 1, maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: s.LookbackDelta, samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ng.enableDelayedNameRemoval, } query.sampleStats.InitStepTracking(start, start, 1) val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr) evalSpanTimer.Finish() if err != nil { return nil, warnings, err } var mat Matrix switch result := val.(type) { case Matrix: mat = result case String: return result, warnings, nil default: panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } query.matrix = mat switch s.Expr.Type() { case parser.ValueTypeVector: // Convert matrix with one value per series into vector. vector := make(Vector, len(mat)) for i, s := range mat { // Point might have a different timestamp, force it to the evaluation // timestamp as that is when we ran the evaluation. if len(s.Histograms) > 0 { vector[i] = Sample{Metric: s.Metric, H: s.Histograms[0].H, T: start, DropName: s.DropName} } else { vector[i] = Sample{Metric: s.Metric, F: s.Floats[0].F, T: start, DropName: s.DropName} } } return vector, warnings, nil case parser.ValueTypeScalar: return Scalar{V: mat[0].Floats[0].F, T: start}, warnings, nil case parser.ValueTypeMatrix: ng.sortMatrixResult(ctx, query, mat) return mat, warnings, nil default: panic(fmt.Errorf("promql.Engine.exec: unexpected expression type %q", s.Expr.Type())) } } // Range evaluation. evaluator := &evaluator{ startTimestamp: timeMilliseconds(s.Start), endTimestamp: timeMilliseconds(s.End), interval: durationMilliseconds(s.Interval), maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: s.LookbackDelta, samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ng.enableDelayedNameRemoval, } query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr) evalSpanTimer.Finish() if err != nil { return nil, warnings, err } mat, ok := val.(Matrix) if !ok { panic(fmt.Errorf("promql.Engine.exec: invalid expression type %q", val.Type())) } query.matrix = mat if err := contextDone(ctx, "expression evaluation"); err != nil { return nil, warnings, err } // TODO(fabxc): where to ensure metric labels are a copy from the storage internals. ng.sortMatrixResult(ctx, query, mat) return mat, warnings, nil } func (ng *Engine) sortMatrixResult(ctx context.Context, query *query, mat Matrix) { sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort) sort.Sort(mat) sortSpanTimer.Finish() } // subqueryTimes returns the sum of offsets and ranges of all subqueries in the path. // If the @ modifier is used, then the offset and range is w.r.t. that timestamp // (i.e. the sum is reset when we have @ modifier). // The returned *int64 is the closest timestamp that was seen. nil for no @ modifier. func subqueryTimes(path []parser.Node) (time.Duration, time.Duration, *int64) { var ( subqOffset, subqRange time.Duration ts int64 = math.MaxInt64 ) for _, node := range path { if n, ok := node.(*parser.SubqueryExpr); ok { subqOffset += n.OriginalOffset subqRange += n.Range if n.Timestamp != nil { // The @ modifier on subquery invalidates all the offset and // range till now. Hence resetting it here. subqOffset = n.OriginalOffset subqRange = n.Range ts = *n.Timestamp } } } var tsp *int64 if ts != math.MaxInt64 { tsp = &ts } return subqOffset, subqRange, tsp } // FindMinMaxTime returns the time in milliseconds of the earliest and latest point in time the statement will try to process. // This takes into account offsets, @ modifiers, and range selectors. // If the statement does not select series, then FindMinMaxTime returns (0, 0). func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) { var minTimestamp, maxTimestamp int64 = math.MaxInt64, math.MinInt64 // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. var evalRange time.Duration parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { case *parser.VectorSelector: start, end := getTimeRangesForSelector(s, n, path, evalRange) if start < minTimestamp { minTimestamp = start } if end > maxTimestamp { maxTimestamp = end } evalRange = 0 case *parser.MatrixSelector: evalRange = n.Range } return nil }) if maxTimestamp == math.MinInt64 { // This happens when there was no selector. Hence no time range to select. minTimestamp = 0 maxTimestamp = 0 } return minTimestamp, maxTimestamp } func getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorSelector, path []parser.Node, evalRange time.Duration) (int64, int64) { start, end := timestamp.FromTime(s.Start), timestamp.FromTime(s.End) subqOffset, subqRange, subqTs := subqueryTimes(path) if subqTs != nil { // The timestamp on the subquery overrides the eval statement time ranges. start = *subqTs end = *subqTs } if n.Timestamp != nil { // The timestamp on the selector overrides everything. start = *n.Timestamp end = *n.Timestamp } else { offsetMilliseconds := durationMilliseconds(subqOffset) start = start - offsetMilliseconds - durationMilliseconds(subqRange) end -= offsetMilliseconds } if evalRange == 0 { // Reduce the start by one fewer ms than the lookback delta // because wo want to exclude samples that are precisely the // lookback delta before the eval time. start -= durationMilliseconds(s.LookbackDelta) - 1 } else { // For all matrix queries we want to ensure that we have // (end-start) + range selected this way we have `range` data // before the start time. We subtract one from the range to // exclude samples positioned directly at the lower boundary of // the range. start -= durationMilliseconds(evalRange) - 1 } offsetMilliseconds := durationMilliseconds(n.OriginalOffset) start -= offsetMilliseconds end -= offsetMilliseconds return start, end } func (ng *Engine) getLastSubqueryInterval(path []parser.Node) time.Duration { var interval time.Duration for _, node := range path { if n, ok := node.(*parser.SubqueryExpr); ok { interval = n.Step if n.Step == 0 { interval = time.Duration(ng.noStepSubqueryIntervalFn(durationMilliseconds(n.Range))) * time.Millisecond } } } return interval } func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) { // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. var evalRange time.Duration parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { case *parser.VectorSelector: start, end := getTimeRangesForSelector(s, n, path, evalRange) interval := ng.getLastSubqueryInterval(path) if interval == 0 { interval = s.Interval } hints := &storage.SelectHints{ Start: start, End: end, Step: durationMilliseconds(interval), Range: durationMilliseconds(evalRange), Func: extractFuncFromPath(path), } evalRange = 0 hints.By, hints.Grouping = extractGroupsFromPath(path) n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...) case *parser.MatrixSelector: evalRange = n.Range } return nil }) } // extractFuncFromPath walks up the path and searches for the first instance of // a function or aggregation. func extractFuncFromPath(p []parser.Node) string { if len(p) == 0 { return "" } switch n := p[len(p)-1].(type) { case *parser.AggregateExpr: return n.Op.String() case *parser.Call: return n.Func.Name case *parser.BinaryExpr: // If we hit a binary expression we terminate since we only care about functions // or aggregations over a single metric. return "" } return extractFuncFromPath(p[:len(p)-1]) } // extractGroupsFromPath parses vector outer function and extracts grouping information if by or without was used. func extractGroupsFromPath(p []parser.Node) (bool, []string) { if len(p) == 0 { return false, nil } if n, ok := p[len(p)-1].(*parser.AggregateExpr); ok { return !n.Without, n.Grouping } return false, nil } // checkAndExpandSeriesSet expands expr's UnexpandedSeriesSet into expr's Series. // If the Series field is already non-nil, it's a no-op. func checkAndExpandSeriesSet(ctx context.Context, expr parser.Expr) (annotations.Annotations, error) { switch e := expr.(type) { case *parser.MatrixSelector: return checkAndExpandSeriesSet(ctx, e.VectorSelector) case *parser.VectorSelector: if e.Series != nil { return nil, nil } span := trace.SpanFromContext(ctx) span.AddEvent("expand start", trace.WithAttributes(attribute.String("selector", e.String()))) series, ws, err := expandSeriesSet(ctx, e.UnexpandedSeriesSet) if e.SkipHistogramBuckets { for i := range series { series[i] = newHistogramStatsSeries(series[i]) } } e.Series = series span.AddEvent("expand end", trace.WithAttributes(attribute.Int("num_series", len(series)))) return ws, err } return nil, nil } func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, ws annotations.Annotations, err error) { for it.Next() { select { case <-ctx.Done(): return nil, nil, ctx.Err() default: } res = append(res, it.At()) } return res, it.Warnings(), it.Err() } type errWithWarnings struct { err error warnings annotations.Annotations } func (e errWithWarnings) Error() string { return e.err.Error() } // An evaluator evaluates the given expressions over the given fixed // timestamps. It is attached to an engine through which it connects to a // querier and reports errors. On timeout or cancellation of its context it // terminates. type evaluator struct { startTimestamp int64 // Start time in milliseconds. endTimestamp int64 // End time in milliseconds. interval int64 // Interval in milliseconds. maxSamples int currentSamples int logger *slog.Logger lookbackDelta time.Duration samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 enableDelayedNameRemoval bool } // errorf causes a panic with the input formatted into an error. func (ev *evaluator) errorf(format string, args ...interface{}) { ev.error(fmt.Errorf(format, args...)) } // error causes a panic with the given error. func (ev *evaluator) error(err error) { panic(err) } // recover is the handler that turns panics into returns from the top level of evaluation. func (ev *evaluator) recover(expr parser.Expr, ws *annotations.Annotations, errp *error) { e := recover() if e == nil { return } switch err := e.(type) { case runtime.Error: // Print the stack trace but do not inhibit the running application. buf := make([]byte, 64<<10) buf = buf[:runtime.Stack(buf, false)] ev.logger.Error("runtime panic during query evaluation", "expr", expr.String(), "err", e, "stacktrace", string(buf)) *errp = fmt.Errorf("unexpected error: %w", err) case errWithWarnings: *errp = err.err ws.Merge(err.warnings) case error: *errp = err default: *errp = fmt.Errorf("%v", err) } } func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) { defer ev.recover(expr, &ws, &err) v, ws = ev.eval(ctx, expr) if ev.enableDelayedNameRemoval { ev.cleanupMetricLabels(v) } return v, ws, nil } // EvalSeriesHelper stores extra information about a series. type EvalSeriesHelper struct { // Used to map left-hand to right-hand in binary operations. signature string } // EvalNodeHelper stores extra information and caches for evaluating a single node across steps. type EvalNodeHelper struct { // Evaluation timestamp. Ts int64 // Vector that can be used for output. Out Vector // Caches. // funcHistogramQuantile for classic histograms. signatureToMetricWithBuckets map[string]*metricWithBuckets lb *labels.Builder lblBuf []byte lblResultBuf []byte // For binary vector matching. rightSigs map[string]Sample matchedSigs map[string]map[uint64]struct{} resultMetric map[string]labels.Labels // Additional options for the evaluation. enableDelayedNameRemoval bool } func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) { if enh.lb == nil { enh.lb = labels.NewBuilder(lbls) } else { enh.lb.Reset(lbls) } } // rangeEval evaluates the given expressions, and then for each step calls // the given funcCall with the values computed for each expression at that // step. The return value is the combination into time series of all the // function call results. // The prepSeries function (if provided) can be used to prepare the helper // for each series, then passed to each call funcCall. func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) { numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 matrixes := make([]Matrix, len(exprs)) origMatrixes := make([]Matrix, len(exprs)) originalNumSamples := ev.currentSamples var warnings annotations.Annotations for i, e := range exprs { // Functions will take string arguments from the expressions, not the values. if e != nil && e.Type() != parser.ValueTypeString { // ev.currentSamples will be updated to the correct value within the ev.eval call. val, ws := ev.eval(ctx, e) warnings.Merge(ws) matrixes[i] = val.(Matrix) // Keep a copy of the original point slices so that they // can be returned to the pool. origMatrixes[i] = make(Matrix, len(matrixes[i])) copy(origMatrixes[i], matrixes[i]) } } vectors := make([]Vector, len(exprs)) // Input vectors for the function. args := make([]parser.Value, len(exprs)) // Argument to function. // Create an output vector that is as big as the input matrix with // the most time series. biggestLen := 1 for i := range exprs { vectors[i] = make(Vector, 0, len(matrixes[i])) if len(matrixes[i]) > biggestLen { biggestLen = len(matrixes[i]) } } enh := &EvalNodeHelper{Out: make(Vector, 0, biggestLen), enableDelayedNameRemoval: ev.enableDelayedNameRemoval} type seriesAndTimestamp struct { Series ts int64 } seriess := make(map[uint64]seriesAndTimestamp, biggestLen) // Output series by series hash. tempNumSamples := ev.currentSamples var ( seriesHelpers [][]EvalSeriesHelper bufHelpers [][]EvalSeriesHelper // Buffer updated on each step ) // If the series preparation function is provided, we should run it for // every single series in the matrix. if prepSeries != nil { seriesHelpers = make([][]EvalSeriesHelper, len(exprs)) bufHelpers = make([][]EvalSeriesHelper, len(exprs)) for i := range exprs { seriesHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i])) bufHelpers[i] = make([]EvalSeriesHelper, len(matrixes[i])) for si, series := range matrixes[i] { prepSeries(series.Metric, &seriesHelpers[i][si]) } } } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } // Reset number of samples in memory after each timestamp. ev.currentSamples = tempNumSamples // Gather input vectors for this timestamp. for i := range exprs { vectors[i] = vectors[i][:0] if prepSeries != nil { bufHelpers[i] = bufHelpers[i][:0] } for si, series := range matrixes[i] { switch { case len(series.Floats) > 0 && series.Floats[0].T == ts: vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts, DropName: series.DropName}) // Move input vectors forward so we don't have to re-scan the same // past points at the next step. matrixes[i][si].Floats = series.Floats[1:] case len(series.Histograms) > 0 && series.Histograms[0].T == ts: vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts, DropName: series.DropName}) matrixes[i][si].Histograms = series.Histograms[1:] default: continue } if prepSeries != nil { bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si]) } // Don't add histogram size here because we only // copy the pointer above, not the whole // histogram. ev.currentSamples++ if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } args[i] = vectors[i] ev.samplesStats.UpdatePeak(ev.currentSamples) } // Make the function call. enh.Ts = ts result, ws := funcCall(args, bufHelpers, enh) enh.Out = result[:0] // Reuse result vector. warnings.Merge(ws) vecNumSamples := result.TotalSamples() ev.currentSamples += vecNumSamples // When we reset currentSamples to tempNumSamples during the next iteration of the loop it also // needs to include the samples from the result here, as they're still in memory. tempNumSamples += vecNumSamples ev.samplesStats.UpdatePeak(ev.currentSamples) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { if !ev.enableDelayedNameRemoval && result.ContainsSameLabelset() { ev.errorf("vector cannot contain metrics with the same labelset") } mat := make(Matrix, len(result)) for i, s := range result { if s.H == nil { mat[i] = Series{Metric: s.Metric, Floats: []FPoint{{T: ts, F: s.F}}, DropName: s.DropName} } else { mat[i] = Series{Metric: s.Metric, Histograms: []HPoint{{T: ts, H: s.H}}, DropName: s.DropName} } } ev.currentSamples = originalNumSamples + mat.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } // Add samples in output vector to output series. for _, sample := range result { h := sample.Metric.Hash() ss, ok := seriess[h] if ok { if ss.ts == ts { // If we've seen this output series before at this timestamp, it's a duplicate. ev.errorf("vector cannot contain metrics with the same labelset") } ss.ts = ts } else { ss = seriesAndTimestamp{Series{Metric: sample.Metric, DropName: sample.DropName}, ts} } addToSeries(&ss.Series, enh.Ts, sample.F, sample.H, numSteps) seriess[h] = ss } } // Reuse the original point slices. for _, m := range origMatrixes { for _, s := range m { putFPointSlice(s.Floats) putHPointSlice(s.Histograms) } } // Assemble the output matrix. By the time we get here we know we don't have too many samples. mat := make(Matrix, 0, len(seriess)) for _, ss := range seriess { mat = append(mat, ss.Series) } ev.currentSamples = originalNumSamples + mat.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) return mat, warnings } func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { // Keep a copy of the original point slice so that it can be returned to the pool. origMatrix := slices.Clone(inputMatrix) defer func() { for _, s := range origMatrix { putFPointSlice(s.Floats) putHPointSlice(s.Histograms) } }() var warnings annotations.Annotations enh := &EvalNodeHelper{enableDelayedNameRemoval: ev.enableDelayedNameRemoval} tempNumSamples := ev.currentSamples // Create a mapping from input series to output groups. buf := make([]byte, 0, 1024) groupToResultIndex := make(map[uint64]int) seriesToResult := make([]int, len(inputMatrix)) var result Matrix groupCount := 0 for si, series := range inputMatrix { var groupingKey uint64 groupingKey, buf = generateGroupingKey(series.Metric, sortedGrouping, aggExpr.Without, buf) index, ok := groupToResultIndex[groupingKey] // Add a new group if it doesn't exist. if !ok { if aggExpr.Op != parser.TOPK && aggExpr.Op != parser.BOTTOMK && aggExpr.Op != parser.LIMITK && aggExpr.Op != parser.LIMIT_RATIO { m := generateGroupingLabels(enh, series.Metric, aggExpr.Without, sortedGrouping) result = append(result, Series{Metric: m}) } index = groupCount groupToResultIndex[groupingKey] = index groupCount++ } seriesToResult[si] = index } groups := make([]groupedAggregation, groupCount) var k int var ratio float64 var seriess map[uint64]Series switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK, parser.LIMITK: if !convertibleToInt64(param) { ev.errorf("Scalar value %v overflows int64", param) } k = int(param) if k > len(inputMatrix) { k = len(inputMatrix) } if k < 1 { return nil, warnings } seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. case parser.LIMIT_RATIO: if math.IsNaN(param) { ev.errorf("Ratio value %v is NaN", param) } switch { case param == 0: return nil, warnings case param < -1.0: ratio = -1.0 warnings.Add(annotations.NewInvalidRatioWarning(param, ratio, aggExpr.Param.PositionRange())) case param > 1.0: ratio = 1.0 warnings.Add(annotations.NewInvalidRatioWarning(param, ratio, aggExpr.Param.PositionRange())) default: ratio = param } seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. case parser.QUANTILE: if math.IsNaN(param) || param < 0 || param > 1 { warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange())) } } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } // Reset number of samples in memory after each timestamp. ev.currentSamples = tempNumSamples // Make the function call. enh.Ts = ts var ws annotations.Annotations switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO: result, ws = ev.aggregationK(aggExpr, k, ratio, inputMatrix, seriesToResult, groups, enh, seriess) // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { warnings.Merge(ws) return result, warnings } default: ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, groups, enh) } warnings.Merge(ws) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } // Assemble the output matrix. By the time we get here we know we don't have too many samples. switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO: result = make(Matrix, 0, len(seriess)) for _, ss := range seriess { result = append(result, ss) } default: // Remove empty result rows. dst := 0 for _, series := range result { if len(series.Floats) > 0 || len(series.Histograms) > 0 { result[dst] = series dst++ } } result = result[:dst] } return result, warnings } // evalVectorSelector generates a Matrix between ev.startTimestamp and ev.endTimestamp (inclusive), each point spaced ev.interval apart, from vs. // vs.Series has to be expanded before calling this method. // For every series iterator in vs.Series, the method iterates in ev.interval sized steps from ev.startTimestamp until and including ev.endTimestamp, // collecting every corresponding sample (obtained via ev.vectorSelectorSingle) into a Series. // All of the generated Series are collected into a Matrix, that gets returned. func (ev *evaluator) evalVectorSelector(ctx context.Context, vs *parser.VectorSelector) Matrix { numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 mat := make(Matrix, 0, len(vs.Series)) var prevSS *Series it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) var chkIter chunkenc.Iterator for _, s := range vs.Series { if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } chkIter = s.Iterator(chkIter) it.Reset(chkIter) ss := Series{ Metric: s.Labels(), } for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ _, f, h, ok := ev.vectorSelectorSingle(it, vs, ts) if !ok { continue } if h == nil { ev.currentSamples++ ev.samplesStats.IncrementSamplesAtStep(step, 1) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } if ss.Floats == nil { ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) } ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) } else { point := HPoint{H: h, T: ts} histSize := point.size() ev.currentSamples += histSize ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize)) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } if ss.Histograms == nil { ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) } ss.Histograms = append(ss.Histograms, point) } } if len(ss.Floats)+len(ss.Histograms) > 0 { mat = append(mat, ss) prevSS = &mat[len(mat)-1] } } ev.samplesStats.UpdatePeak(ev.currentSamples) return mat } // evalSubquery evaluates given SubqueryExpr and returns an equivalent // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { samplesStats := ev.samplesStats // Avoid double counting samples when running a subquery, those samples will be counted in later stage. ev.samplesStats = ev.samplesStats.NewChild() val, ws := ev.eval(ctx, subq) // But do incorporate the peak from the subquery samplesStats.UpdatePeakFromSubquery(ev.samplesStats) ev.samplesStats = samplesStats mat := val.(Matrix) vs := &parser.VectorSelector{ OriginalOffset: subq.OriginalOffset, Offset: subq.Offset, Series: make([]storage.Series, 0, len(mat)), Timestamp: subq.Timestamp, } if subq.Timestamp != nil { // The offset of subquery is not modified in case of @ modifier. // Hence we take care of that here for the result. vs.Offset = subq.OriginalOffset + time.Duration(ev.startTimestamp-*subq.Timestamp)*time.Millisecond } ms := &parser.MatrixSelector{ Range: subq.Range, VectorSelector: vs, } for _, s := range mat { vs.Series = append(vs.Series, NewStorageSeries(s)) } return ms, mat.TotalSamples(), ws } // eval evaluates the given expression as the given AST expression node requires. func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, annotations.Annotations) { // This is the top-level evaluation method. // Thus, we check for timeout/cancellation here. if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 // Create a new span to help investigate inner evaluation performances. ctx, span := otel.Tracer("").Start(ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String()) defer span.End() if ss, ok := expr.(interface{ ShortString() string }); ok { span.SetAttributes(attribute.String("operation", ss.ShortString())) } switch e := expr.(type) { case *parser.AggregateExpr: // Grouping labels must be sorted (expected both by generateGroupingKey() and aggregation()). sortedGrouping := e.Grouping slices.Sort(sortedGrouping) unwrapParenExpr(&e.Param) param := unwrapStepInvariantExpr(e.Param) unwrapParenExpr(¶m) if e.Op == parser.COUNT_VALUES { valueLabel := param.(*parser.StringLiteral) if !model.LabelName(valueLabel.Val).IsValid() { ev.errorf("invalid label name %q", valueLabel) } if !e.Without { sortedGrouping = append(sortedGrouping, valueLabel.Val) slices.Sort(sortedGrouping) } return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) }, e.Expr) } var warnings annotations.Annotations originalNumSamples := ev.currentSamples // param is the number k for topk/bottomk, or q for quantile. var fParam float64 if param != nil { val, ws := ev.eval(ctx, param) warnings.Merge(ws) fParam = val.(Matrix)[0].Floats[0].F } // Now fetch the data to be aggregated. val, ws := ev.eval(ctx, e.Expr) warnings.Merge(ws) inputMatrix := val.(Matrix) result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fParam) warnings.Merge(ws) ev.currentSamples = originalNumSamples + result.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) return result, warnings case *parser.Call: call := FunctionCalls[e.Func.Name] if e.Func.Name == "timestamp" { // Matrix evaluation always returns the evaluation time, // so this function needs special handling when given // a vector selector. unwrapParenExpr(&e.Args[0]) arg := unwrapStepInvariantExpr(e.Args[0]) unwrapParenExpr(&arg) vs, ok := arg.(*parser.VectorSelector) if ok { return ev.rangeEvalTimestampFunctionOverVectorSelector(ctx, vs, call, e) } } // Check if the function has a matrix argument. var ( matrixArgIndex int matrixArg bool warnings annotations.Annotations ) for i := range e.Args { unwrapParenExpr(&e.Args[i]) a := unwrapStepInvariantExpr(e.Args[i]) unwrapParenExpr(&a) if _, ok := a.(*parser.MatrixSelector); ok { matrixArgIndex = i matrixArg = true break } // parser.SubqueryExpr can be used in place of parser.MatrixSelector. if subq, ok := a.(*parser.SubqueryExpr); ok { matrixArgIndex = i matrixArg = true // Replacing parser.SubqueryExpr with parser.MatrixSelector. val, totalSamples, ws := ev.evalSubquery(ctx, subq) e.Args[i] = val warnings.Merge(ws) defer func() { // subquery result takes space in the memory. Get rid of that at the end. val.VectorSelector.(*parser.VectorSelector).Series = nil ev.currentSamples -= totalSamples }() break } } // Special handling for functions that work on series not samples. switch e.Func.Name { case "label_replace": return ev.evalLabelReplace(ctx, e.Args) case "label_join": return ev.evalLabelJoin(ctx, e.Args) } if !matrixArg { // Does not have a matrix argument. return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, annos := call(v, e.Args, enh) return vec, warnings.Merge(annos) }, e.Args...) } inArgs := make([]parser.Value, len(e.Args)) // Evaluate any non-matrix arguments. otherArgs := make([]Matrix, len(e.Args)) otherInArgs := make([]Vector, len(e.Args)) for i, e := range e.Args { if i != matrixArgIndex { val, ws := ev.eval(ctx, e) otherArgs[i] = val.(Matrix) otherInArgs[i] = Vector{Sample{}} inArgs[i] = otherInArgs[i] warnings.Merge(ws) } } unwrapParenExpr(&e.Args[matrixArgIndex]) arg := unwrapStepInvariantExpr(e.Args[matrixArgIndex]) unwrapParenExpr(&arg) sel := arg.(*parser.MatrixSelector) selVS := sel.VectorSelector.(*parser.VectorSelector) ws, err := checkAndExpandSeriesSet(ctx, sel) warnings.Merge(ws) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings}) } mat := make(Matrix, 0, len(selVS.Series)) // Output matrix. offset := durationMilliseconds(selVS.Offset) selRange := durationMilliseconds(sel.Range) stepRange := selRange if stepRange > ev.interval { stepRange = ev.interval } // Reuse objects across steps to save memory allocations. var floats []FPoint var histograms []HPoint var prevSS *Series inMatrix := make(Matrix, 1) inArgs[matrixArgIndex] = inMatrix enh := &EvalNodeHelper{Out: make(Vector, 0, 1), enableDelayedNameRemoval: ev.enableDelayedNameRemoval} // Process all the calls for one time series at a time. it := storage.NewBuffer(selRange) var chkIter chunkenc.Iterator // The last_over_time function acts like offset; thus, it // should keep the metric name. For all the other range // vector functions, the only change needed is to drop the // metric name in the output. dropName := e.Func.Name != "last_over_time" for i, s := range selVS.Series { if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } ev.currentSamples -= len(floats) + totalHPointSize(histograms) if floats != nil { floats = floats[:0] } if histograms != nil { histograms = histograms[:0] } chkIter = s.Iterator(chkIter) it.Reset(chkIter) metric := selVS.Series[i].Labels() if !ev.enableDelayedNameRemoval && dropName { metric = metric.DropMetricName() } ss := Series{ Metric: metric, DropName: dropName, } inMatrix[0].Metric = selVS.Series[i].Labels() for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ // Set the non-matrix arguments. // They are scalar, so it is safe to use the step number // when looking up the argument, as there will be no gaps. for j := range e.Args { if j != matrixArgIndex { otherInArgs[j][0].F = otherArgs[j][0].Floats[step].F } } // Evaluate the matrix selector for this series // for this step, but only if this is the 1st // iteration or no @ modifier has been used. if ts == ev.startTimestamp || selVS.Timestamp == nil { maxt := ts - offset mint := maxt - selRange floats, histograms = ev.matrixIterSlice(it, mint, maxt, floats, histograms) } if len(floats)+len(histograms) == 0 { continue } inMatrix[0].Floats = floats inMatrix[0].Histograms = histograms enh.Ts = ts // Make the function call. outVec, annos := call(inArgs, e.Args, enh) warnings.Merge(annos) ev.samplesStats.IncrementSamplesAtStep(step, int64(len(floats)+totalHPointSize(histograms))) enh.Out = outVec[:0] if len(outVec) > 0 { if outVec[0].H == nil { if ss.Floats == nil { ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) } ss.Floats = append(ss.Floats, FPoint{F: outVec[0].F, T: ts}) } else { if ss.Histograms == nil { ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) } ss.Histograms = append(ss.Histograms, HPoint{H: outVec[0].H, T: ts}) } } // Only buffer stepRange milliseconds from the second step on. it.ReduceDelta(stepRange) } histSamples := totalHPointSize(ss.Histograms) if len(ss.Floats)+histSamples > 0 { if ev.currentSamples+len(ss.Floats)+histSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } mat = append(mat, ss) prevSS = &mat[len(mat)-1] ev.currentSamples += len(ss.Floats) + histSamples } ev.samplesStats.UpdatePeak(ev.currentSamples) if e.Func.Name == "rate" || e.Func.Name == "increase" { metricName := inMatrix[0].Metric.Get(labels.MetricName) if metricName != "" && len(ss.Floats) > 0 && !strings.HasSuffix(metricName, "_total") && !strings.HasSuffix(metricName, "_sum") && !strings.HasSuffix(metricName, "_count") && !strings.HasSuffix(metricName, "_bucket") { warnings.Add(annotations.NewPossibleNonCounterInfo(metricName, e.Args[0].PositionRange())) } } } ev.samplesStats.UpdatePeak(ev.currentSamples) ev.currentSamples -= len(floats) + totalHPointSize(histograms) putFPointSlice(floats) putMatrixSelectorHPointSlice(histograms) // The absent_over_time function returns 0 or 1 series. So far, the matrix // contains multiple series. The following code will create a new series // with values of 1 for the timestamps where no series has value. if e.Func.Name == "absent_over_time" { steps := int(1 + (ev.endTimestamp-ev.startTimestamp)/ev.interval) // Iterate once to look for a complete series. for _, s := range mat { if len(s.Floats)+len(s.Histograms) == steps { return Matrix{}, warnings } } found := map[int64]struct{}{} for i, s := range mat { for _, p := range s.Floats { found[p.T] = struct{}{} } for _, p := range s.Histograms { found[p.T] = struct{}{} } if i > 0 && len(found) == steps { return Matrix{}, warnings } } newp := make([]FPoint, 0, steps-len(found)) for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { if _, ok := found[ts]; !ok { newp = append(newp, FPoint{T: ts, F: 1}) } } return Matrix{ Series{ Metric: createLabelsForAbsentFunction(e.Args[0]), Floats: newp, DropName: dropName, }, }, warnings } if !ev.enableDelayedNameRemoval && mat.ContainsSameLabelset() { ev.errorf("vector cannot contain metrics with the same labelset") } return mat, warnings case *parser.ParenExpr: return ev.eval(ctx, e.Expr) case *parser.UnaryExpr: val, ws := ev.eval(ctx, e.Expr) mat := val.(Matrix) if e.Op == parser.SUB { for i := range mat { if !ev.enableDelayedNameRemoval { mat[i].Metric = mat[i].Metric.DropMetricName() } mat[i].DropName = true for j := range mat[i].Floats { mat[i].Floats[j].F = -mat[i].Floats[j].F } for j := range mat[i].Histograms { mat[i].Histograms[j].H = mat[i].Histograms[j].H.Copy().Mul(-1) } } if !ev.enableDelayedNameRemoval && mat.ContainsSameLabelset() { ev.errorf("vector cannot contain metrics with the same labelset") } } return mat, ws case *parser.BinaryExpr: switch lt, rt := e.LHS.Type(), e.RHS.Type(); { case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar: return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F) return append(enh.Out, Sample{F: val}), nil }, e.LHS, e.RHS) case lt == parser.ValueTypeVector && rt == parser.ValueTypeVector: // Function to compute the join signature for each series. buf := make([]byte, 0, 1024) sigf := signatureFunc(e.VectorMatching.On, buf, e.VectorMatching.MatchingLabels...) initSignatures := func(series labels.Labels, h *EvalSeriesHelper) { h.signature = sigf(series) } switch e.Op { case parser.LAND: return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) case parser.LOR: return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) case parser.LUNLESS: return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil }, e.LHS, e.RHS) default: return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, err := ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh, e.PositionRange()) return vec, handleVectorBinopError(err, e) }, e.LHS, e.RHS) } case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar: return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, err := ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh, e.PositionRange()) return vec, handleVectorBinopError(err, e) }, e.LHS, e.RHS) case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector: return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { vec, err := ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh, e.PositionRange()) return vec, handleVectorBinopError(err, e) }, e.LHS, e.RHS) } case *parser.NumberLiteral: span.SetAttributes(attribute.Float64("value", e.Val)) return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil }) case *parser.StringLiteral: span.SetAttributes(attribute.String("value", e.Val)) return String{V: e.Val, T: ev.startTimestamp}, nil case *parser.VectorSelector: ws, err := checkAndExpandSeriesSet(ctx, e) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } mat := ev.evalVectorSelector(ctx, e) return mat, ws case *parser.MatrixSelector: if ev.startTimestamp != ev.endTimestamp { panic(errors.New("cannot do range evaluation of matrix selector")) } return ev.matrixSelector(ctx, e) case *parser.SubqueryExpr: offsetMillis := durationMilliseconds(e.Offset) rangeMillis := durationMilliseconds(e.Range) newEv := &evaluator{ endTimestamp: ev.endTimestamp - offsetMillis, currentSamples: ev.currentSamples, maxSamples: ev.maxSamples, logger: ev.logger, lookbackDelta: ev.lookbackDelta, samplesStats: ev.samplesStats.NewChild(), noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ev.enableDelayedNameRemoval, } if e.Step != 0 { newEv.interval = durationMilliseconds(e.Step) } else { newEv.interval = ev.noStepSubqueryIntervalFn(rangeMillis) } // Start with the first timestamp after (ev.startTimestamp - offset - range) // that is aligned with the step (multiple of 'newEv.interval'). newEv.startTimestamp = newEv.interval * ((ev.startTimestamp - offsetMillis - rangeMillis) / newEv.interval) if newEv.startTimestamp < (ev.startTimestamp - offsetMillis - rangeMillis) { newEv.startTimestamp += newEv.interval } if newEv.startTimestamp != ev.startTimestamp { // Adjust the offset of selectors based on the new // start time of the evaluator since the calculation // of the offset with @ happens w.r.t. the start time. setOffsetForAtModifier(newEv.startTimestamp, e.Expr) } res, ws := newEv.eval(ctx, e.Expr) ev.currentSamples = newEv.currentSamples ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples) return res, ws case *parser.StepInvariantExpr: switch ce := e.Expr.(type) { case *parser.StringLiteral, *parser.NumberLiteral: return ev.eval(ctx, ce) } newEv := &evaluator{ startTimestamp: ev.startTimestamp, endTimestamp: ev.startTimestamp, // Always a single evaluation. interval: ev.interval, currentSamples: ev.currentSamples, maxSamples: ev.maxSamples, logger: ev.logger, lookbackDelta: ev.lookbackDelta, samplesStats: ev.samplesStats.NewChild(), noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, enableDelayedNameRemoval: ev.enableDelayedNameRemoval, } res, ws := newEv.eval(ctx, e.Expr) ev.currentSamples = newEv.currentSamples ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { step++ ev.samplesStats.IncrementSamplesAtStep(step, newEv.samplesStats.TotalSamples) } switch e.Expr.(type) { case *parser.MatrixSelector, *parser.SubqueryExpr: // We do not duplicate results for range selectors since result is a matrix // with their unique timestamps which does not depend on the step. return res, ws } // For every evaluation while the value remains same, the timestamp for that // value would change for different eval times. Hence we duplicate the result // with changed timestamps. mat, ok := res.(Matrix) if !ok { panic(fmt.Errorf("unexpected result in StepInvariantExpr evaluation: %T", expr)) } for i := range mat { if len(mat[i].Floats)+len(mat[i].Histograms) != 1 { panic(fmt.Errorf("unexpected number of samples")) } for ts := ev.startTimestamp + ev.interval; ts <= ev.endTimestamp; ts += ev.interval { if len(mat[i].Floats) > 0 { mat[i].Floats = append(mat[i].Floats, FPoint{ T: ts, F: mat[i].Floats[0].F, }) ev.currentSamples++ } else { point := HPoint{ T: ts, H: mat[i].Histograms[0].H, } mat[i].Histograms = append(mat[i].Histograms, point) ev.currentSamples += point.size() } if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } } ev.samplesStats.UpdatePeak(ev.currentSamples) return res, ws } panic(fmt.Errorf("unhandled expression of type: %T", expr)) } // reuseOrGetHPointSlices reuses the space from previous slice to create new slice if the former has lots of room. // The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one. func reuseOrGetHPointSlices(prevSS *Series, numSteps int) (r []HPoint) { if prevSS != nil && cap(prevSS.Histograms)-2*len(prevSS.Histograms) > 0 { r = prevSS.Histograms[len(prevSS.Histograms):] prevSS.Histograms = prevSS.Histograms[0:len(prevSS.Histograms):len(prevSS.Histograms)] return } return getHPointSlice(numSteps) } // reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room. // The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one. func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) { if prevSS != nil && cap(prevSS.Floats)-2*len(prevSS.Floats) > 0 { r = prevSS.Floats[len(prevSS.Floats):] prevSS.Floats = prevSS.Floats[0:len(prevSS.Floats):len(prevSS.Floats)] return } return getFPointSlice(numSteps) } func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(ctx context.Context, vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { ws, err := checkAndExpandSeriesSet(ctx, vs) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series)) for i, s := range vs.Series { it := s.Iterator(nil) seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)-1) } return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { if vs.Timestamp != nil { // This is a special case for "timestamp()" when the @ modifier is used, to ensure that // we return a point for each time step in this case. // See https://github.com/prometheus/prometheus/issues/8433. vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond } vec := make(Vector, 0, len(vs.Series)) for i, s := range vs.Series { it := seriesIterators[i] t, _, _, ok := ev.vectorSelectorSingle(it, vs, enh.Ts) if !ok { continue } // Note that we ignore the sample values because call only cares about the timestamp. vec = append(vec, Sample{ Metric: s.Labels(), T: t, }) ev.currentSamples++ ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } ev.samplesStats.UpdatePeak(ev.currentSamples) vec, annos := call([]parser.Value{vec}, e.Args, enh) return vec, ws.Merge(annos) }) } // vectorSelectorSingle evaluates an instant vector for the iterator of one time series. func (ev *evaluator) vectorSelectorSingle(it *storage.MemoizedSeriesIterator, node *parser.VectorSelector, ts int64) ( int64, float64, *histogram.FloatHistogram, bool, ) { refTime := ts - durationMilliseconds(node.Offset) var t int64 var v float64 var h *histogram.FloatHistogram valueType := it.Seek(refTime) switch valueType { case chunkenc.ValNone: if it.Err() != nil { ev.error(it.Err()) } case chunkenc.ValFloat: t, v = it.At() case chunkenc.ValFloatHistogram: t, h = it.AtFloatHistogram() default: panic(fmt.Errorf("unknown value type %v", valueType)) } if valueType == chunkenc.ValNone || t > refTime { var ok bool t, v, h, ok = it.PeekPrev() if !ok || t <= refTime-durationMilliseconds(ev.lookbackDelta) { return 0, 0, nil, false } } if value.IsStaleNaN(v) || (h != nil && value.IsStaleNaN(h.Sum)) { return 0, 0, nil, false } return t, v, h, true } var ( fPointPool zeropool.Pool[[]FPoint] hPointPool zeropool.Pool[[]HPoint] // matrixSelectorHPool holds reusable histogram slices used by the matrix // selector. The key difference between this pool and the hPointPool is that // slices returned by this pool should never hold multiple copies of the same // histogram pointer since histogram objects are reused across query evaluation // steps. matrixSelectorHPool zeropool.Pool[[]HPoint] ) func getFPointSlice(sz int) []FPoint { if p := fPointPool.Get(); p != nil { return p } if sz > maxPointsSliceSize { sz = maxPointsSliceSize } return make([]FPoint, 0, sz) } // putFPointSlice will return a FPoint slice of size max(maxPointsSliceSize, sz). // This function is called with an estimated size which often can be over-estimated. func putFPointSlice(p []FPoint) { if p != nil { fPointPool.Put(p[:0]) } } // getHPointSlice will return a HPoint slice of size max(maxPointsSliceSize, sz). // This function is called with an estimated size which often can be over-estimated. func getHPointSlice(sz int) []HPoint { if p := hPointPool.Get(); p != nil { return p } if sz > maxPointsSliceSize { sz = maxPointsSliceSize } return make([]HPoint, 0, sz) } func putHPointSlice(p []HPoint) { if p != nil { hPointPool.Put(p[:0]) } } func getMatrixSelectorHPoints() []HPoint { if p := matrixSelectorHPool.Get(); p != nil { return p } return make([]HPoint, 0, matrixSelectorSliceSize) } func putMatrixSelectorHPointSlice(p []HPoint) { if p != nil { matrixSelectorHPool.Put(p[:0]) } } // matrixSelector evaluates a *parser.MatrixSelector expression. func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSelector) (Matrix, annotations.Annotations) { var ( vs = node.VectorSelector.(*parser.VectorSelector) offset = durationMilliseconds(vs.Offset) maxt = ev.startTimestamp - offset mint = maxt - durationMilliseconds(node.Range) matrix = make(Matrix, 0, len(vs.Series)) it = storage.NewBuffer(durationMilliseconds(node.Range)) ) ws, err := checkAndExpandSeriesSet(ctx, node) if err != nil { ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) } var chkIter chunkenc.Iterator series := vs.Series for i, s := range series { if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } chkIter = s.Iterator(chkIter) it.Reset(chkIter) ss := Series{ Metric: series[i].Labels(), } ss.Floats, ss.Histograms = ev.matrixIterSlice(it, mint, maxt, nil, nil) totalSize := int64(len(ss.Floats)) + int64(totalHPointSize(ss.Histograms)) ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, totalSize) if totalSize > 0 { matrix = append(matrix, ss) } else { putFPointSlice(ss.Floats) putHPointSlice(ss.Histograms) } } return matrix, ws } // matrixIterSlice populates a matrix vector covering the requested range for a // single time series, with points retrieved from an iterator. // // As an optimization, the matrix vector may already contain points of the same // time series from the evaluation of an earlier step (with lower mint and maxt // values). Any such points falling before mint are discarded; points that fall // into the [mint, maxt] range are retained; only points with later timestamps // are populated from the iterator. func (ev *evaluator) matrixIterSlice( it *storage.BufferedSeriesIterator, mint, maxt int64, floats []FPoint, histograms []HPoint, ) ([]FPoint, []HPoint) { mintFloats, mintHistograms := mint, mint // First floats... if len(floats) > 0 && floats[len(floats)-1].T > mint { // There is an overlap between previous and current ranges, retain common // points. In most such cases: // (a) the overlap is significantly larger than the eval step; and/or // (b) the number of samples is relatively small. // so a linear search will be as fast as a binary search. var drop int for drop = 0; floats[drop].T <= mint; drop++ { } ev.currentSamples -= drop copy(floats, floats[drop:]) floats = floats[:len(floats)-drop] // Only append points with timestamps after the last timestamp we have. mintFloats = floats[len(floats)-1].T } else { ev.currentSamples -= len(floats) if floats != nil { floats = floats[:0] } } // ...then the same for histograms. TODO(beorn7): Use generics? if len(histograms) > 0 && histograms[len(histograms)-1].T > mint { // There is an overlap between previous and current ranges, retain common // points. In most such cases: // (a) the overlap is significantly larger than the eval step; and/or // (b) the number of samples is relatively small. // so a linear search will be as fast as a binary search. var drop int for drop = 0; histograms[drop].T <= mint; drop++ { } // Rotate the buffer around the drop index so that points before mint can be // reused to store new histograms. tail := make([]HPoint, drop) copy(tail, histograms[:drop]) copy(histograms, histograms[drop:]) copy(histograms[len(histograms)-drop:], tail) histograms = histograms[:len(histograms)-drop] ev.currentSamples -= totalHPointSize(histograms) // Only append points with timestamps after the last timestamp we have. mintHistograms = histograms[len(histograms)-1].T } else { ev.currentSamples -= totalHPointSize(histograms) if histograms != nil { histograms = histograms[:0] } } soughtValueType := it.Seek(maxt) if soughtValueType == chunkenc.ValNone { if it.Err() != nil { ev.error(it.Err()) } } buf := it.Buffer() loop: for { switch buf.Next() { case chunkenc.ValNone: break loop case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: t := buf.AtT() // Values in the buffer are guaranteed to be smaller than maxt. if t > mintHistograms { if histograms == nil { histograms = getMatrixSelectorHPoints() } n := len(histograms) if n < cap(histograms) { histograms = histograms[:n+1] } else { histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}}) } histograms[n].T, histograms[n].H = buf.AtFloatHistogram(histograms[n].H) if value.IsStaleNaN(histograms[n].H.Sum) { histograms = histograms[:n] continue loop } ev.currentSamples += histograms[n].size() if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } } case chunkenc.ValFloat: t, f := buf.At() if value.IsStaleNaN(f) { continue loop } // Values in the buffer are guaranteed to be smaller than maxt. if t > mintFloats { ev.currentSamples++ if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } if floats == nil { floats = getFPointSlice(16) } floats = append(floats, FPoint{T: t, F: f}) } } } // The sought sample might also be in the range. switch soughtValueType { case chunkenc.ValFloatHistogram, chunkenc.ValHistogram: if it.AtT() != maxt { break } if histograms == nil { histograms = getMatrixSelectorHPoints() } n := len(histograms) if n < cap(histograms) { histograms = histograms[:n+1] } else { histograms = append(histograms, HPoint{H: &histogram.FloatHistogram{}}) } if histograms[n].H == nil { // Make sure to pass non-nil H to AtFloatHistogram so that it does a deep-copy. // Not an issue in the loop above since that uses an intermediate buffer. histograms[n].H = &histogram.FloatHistogram{} } histograms[n].T, histograms[n].H = it.AtFloatHistogram(histograms[n].H) if value.IsStaleNaN(histograms[n].H.Sum) { histograms = histograms[:n] break } ev.currentSamples += histograms[n].size() if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } case chunkenc.ValFloat: t, f := it.At() if t == maxt && !value.IsStaleNaN(f) { ev.currentSamples++ if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } if floats == nil { floats = getFPointSlice(16) } floats = append(floats, FPoint{T: t, F: f}) } } ev.samplesStats.UpdatePeak(ev.currentSamples) return floats, histograms } func (ev *evaluator) VectorAnd(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { if matching.Card != parser.CardManyToMany { panic("set operations must only use many-to-many matching") } if len(lhs) == 0 || len(rhs) == 0 { return nil // Short-circuit: AND with nothing is nothing. } // The set of signatures for the right-hand side Vector. rightSigs := map[string]struct{}{} // Add all rhs samples to a map so we can easily find matches later. for _, sh := range rhsh { rightSigs[sh.signature] = struct{}{} } for i, ls := range lhs { // If there's a matching entry in the right-hand side Vector, add the sample. if _, ok := rightSigs[lhsh[i].signature]; ok { enh.Out = append(enh.Out, ls) } } return enh.Out } func (ev *evaluator) VectorOr(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { switch { case matching.Card != parser.CardManyToMany: panic("set operations must only use many-to-many matching") case len(lhs) == 0: // Short-circuit. enh.Out = append(enh.Out, rhs...) return enh.Out case len(rhs) == 0: enh.Out = append(enh.Out, lhs...) return enh.Out } leftSigs := map[string]struct{}{} // Add everything from the left-hand-side Vector. for i, ls := range lhs { leftSigs[lhsh[i].signature] = struct{}{} enh.Out = append(enh.Out, ls) } // Add all right-hand side elements which have not been added from the left-hand side. for j, rs := range rhs { if _, ok := leftSigs[rhsh[j].signature]; !ok { enh.Out = append(enh.Out, rs) } } return enh.Out } func (ev *evaluator) VectorUnless(lhs, rhs Vector, matching *parser.VectorMatching, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper) Vector { if matching.Card != parser.CardManyToMany { panic("set operations must only use many-to-many matching") } // Short-circuit: empty rhs means we will return everything in lhs; // empty lhs means we will return empty - don't need to build a map. if len(lhs) == 0 || len(rhs) == 0 { enh.Out = append(enh.Out, lhs...) return enh.Out } rightSigs := map[string]struct{}{} for _, sh := range rhsh { rightSigs[sh.signature] = struct{}{} } for i, ls := range lhs { if _, ok := rightSigs[lhsh[i].signature]; !ok { enh.Out = append(enh.Out, ls) } } return enh.Out } // VectorBinop evaluates a binary operation between two Vectors, excluding set operators. func (ev *evaluator) VectorBinop(op parser.ItemType, lhs, rhs Vector, matching *parser.VectorMatching, returnBool bool, lhsh, rhsh []EvalSeriesHelper, enh *EvalNodeHelper, pos posrange.PositionRange) (Vector, error) { if matching.Card == parser.CardManyToMany { panic("many-to-many only allowed for set operators") } if len(lhs) == 0 || len(rhs) == 0 { return nil, nil // Short-circuit: nothing is going to match. } // The control flow below handles one-to-one or many-to-one matching. // For one-to-many, swap sidedness and account for the swap when calculating // values. if matching.Card == parser.CardOneToMany { lhs, rhs = rhs, lhs lhsh, rhsh = rhsh, lhsh } // All samples from the rhs hashed by the matching label/values. if enh.rightSigs == nil { enh.rightSigs = make(map[string]Sample, len(enh.Out)) } else { for k := range enh.rightSigs { delete(enh.rightSigs, k) } } rightSigs := enh.rightSigs // Add all rhs samples to a map so we can easily find matches later. for i, rs := range rhs { sig := rhsh[i].signature // The rhs is guaranteed to be the 'one' side. Having multiple samples // with the same signature means that the matching is many-to-many. if duplSample, found := rightSigs[sig]; found { // oneSide represents which side of the vector represents the 'one' in the many-to-one relationship. oneSide := "right" if matching.Card == parser.CardOneToMany { oneSide = "left" } matchedLabels := rs.Metric.MatchLabels(matching.On, matching.MatchingLabels...) // Many-to-many matching not allowed. ev.errorf("found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]"+ ";many-to-many matching not allowed: matching labels must be unique on one side", matchedLabels.String(), oneSide, rs.Metric.String(), duplSample.Metric.String()) } rightSigs[sig] = rs } // Tracks the match-signature. For one-to-one operations the value is nil. For many-to-one // the value is a set of signatures to detect duplicated result elements. if enh.matchedSigs == nil { enh.matchedSigs = make(map[string]map[uint64]struct{}, len(rightSigs)) } else { for k := range enh.matchedSigs { delete(enh.matchedSigs, k) } } matchedSigs := enh.matchedSigs // For all lhs samples find a respective rhs sample and perform // the binary operation. var lastErr error for i, ls := range lhs { sig := lhsh[i].signature rs, found := rightSigs[sig] // Look for a match in the rhs Vector. if !found { continue } // Account for potentially swapped sidedness. fl, fr := ls.F, rs.F hl, hr := ls.H, rs.H if matching.Card == parser.CardOneToMany { fl, fr = fr, fl hl, hr = hr, hl } floatValue, histogramValue, keep, err := vectorElemBinop(op, fl, fr, hl, hr, pos) if err != nil { lastErr = err } switch { case returnBool: if keep { floatValue = 1.0 } else { floatValue = 0.0 } case !keep: continue } metric := resultMetric(ls.Metric, rs.Metric, op, matching, enh) if !ev.enableDelayedNameRemoval && returnBool { metric = metric.DropMetricName() } insertedSigs, exists := matchedSigs[sig] if matching.Card == parser.CardOneToOne { if exists { ev.errorf("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)") } matchedSigs[sig] = nil // Set existence to true. } else { // In many-to-one matching the grouping labels have to ensure a unique metric // for the result Vector. Check whether those labels have already been added for // the same matching labels. insertSig := metric.Hash() if !exists { insertedSigs = map[uint64]struct{}{} matchedSigs[sig] = insertedSigs } else if _, duplicate := insertedSigs[insertSig]; duplicate { ev.errorf("multiple matches for labels: grouping labels must ensure unique matches") } insertedSigs[insertSig] = struct{}{} } enh.Out = append(enh.Out, Sample{ Metric: metric, F: floatValue, H: histogramValue, DropName: returnBool, }) } return enh.Out, lastErr } func signatureFunc(on bool, b []byte, names ...string) func(labels.Labels) string { if on { slices.Sort(names) return func(lset labels.Labels) string { return string(lset.BytesWithLabels(b, names...)) } } names = append([]string{labels.MetricName}, names...) slices.Sort(names) return func(lset labels.Labels) string { return string(lset.BytesWithoutLabels(b, names...)) } } // resultMetric returns the metric for the given sample(s) based on the Vector // binary operation and the matching options. func resultMetric(lhs, rhs labels.Labels, op parser.ItemType, matching *parser.VectorMatching, enh *EvalNodeHelper) labels.Labels { if enh.resultMetric == nil { enh.resultMetric = make(map[string]labels.Labels, len(enh.Out)) } enh.resetBuilder(lhs) buf := bytes.NewBuffer(enh.lblResultBuf[:0]) enh.lblBuf = lhs.Bytes(enh.lblBuf) buf.Write(enh.lblBuf) enh.lblBuf = rhs.Bytes(enh.lblBuf) buf.Write(enh.lblBuf) enh.lblResultBuf = buf.Bytes() if ret, ok := enh.resultMetric[string(enh.lblResultBuf)]; ok { return ret } str := string(enh.lblResultBuf) if shouldDropMetricName(op) { enh.lb.Del(labels.MetricName) } if matching.Card == parser.CardOneToOne { if matching.On { enh.lb.Keep(matching.MatchingLabels...) } else { enh.lb.Del(matching.MatchingLabels...) } } for _, ln := range matching.Include { // Included labels from the `group_x` modifier are taken from the "one"-side. if v := rhs.Get(ln); v != "" { enh.lb.Set(ln, v) } else { enh.lb.Del(ln) } } ret := enh.lb.Labels() enh.resultMetric[str] = ret return ret } // VectorscalarBinop evaluates a binary operation between a Vector and a Scalar. func (ev *evaluator) VectorscalarBinop(op parser.ItemType, lhs Vector, rhs Scalar, swap, returnBool bool, enh *EvalNodeHelper, pos posrange.PositionRange) (Vector, error) { var lastErr error for _, lhsSample := range lhs { lf, rf := lhsSample.F, rhs.V var rh *histogram.FloatHistogram lh := lhsSample.H // lhs always contains the Vector. If the original position was different // swap for calculating the value. if swap { lf, rf = rf, lf lh, rh = rh, lh } float, histogram, keep, err := vectorElemBinop(op, lf, rf, lh, rh, pos) if err != nil { lastErr = err } // Catch cases where the scalar is the LHS in a scalar-vector comparison operation. // We want to always keep the vector element value as the output value, even if it's on the RHS. if op.IsComparisonOperator() && swap { float = rf histogram = rh } if returnBool { if keep { float = 1.0 } else { float = 0.0 } keep = true } if keep { lhsSample.F = float lhsSample.H = histogram if shouldDropMetricName(op) || returnBool { if !ev.enableDelayedNameRemoval { lhsSample.Metric = lhsSample.Metric.DropMetricName() } lhsSample.DropName = true } enh.Out = append(enh.Out, lhsSample) } } return enh.Out, lastErr } // scalarBinop evaluates a binary operation between two Scalars. func scalarBinop(op parser.ItemType, lhs, rhs float64) float64 { switch op { case parser.ADD: return lhs + rhs case parser.SUB: return lhs - rhs case parser.MUL: return lhs * rhs case parser.DIV: return lhs / rhs case parser.POW: return math.Pow(lhs, rhs) case parser.MOD: return math.Mod(lhs, rhs) case parser.EQLC: return btos(lhs == rhs) case parser.NEQ: return btos(lhs != rhs) case parser.GTR: return btos(lhs > rhs) case parser.LSS: return btos(lhs < rhs) case parser.GTE: return btos(lhs >= rhs) case parser.LTE: return btos(lhs <= rhs) case parser.ATAN2: return math.Atan2(lhs, rhs) } panic(fmt.Errorf("operator %q not allowed for Scalar operations", op)) } // vectorElemBinop evaluates a binary operation between two Vector elements. func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram, pos posrange.PositionRange) (float64, *histogram.FloatHistogram, bool, error) { switch op { case parser.ADD: if hlhs != nil && hrhs != nil { res, err := hlhs.Copy().Add(hrhs) if err != nil { return 0, nil, false, err } return 0, res.Compact(0), true, nil } if hlhs == nil && hrhs == nil { return lhs + rhs, nil, true, nil } if hlhs != nil { return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("histogram", "+", "float", pos) } return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("float", "+", "histogram", pos) case parser.SUB: if hlhs != nil && hrhs != nil { res, err := hlhs.Copy().Sub(hrhs) if err != nil { return 0, nil, false, err } return 0, res.Compact(0), true, nil } if hlhs == nil && hrhs == nil { return lhs - rhs, nil, true, nil } if hlhs != nil { return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("histogram", "-", "float", pos) } return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("float", "-", "histogram", pos) case parser.MUL: if hlhs != nil && hrhs == nil { return 0, hlhs.Copy().Mul(rhs), true, nil } if hlhs == nil && hrhs != nil { return 0, hrhs.Copy().Mul(lhs), true, nil } if hlhs != nil && hrhs != nil { return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("histogram", "*", "histogram", pos) } return lhs * rhs, nil, true, nil case parser.DIV: if hlhs != nil && hrhs == nil { return 0, hlhs.Copy().Div(rhs), true, nil } if hrhs != nil { if hlhs != nil { return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("histogram", "/", "histogram", pos) } return 0, nil, false, annotations.NewIncompatibleTypesInBinOpInfo("float", "/", "histogram", pos) } return lhs / rhs, nil, true, nil case parser.POW: return math.Pow(lhs, rhs), nil, true, nil case parser.MOD: return math.Mod(lhs, rhs), nil, true, nil case parser.EQLC: return lhs, nil, lhs == rhs, nil case parser.NEQ: return lhs, nil, lhs != rhs, nil case parser.GTR: return lhs, nil, lhs > rhs, nil case parser.LSS: return lhs, nil, lhs < rhs, nil case parser.GTE: return lhs, nil, lhs >= rhs, nil case parser.LTE: return lhs, nil, lhs <= rhs, nil case parser.ATAN2: return math.Atan2(lhs, rhs), nil, true, nil } panic(fmt.Errorf("operator %q not allowed for operations between Vectors", op)) } type groupedAggregation struct { floatValue float64 histogramValue *histogram.FloatHistogram floatMean float64 floatKahanC float64 // "Compensating value" for Kahan summation. groupCount float64 heap vectorByValueHeap // All bools together for better packing within the struct. seen bool // Was this output groups seen in the input at this timestamp. hasFloat bool // Has at least 1 float64 sample aggregated. hasHistogram bool // Has at least 1 histogram sample aggregated. incompatibleHistograms bool // If true, group has seen mixed exponential and custom buckets, or incompatible custom buckets. groupAggrComplete bool // Used by LIMITK to short-cut series loop when we've reached K elem on every group. incrementalMean bool // True after reverting to incremental calculation of the mean value. } // aggregation evaluates sum, avg, count, stdvar, stddev or quantile at one timestep on inputMatrix. // These functions produce one output series for each group specified in the expression, with just the labels from `by(...)`. // outputMatrix should be already populated with grouping labels; groups is one-to-one with outputMatrix. // seriesToResult maps inputMatrix indexes to outputMatrix indexes. func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix, outputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper) annotations.Annotations { op := e.Op var annos annotations.Annotations for i := range groups { groups[i].seen = false } for si := range inputMatrix { f, h, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) if !ok { continue } group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. if !group.seen { *group = groupedAggregation{ seen: true, floatValue: f, floatMean: f, incompatibleHistograms: false, groupCount: 1, } switch op { case parser.AVG, parser.SUM: if h == nil { group.hasFloat = true } else { group.histogramValue = h.Copy() group.hasHistogram = true } case parser.STDVAR, parser.STDDEV: group.floatValue = 0 case parser.QUANTILE: group.heap = make(vectorByValueHeap, 1) group.heap[0] = Sample{F: f} case parser.GROUP: group.floatValue = 1 } continue } if group.incompatibleHistograms { continue } switch op { case parser.SUM: if h != nil { group.hasHistogram = true if group.histogramValue != nil { _, err := group.histogramValue.Add(h) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) group.incompatibleHistograms = true } } // Otherwise the aggregation contained floats // previously and will be invalid anyway. No // point in copying the histogram in that case. } else { group.hasFloat = true group.floatValue, group.floatKahanC = kahanSumInc(f, group.floatValue, group.floatKahanC) } case parser.AVG: group.groupCount++ if h != nil { group.hasHistogram = true if group.histogramValue != nil { left := h.Copy().Div(group.groupCount) right := group.histogramValue.Copy().Div(group.groupCount) toAdd, err := left.Sub(right) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) group.incompatibleHistograms = true continue } _, err = group.histogramValue.Add(toAdd) if err != nil { handleAggregationError(err, e, inputMatrix[si].Metric.Get(model.MetricNameLabel), &annos) group.incompatibleHistograms = true continue } } // Otherwise the aggregation contained floats // previously and will be invalid anyway. No // point in copying the histogram in that case. } else { group.hasFloat = true if !group.incrementalMean { newV, newC := kahanSumInc(f, group.floatValue, group.floatKahanC) if !math.IsInf(newV, 0) { // The sum doesn't overflow, so we propagate it to the // group struct and continue with the regular // calculation of the mean value. group.floatValue, group.floatKahanC = newV, newC break } // If we are here, we know that the sum _would_ overflow. So // instead of continue to sum up, we revert to incremental // calculation of the mean value from here on. group.incrementalMean = true group.floatMean = group.floatValue / (group.groupCount - 1) group.floatKahanC /= group.groupCount - 1 } if math.IsInf(group.floatMean, 0) { if math.IsInf(f, 0) && (group.floatMean > 0) == (f > 0) { // The `floatMean` and `s.F` values are `Inf` of the same sign. They // can't be subtracted, but the value of `floatMean` is correct // already. break } if !math.IsInf(f, 0) && !math.IsNaN(f) { // At this stage, the mean is an infinite. If the added // value is neither an Inf or a Nan, we can keep that mean // value. // This is required because our calculation below removes // the mean value, which would look like Inf += x - Inf and // end up as a NaN. break } } currentMean := group.floatMean + group.floatKahanC group.floatMean, group.floatKahanC = kahanSumInc( // Divide each side of the `-` by `group.groupCount` to avoid float64 overflows. f/group.groupCount-currentMean/group.groupCount, group.floatMean, group.floatKahanC, ) } case parser.GROUP: // Do nothing. Required to avoid the panic in `default:` below. case parser.MAX: if group.floatValue < f || math.IsNaN(group.floatValue) { group.floatValue = f } case parser.MIN: if group.floatValue > f || math.IsNaN(group.floatValue) { group.floatValue = f } case parser.COUNT: group.groupCount++ case parser.STDVAR, parser.STDDEV: if h == nil { // Ignore native histograms. group.groupCount++ delta := f - group.floatMean group.floatMean += delta / group.groupCount group.floatValue += delta * (f - group.floatMean) } case parser.QUANTILE: group.heap = append(group.heap, Sample{F: f}) default: panic(fmt.Errorf("expected aggregation operator but got %q", op)) } } // Construct the output matrix from the aggregated groups. numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 for ri, aggr := range groups { if !aggr.seen { continue } switch op { case parser.AVG: if aggr.hasFloat && aggr.hasHistogram { // We cannot aggregate histogram sample with a float64 sample. annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) continue } switch { case aggr.incompatibleHistograms: continue case aggr.hasHistogram: aggr.histogramValue = aggr.histogramValue.Compact(0) case aggr.incrementalMean: aggr.floatValue = aggr.floatMean + aggr.floatKahanC default: aggr.floatValue = (aggr.floatValue + aggr.floatKahanC) / aggr.groupCount } case parser.COUNT: aggr.floatValue = aggr.groupCount case parser.STDVAR: aggr.floatValue /= aggr.groupCount case parser.STDDEV: aggr.floatValue = math.Sqrt(aggr.floatValue / aggr.groupCount) case parser.QUANTILE: aggr.floatValue = quantile(q, aggr.heap) case parser.SUM: if aggr.hasFloat && aggr.hasHistogram { // We cannot aggregate histogram sample with a float64 sample. annos.Add(annotations.NewMixedFloatsHistogramsAggWarning(e.Expr.PositionRange())) continue } switch { case aggr.incompatibleHistograms: continue case aggr.hasHistogram: aggr.histogramValue.Compact(0) default: aggr.floatValue += aggr.floatKahanC } default: // For other aggregations, we already have the right value. } ss := &outputMatrix[ri] addToSeries(ss, enh.Ts, aggr.floatValue, aggr.histogramValue, numSteps) ss.DropName = inputMatrix[ri].DropName } return annos } // aggregationK evaluates topk, bottomk, limitk, or limit_ratio at one timestep on inputMatrix. // Output that has the same labels as the input, but just k of them per group. // seriesToResult maps inputMatrix indexes to groups indexes. // For an instant query, returns a Matrix in descending order for topk or ascending for bottomk, or without any order for limitk / limit_ratio. // For a range query, aggregates output in the seriess map. func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int, r float64, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op var s Sample var annos annotations.Annotations // Used to short-cut the loop for LIMITK if we already collected k elements for every group groupsRemaining := len(groups) for i := range groups { groups[i].seen = false } seriesLoop: for si := range inputMatrix { f, _, ok := ev.nextValues(enh.Ts, &inputMatrix[si]) if !ok { continue } s = Sample{Metric: inputMatrix[si].Metric, F: f, DropName: inputMatrix[si].DropName} group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. if !group.seen { // LIMIT_RATIO is a special case, as we may not add this very sample to the heap, // while we also don't know the final size of it. if op == parser.LIMIT_RATIO { *group = groupedAggregation{ seen: true, heap: make(vectorByValueHeap, 0), } if ratiosampler.AddRatioSample(r, &s) { heap.Push(&group.heap, &s) } } else { *group = groupedAggregation{ seen: true, heap: make(vectorByValueHeap, 1, k), } group.heap[0] = s } continue } switch op { case parser.TOPK: // We build a heap of up to k elements, with the smallest element at heap[0]. switch { case len(group.heap) < k: heap.Push(&group.heap, &s) case group.heap[0].F < s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): // This new element is bigger than the previous smallest element - overwrite that. group.heap[0] = s if k > 1 { heap.Fix(&group.heap, 0) // Maintain the heap invariant. } } case parser.BOTTOMK: // We build a heap of up to k elements, with the biggest element at heap[0]. switch { case len(group.heap) < k: heap.Push((*vectorByReverseValueHeap)(&group.heap), &s) case group.heap[0].F > s.F || (math.IsNaN(group.heap[0].F) && !math.IsNaN(s.F)): // This new element is smaller than the previous biggest element - overwrite that. group.heap[0] = s if k > 1 { heap.Fix((*vectorByReverseValueHeap)(&group.heap), 0) // Maintain the heap invariant. } } case parser.LIMITK: if len(group.heap) < k { heap.Push(&group.heap, &s) } // LIMITK optimization: early break if we've added K elem to _every_ group, // especially useful for large timeseries where the user is exploring labels via e.g. // limitk(10, my_metric) if !group.groupAggrComplete && len(group.heap) == k { group.groupAggrComplete = true groupsRemaining-- if groupsRemaining == 0 { break seriesLoop } } case parser.LIMIT_RATIO: if ratiosampler.AddRatioSample(r, &s) { heap.Push(&group.heap, &s) } default: panic(fmt.Errorf("expected aggregation operator but got %q", op)) } } // Construct the result from the aggregated groups. numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 var mat Matrix if ev.endTimestamp == ev.startTimestamp { mat = make(Matrix, 0, len(groups)) } add := func(lbls labels.Labels, f float64, dropName bool) { // If this could be an instant query, add directly to the matrix so the result is in consistent order. if ev.endTimestamp == ev.startTimestamp { mat = append(mat, Series{Metric: lbls, Floats: []FPoint{{T: enh.Ts, F: f}}, DropName: dropName}) } else { // Otherwise the results are added into seriess elements. hash := lbls.Hash() ss, ok := seriess[hash] if !ok { ss = Series{Metric: lbls, DropName: dropName} } addToSeries(&ss, enh.Ts, f, nil, numSteps) seriess[hash] = ss } } for _, aggr := range groups { if !aggr.seen { continue } switch op { case parser.TOPK: // The heap keeps the lowest value on top, so reverse it. if len(aggr.heap) > 1 { sort.Sort(sort.Reverse(aggr.heap)) } for _, v := range aggr.heap { add(v.Metric, v.F, v.DropName) } case parser.BOTTOMK: // The heap keeps the highest value on top, so reverse it. if len(aggr.heap) > 1 { sort.Sort(sort.Reverse((*vectorByReverseValueHeap)(&aggr.heap))) } for _, v := range aggr.heap { add(v.Metric, v.F, v.DropName) } case parser.LIMITK, parser.LIMIT_RATIO: for _, v := range aggr.heap { add(v.Metric, v.F, v.DropName) } } } return mat, annos } // aggregationCountValues evaluates count_values on vec. // Outputs as many series per group as there are values in the input. func (ev *evaluator) aggregationCountValues(e *parser.AggregateExpr, grouping []string, valueLabel string, vec Vector, enh *EvalNodeHelper) (Vector, annotations.Annotations) { type groupCount struct { labels labels.Labels count int } result := map[uint64]*groupCount{} var buf []byte for _, s := range vec { enh.resetBuilder(s.Metric) enh.lb.Set(valueLabel, strconv.FormatFloat(s.F, 'f', -1, 64)) metric := enh.lb.Labels() // Considering the count_values() // operator is less frequently used than other aggregations, we're fine having to // re-compute the grouping key on each step for this case. var groupingKey uint64 groupingKey, buf = generateGroupingKey(metric, grouping, e.Without, buf) group, ok := result[groupingKey] // Add a new group if it doesn't exist. if !ok { result[groupingKey] = &groupCount{ labels: generateGroupingLabels(enh, metric, e.Without, grouping), count: 1, } continue } group.count++ } // Construct the result Vector from the aggregated groups. for _, aggr := range result { enh.Out = append(enh.Out, Sample{ Metric: aggr.labels, F: float64(aggr.count), }) } return enh.Out, nil } func (ev *evaluator) cleanupMetricLabels(v parser.Value) { if v.Type() == parser.ValueTypeMatrix { mat := v.(Matrix) for i := range mat { if mat[i].DropName { mat[i].Metric = mat[i].Metric.DropMetricName() } } if mat.ContainsSameLabelset() { ev.errorf("vector cannot contain metrics with the same labelset") } } else if v.Type() == parser.ValueTypeVector { vec := v.(Vector) for i := range vec { if vec[i].DropName { vec[i].Metric = vec[i].Metric.DropMetricName() } } if vec.ContainsSameLabelset() { ev.errorf("vector cannot contain metrics with the same labelset") } } } func addToSeries(ss *Series, ts int64, f float64, h *histogram.FloatHistogram, numSteps int) { if h == nil { if ss.Floats == nil { ss.Floats = getFPointSlice(numSteps) } ss.Floats = append(ss.Floats, FPoint{T: ts, F: f}) return } if ss.Histograms == nil { ss.Histograms = getHPointSlice(numSteps) } ss.Histograms = append(ss.Histograms, HPoint{T: ts, H: h}) } func (ev *evaluator) nextValues(ts int64, series *Series) (f float64, h *histogram.FloatHistogram, b bool) { switch { case len(series.Floats) > 0 && series.Floats[0].T == ts: f = series.Floats[0].F series.Floats = series.Floats[1:] // Move input vectors forward case len(series.Histograms) > 0 && series.Histograms[0].T == ts: h = series.Histograms[0].H series.Histograms = series.Histograms[1:] default: return f, h, false } return f, h, true } // handleAggregationError adds the appropriate annotation based on the aggregation error. func handleAggregationError(err error, e *parser.AggregateExpr, metricName string, annos *annotations.Annotations) { pos := e.Expr.PositionRange() if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { annos.Add(annotations.NewMixedExponentialCustomHistogramsWarning(metricName, pos)) } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { annos.Add(annotations.NewIncompatibleCustomBucketsHistogramsWarning(metricName, pos)) } } // handleVectorBinopError returns the appropriate annotation based on the vector binary operation error. func handleVectorBinopError(err error, e *parser.BinaryExpr) annotations.Annotations { if err == nil { return nil } metricName := "" pos := e.PositionRange() if errors.Is(err, annotations.PromQLInfo) || errors.Is(err, annotations.PromQLWarning) { return annotations.New().Add(err) } if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { return annotations.New().Add(annotations.NewMixedExponentialCustomHistogramsWarning(metricName, pos)) } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { return annotations.New().Add(annotations.NewIncompatibleCustomBucketsHistogramsWarning(metricName, pos)) } return nil } // groupingKey builds and returns the grouping key for the given metric and // grouping labels. func generateGroupingKey(metric labels.Labels, grouping []string, without bool, buf []byte) (uint64, []byte) { if without { return metric.HashWithoutLabels(buf, grouping...) } if len(grouping) == 0 { // No need to generate any hash if there are no grouping labels. return 0, buf } return metric.HashForLabels(buf, grouping...) } func generateGroupingLabels(enh *EvalNodeHelper, metric labels.Labels, without bool, grouping []string) labels.Labels { enh.resetBuilder(metric) switch { case without: enh.lb.Del(grouping...) enh.lb.Del(labels.MetricName) return enh.lb.Labels() case len(grouping) > 0: enh.lb.Keep(grouping...) return enh.lb.Labels() default: return labels.EmptyLabels() } } // btos returns 1 if b is true, 0 otherwise. func btos(b bool) float64 { if b { return 1 } return 0 } // shouldDropMetricName returns whether the metric name should be dropped in the // result of the op operation. func shouldDropMetricName(op parser.ItemType) bool { switch op { case parser.ADD, parser.SUB, parser.DIV, parser.MUL, parser.POW, parser.MOD, parser.ATAN2: return true default: return false } } // NewOriginContext returns a new context with data about the origin attached. func NewOriginContext(ctx context.Context, data map[string]interface{}) context.Context { return context.WithValue(ctx, QueryOrigin{}, data) } func formatDate(t time.Time) string { return t.UTC().Format("2006-01-02T15:04:05.000Z07:00") } // unwrapParenExpr does the AST equivalent of removing parentheses around a expression. func unwrapParenExpr(e *parser.Expr) { for { if p, ok := (*e).(*parser.ParenExpr); ok { *e = p.Expr } else { break } } } func unwrapStepInvariantExpr(e parser.Expr) parser.Expr { if p, ok := e.(*parser.StepInvariantExpr); ok { return p.Expr } return e } // PreprocessExpr wraps all possible step invariant parts of the given expression with // StepInvariantExpr. It also resolves the preprocessors. func PreprocessExpr(expr parser.Expr, start, end time.Time) parser.Expr { detectHistogramStatsDecoding(expr) isStepInvariant := preprocessExprHelper(expr, start, end) if isStepInvariant { return newStepInvariantExpr(expr) } return expr } // preprocessExprHelper wraps the child nodes of the expression // with a StepInvariantExpr wherever it's step invariant. The returned boolean is true if the // passed expression qualifies to be wrapped by StepInvariantExpr. // It also resolves the preprocessors. func preprocessExprHelper(expr parser.Expr, start, end time.Time) bool { switch n := expr.(type) { case *parser.VectorSelector: switch n.StartOrEnd { case parser.START: n.Timestamp = makeInt64Pointer(timestamp.FromTime(start)) case parser.END: n.Timestamp = makeInt64Pointer(timestamp.FromTime(end)) } return n.Timestamp != nil case *parser.AggregateExpr: return preprocessExprHelper(n.Expr, start, end) case *parser.BinaryExpr: isInvariant1, isInvariant2 := preprocessExprHelper(n.LHS, start, end), preprocessExprHelper(n.RHS, start, end) if isInvariant1 && isInvariant2 { return true } if isInvariant1 { n.LHS = newStepInvariantExpr(n.LHS) } if isInvariant2 { n.RHS = newStepInvariantExpr(n.RHS) } return false case *parser.Call: _, ok := AtModifierUnsafeFunctions[n.Func.Name] isStepInvariant := !ok isStepInvariantSlice := make([]bool, len(n.Args)) for i := range n.Args { isStepInvariantSlice[i] = preprocessExprHelper(n.Args[i], start, end) isStepInvariant = isStepInvariant && isStepInvariantSlice[i] } if isStepInvariant { // The function and all arguments are step invariant. return true } for i, isi := range isStepInvariantSlice { if isi { n.Args[i] = newStepInvariantExpr(n.Args[i]) } } return false case *parser.MatrixSelector: return preprocessExprHelper(n.VectorSelector, start, end) case *parser.SubqueryExpr: // Since we adjust offset for the @ modifier evaluation, // it gets tricky to adjust it for every subquery step. // Hence we wrap the inside of subquery irrespective of // @ on subquery (given it is also step invariant) so that // it is evaluated only once w.r.t. the start time of subquery. isInvariant := preprocessExprHelper(n.Expr, start, end) if isInvariant { n.Expr = newStepInvariantExpr(n.Expr) } switch n.StartOrEnd { case parser.START: n.Timestamp = makeInt64Pointer(timestamp.FromTime(start)) case parser.END: n.Timestamp = makeInt64Pointer(timestamp.FromTime(end)) } return n.Timestamp != nil case *parser.ParenExpr: return preprocessExprHelper(n.Expr, start, end) case *parser.UnaryExpr: return preprocessExprHelper(n.Expr, start, end) case *parser.StringLiteral, *parser.NumberLiteral: return true } panic(fmt.Sprintf("found unexpected node %#v", expr)) } func newStepInvariantExpr(expr parser.Expr) parser.Expr { return &parser.StepInvariantExpr{Expr: expr} } // setOffsetForAtModifier modifies the offset of vector and matrix selector // and subquery in the tree to accommodate the timestamp of @ modifier. // The offset is adjusted w.r.t. the given evaluation time. func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { getOffset := func(ts *int64, originalOffset time.Duration, path []parser.Node) time.Duration { if ts == nil { return originalOffset } subqOffset, _, subqTs := subqueryTimes(path) if subqTs != nil { subqOffset += time.Duration(evalTime-*subqTs) * time.Millisecond } offsetForTs := time.Duration(evalTime-*ts) * time.Millisecond offsetDiff := offsetForTs - subqOffset return originalOffset + offsetDiff } parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { case *parser.VectorSelector: n.Offset = getOffset(n.Timestamp, n.OriginalOffset, path) case *parser.MatrixSelector: vs := n.VectorSelector.(*parser.VectorSelector) vs.Offset = getOffset(vs.Timestamp, vs.OriginalOffset, path) case *parser.SubqueryExpr: n.Offset = getOffset(n.Timestamp, n.OriginalOffset, path) } return nil }) } // detectHistogramStatsDecoding modifies the expression by setting the // SkipHistogramBuckets field in those vector selectors for which it is safe to // return only histogram statistics (sum and count), excluding histogram spans // and buckets. The function can be treated as an optimization and is not // required for correctness. func detectHistogramStatsDecoding(expr parser.Expr) { parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { if n, ok := node.(*parser.BinaryExpr); ok { detectHistogramStatsDecoding(n.LHS) detectHistogramStatsDecoding(n.RHS) return fmt.Errorf("stop") } n, ok := (node).(*parser.VectorSelector) if !ok { return nil } for _, p := range path { call, ok := p.(*parser.Call) if !ok { continue } if call.Func.Name == "histogram_count" || call.Func.Name == "histogram_sum" { n.SkipHistogramBuckets = true break } if call.Func.Name == "histogram_quantile" || call.Func.Name == "histogram_fraction" { n.SkipHistogramBuckets = false break } } return fmt.Errorf("stop") }) } func makeInt64Pointer(val int64) *int64 { valp := new(int64) *valp = val return valp } // RatioSampler allows unit-testing (previously: Randomizer). type RatioSampler interface { // Return this sample "offset" between [0.0, 1.0] sampleOffset(ts int64, sample *Sample) float64 AddRatioSample(r float64, sample *Sample) bool } // HashRatioSampler uses Hash(labels.String()) / maxUint64 as a "deterministic" // value in [0.0, 1.0]. type HashRatioSampler struct{} var ratiosampler RatioSampler = NewHashRatioSampler() func NewHashRatioSampler() *HashRatioSampler { return &HashRatioSampler{} } func (s *HashRatioSampler) sampleOffset(ts int64, sample *Sample) float64 { const ( float64MaxUint64 = float64(math.MaxUint64) ) return float64(sample.Metric.Hash()) / float64MaxUint64 } func (s *HashRatioSampler) AddRatioSample(ratioLimit float64, sample *Sample) bool { // If ratioLimit >= 0: add sample if sampleOffset is lesser than ratioLimit // // 0.0 ratioLimit 1.0 // [---------|--------------------------] // [#########...........................] // // e.g.: // sampleOffset==0.3 && ratioLimit==0.4 // 0.3 < 0.4 ? --> add sample // // Else if ratioLimit < 0: add sample if rand() return the "complement" of ratioLimit>=0 case // (loosely similar behavior to negative array index in other programming languages) // // 0.0 1+ratioLimit 1.0 // [---------|--------------------------] // [.........###########################] // // e.g.: // sampleOffset==0.3 && ratioLimit==-0.6 // 0.3 >= 0.4 ? --> don't add sample sampleOffset := s.sampleOffset(sample.T, sample) return (ratioLimit >= 0 && sampleOffset < ratioLimit) || (ratioLimit < 0 && sampleOffset >= (1.0+ratioLimit)) } type histogramStatsSeries struct { storage.Series } func newHistogramStatsSeries(series storage.Series) *histogramStatsSeries { return &histogramStatsSeries{Series: series} } func (s histogramStatsSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return NewHistogramStatsIterator(s.Series.Iterator(it)) }