From 606ef33d9106632ea9d54f75993f39a89bc869da Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 1 Feb 2022 18:07:23 -0800 Subject: [PATCH] Track and report Samples Queried per query We always track total samples queried and add those to the standard set of stats queries can report. We also allow optionally tracking per-step samples queried. This must be enabled both at the engine and query level to be tracked and rendered. The engine flag is exposed via a Prometheus feature flag, while the query flag is set when stats=all. Co-authored-by: Alan Protasio Co-authored-by: Andrew Bloomgarden Co-authored-by: Harkishen Singh Signed-off-by: Andrew Bloomgarden --- cmd/prometheus/main.go | 7 +- cmd/promtool/unittest.go | 2 +- docs/feature_flags.md | 10 + promql/bench_test.go | 2 +- promql/engine.go | 77 +++++-- promql/engine_test.go | 465 ++++++++++++++++++++++++++++++++++++-- promql/functions_test.go | 2 +- promql/test.go | 5 +- rules/manager.go | 2 +- util/stats/query_stats.go | 145 +++++++++++- util/stats/stats_test.go | 18 +- web/api/v1/api.go | 16 +- web/api/v1/api_test.go | 93 ++++++++ 13 files changed, 793 insertions(+), 51 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 6e8fad8063..6d403ec27c 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -148,6 +148,7 @@ type flagConfig struct { // for ease of use. enableExpandExternalLabels bool enableNewSDManager bool + enablePerStepStats bool prometheusURL string corsRegexString string @@ -182,6 +183,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "agent": agentMode = true level.Info(logger).Log("msg", "Experimental agent mode enabled.") + case "promql-per-step-stats": + c.enablePerStepStats = true + level.Info(logger).Log("msg", "Experimental per-step statistics reporting") case "": continue case "promql-at-modifier", "promql-negative-offset": @@ -378,7 +382,7 @@ func main() { serverOnlyFlag(a, "query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) @@ -573,6 +577,7 @@ func main() { // always on for regular PromQL as of Prometheus v2.33. EnableAtModifier: true, EnableNegativeOffset: true, + EnablePerStepStats: cfg.enablePerStepStats, } queryEngine = promql.NewEngine(opts) diff --git a/cmd/promtool/unittest.go b/cmd/promtool/unittest.go index b8704231d0..0e05997b80 100644 --- a/cmd/promtool/unittest.go +++ b/cmd/promtool/unittest.go @@ -435,7 +435,7 @@ func (tg *testGroup) maxEvalTime() time.Duration { } func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) { - q, err := engine.NewInstantQuery(qu, qs, t) + q, err := engine.NewInstantQuery(qu, nil, qs, t) if err != nil { return nil, err } diff --git a/docs/feature_flags.md b/docs/feature_flags.md index 7715655e7a..17923162b6 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -78,3 +78,13 @@ discovery, scrape and remote write. This is useful when you do not need to query the Prometheus data locally, but only from a central [remote endpoint](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage). + +## Per-step stats + +`--enable-feature=promql-per-step-stats` + +When enabled, passing `stats=all` in a query request returns per-step +statistics. Currently this is limited to totalQueryableSamples. + +When disabled in either the engine or the query, per-step statistics are not +computed at all. diff --git a/promql/bench_test.go b/promql/bench_test.go index a46b4fcf7a..c3de6ca47d 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -217,7 +217,7 @@ func BenchmarkRangeQuery(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { qry, err := engine.NewRangeQuery( - stor, c.expr, + stor, nil, c.expr, time.Unix(int64((numIntervals-c.steps)*10), 0), time.Unix(int64(numIntervals*10), 0), time.Second*10) if err != nil { diff --git a/promql/engine.go b/promql/engine.go index 687ad678a7..22ce7649b0 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -118,13 +118,18 @@ type Query interface { // Statement returns the parsed statement of the query. Statement() parser.Statement // Stats returns statistics about the lifetime of the query. - Stats() *stats.QueryTimers + Stats() *stats.Statistics // Cancel signals that a running query execution should be aborted. Cancel() // String returns the original query string. String() string } +type QueryOpts struct { + // Enables recording per-step statistics if the engine has it enabled as well. Disabled by default. + EnablePerStepStats bool +} + // query implements the Query interface. type query struct { // Underlying data provider. @@ -135,6 +140,8 @@ type query struct { stmt parser.Statement // Timer stats for the query execution. stats *stats.QueryTimers + // Sample stats for the query execution. + sampleStats *stats.QuerySamples // Result matrix for reuse. matrix Matrix // Cancellation function for the query. @@ -159,8 +166,11 @@ func (q *query) String() string { } // Stats implements the Query interface. -func (q *query) Stats() *stats.QueryTimers { - return q.stats +func (q *query) Stats() *stats.Statistics { + return &stats.Statistics{ + Timers: q.stats, + Samples: q.sampleStats, + } } // Cancel implements the Query interface. @@ -254,6 +264,9 @@ type EngineOpts struct { // is still provided here for those using the Engine outside of // Prometheus. EnableNegativeOffset bool + + // EnablePerStepStats if true allows for per-step stats to be computed on request. Disabled otherwise. + EnablePerStepStats bool } // Engine handles the lifetime of queries from beginning to end. @@ -270,6 +283,7 @@ type Engine struct { noStepSubqueryIntervalFn func(rangeMillis int64) int64 enableAtModifier bool enableNegativeOffset bool + enablePerStepStats bool } // NewEngine returns a new engine. @@ -352,6 +366,7 @@ func NewEngine(opts EngineOpts) *Engine { noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, enableAtModifier: opts.EnableAtModifier, enableNegativeOffset: opts.EnableNegativeOffset, + enablePerStepStats: opts.EnablePerStepStats, } } @@ -379,12 +394,12 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) { } // NewInstantQuery returns an evaluation query for the given expression at the given time. -func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) { +func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) { expr, err := parser.ParseExpr(qs) if err != nil { return nil, err } - qry, err := ng.newQuery(q, expr, ts, ts, 0) + qry, err := ng.newQuery(q, opts, expr, ts, ts, 0) if err != nil { return nil, err } @@ -395,7 +410,7 @@ func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) // NewRangeQuery returns an evaluation query for the given time range and with // the resolution set by the interval. -func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) { +func (ng *Engine) NewRangeQuery(q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) { expr, err := parser.ParseExpr(qs) if err != nil { return nil, err @@ -403,7 +418,7 @@ func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time. if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, errors.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } - qry, err := ng.newQuery(q, expr, start, end, interval) + qry, err := ng.newQuery(q, opts, expr, start, end, interval) if err != nil { return nil, err } @@ -412,11 +427,16 @@ func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time. return qry, nil } -func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) { +func (ng *Engine) newQuery(q storage.Queryable, opts *QueryOpts, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) { if err := ng.validateOpts(expr); err != nil { return nil, err } + // Default to empty QueryOpts if not provided. + if opts == nil { + opts = &QueryOpts{} + } + es := &parser.EvalStmt{ Expr: PreprocessExpr(expr, start, end), Start: start, @@ -424,10 +444,11 @@ func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end tim Interval: interval, } qry := &query{ - stmt: es, - ng: ng, - stats: stats.NewQueryTimers(), - queryable: q, + stmt: es, + ng: ng, + stats: stats.NewQueryTimers(), + sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats), + queryable: q, } return qry, nil } @@ -490,10 +511,11 @@ func (ng *Engine) validateOpts(expr parser.Expr) error { func (ng *Engine) newTestQuery(f func(context.Context) error) Query { qry := &query{ - q: "test statement", - stmt: parser.TestStmt(f), - ng: ng, - stats: stats.NewQueryTimers(), + q: "test statement", + stmt: parser.TestStmt(f), + ng: ng, + stats: stats.NewQueryTimers(), + sampleStats: stats.NewQuerySamples(ng.enablePerStepStats), } return qry } @@ -615,8 +637,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: ng.lookbackDelta, + samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, } + query.sampleStats.InitStepTracking(start, start, 1) val, warnings, err := evaluator.Eval(s.Expr) if err != nil { @@ -665,8 +689,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval maxSamples: ng.maxSamplesPerQuery, logger: ng.logger, lookbackDelta: ng.lookbackDelta, + samplesStats: query.sampleStats, noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, } + query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) val, warnings, err := evaluator.Eval(s.Expr) if err != nil { return nil, warnings, err @@ -896,6 +922,7 @@ type evaluator struct { currentSamples int logger log.Logger lookbackDelta time.Duration + samplesStats *stats.QuerySamples noStepSubqueryIntervalFn func(rangeMillis int64) int64 } @@ -1154,7 +1181,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) // evalSubquery evaluates given SubqueryExpr and returns an equivalent // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set. func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, storage.Warnings) { + sampleStats := ev.samplesStats + // Avoid double counting samples when running a subquery, those samples will be counted in later stage. + ev.samplesStats = nil val, ws := ev.eval(subq) + ev.samplesStats = sampleStats mat := val.(Matrix) vs := &parser.VectorSelector{ OriginalOffset: subq.OriginalOffset, @@ -1360,6 +1391,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { enh.Ts = ts // Make the function call. outVec := call(inArgs, e.Args, enh) + ev.samplesStats.IncrementSamplesAtStep(step, len(points)) enh.Out = outVec[:0] if len(outVec) > 0 { ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts}) @@ -1511,11 +1543,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { Points: getPointSlice(numSteps), } - for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { + step++ _, v, ok := ev.vectorSelectorSingle(it, e, ts) if ok { if ev.currentSamples < ev.maxSamples { ss.Points = append(ss.Points, Point{V: v, T: ts}) + ev.samplesStats.IncrementSamplesAtStep(step, 1) ev.currentSamples++ } else { ev.error(ErrTooManySamples(env)) @@ -1547,6 +1581,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { maxSamples: ev.maxSamples, logger: ev.logger, lookbackDelta: ev.lookbackDelta, + samplesStats: ev.samplesStats.NewChild(), noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, } @@ -1572,6 +1607,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { res, ws := newEv.eval(e.Expr) ev.currentSamples = newEv.currentSamples + ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples) return res, ws case *parser.StepInvariantExpr: switch ce := e.Expr.(type) { @@ -1588,10 +1624,15 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { maxSamples: ev.maxSamples, logger: ev.logger, lookbackDelta: ev.lookbackDelta, + samplesStats: ev.samplesStats.NewChild(), noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, } res, ws := newEv.eval(e.Expr) ev.currentSamples = newEv.currentSamples + for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts = ts + ev.interval { + step++ + ev.samplesStats.IncrementSamplesAtStep(step, newEv.samplesStats.TotalSamples) + } switch e.Expr.(type) { case *parser.MatrixSelector, *parser.SubqueryExpr: // We do not duplicate results for range selectors since result is a matrix @@ -1646,6 +1687,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect }) ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } @@ -1727,6 +1769,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag } ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16)) + ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, len(ss.Points)) if len(ss.Points) > 0 { matrix = append(matrix, ss) diff --git a/promql/engine_test.go b/promql/engine_test.go index f7f6b216f4..35a123f264 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -23,6 +23,9 @@ import ( "time" "github.com/go-kit/log" + + "github.com/prometheus/prometheus/util/stats" + "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -225,14 +228,14 @@ func TestQueryError(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() - vectorQuery, err := engine.NewInstantQuery(queryable, "foo", time.Unix(1, 0)) + vectorQuery, err := engine.NewInstantQuery(queryable, nil, "foo", time.Unix(1, 0)) require.NoError(t, err) res := vectorQuery.Exec(ctx) require.Error(t, res.Err, "expected error on failed select but got none") require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match") - matrixQuery, err := engine.NewInstantQuery(queryable, "foo[1m]", time.Unix(1, 0)) + matrixQuery, err := engine.NewInstantQuery(queryable, nil, "foo[1m]", time.Unix(1, 0)) require.NoError(t, err) res = matrixQuery.Exec(ctx) @@ -559,9 +562,9 @@ func TestSelectHintsSetCorrectly(t *testing.T) { err error ) if tc.end == 0 { - query, err = engine.NewInstantQuery(hintsRecorder, tc.query, timestamp.Time(tc.start)) + query, err = engine.NewInstantQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start)) } else { - query, err = engine.NewRangeQuery(hintsRecorder, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second) + query, err = engine.NewRangeQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second) } require.NoError(t, err) @@ -719,9 +722,9 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.Query, c.Start) + qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) } else { - qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) + qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) @@ -736,6 +739,438 @@ load 10s } } +func TestQueryStatistics(t *testing.T) { + test, err := NewTest(t, ` +load 10s + metricWith1SampleEvery10Seconds 1+1x100 + metricWith3SampleEvery10Seconds{a="1",b="1"} 1+1x100 + metricWith3SampleEvery10Seconds{a="2",b="2"} 1+1x100 + metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100 +`) + + require.NoError(t, err) + defer test.Close() + + err = test.Run() + require.NoError(t, err) + + cases := []struct { + Query string + TotalSamples int + TotalSamplesPerStep stats.TotalSamplesPerStep + Start time.Time + End time.Time + Interval time.Duration + }{ + { + Query: `"literal string"`, + Start: time.Unix(21, 0), + TotalSamples: 0, + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 0, + }, + }, + { + Query: "1", + Start: time.Unix(21, 0), + TotalSamples: 0, + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 0, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds", + Start: time.Unix(21, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, + { + // timestamp function has a special handling. + Query: "timestamp(metricWith1SampleEvery10Seconds)", + Start: time.Unix(21, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds", + Start: time.Unix(22, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 22000: 1, // Aligned to the step time, not the sample time. + }, + }, + { + Query: "metricWith1SampleEvery10Seconds offset 10s", + Start: time.Unix(21, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds @ 15", + Start: time.Unix(21, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds{a="1"}`, + Start: time.Unix(21, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds{a="1"} @ 19`, + Start: time.Unix(21, 0), + TotalSamples: 1, // 1 sample / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds{a="1"}[20s] @ 19`, + Start: time.Unix(21, 0), + TotalSamples: 2, // (1 sample / 10 seconds) * 20s + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 2, + }, + }, + { + Query: "metricWith3SampleEvery10Seconds", + Start: time.Unix(21, 0), + TotalSamples: 3, // 3 samples / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 3, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds[60s]", + Start: time.Unix(201, 0), + TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 6, + }, + }, + { + Query: "max_over_time(metricWith1SampleEvery10Seconds[59s])[20s:5s]", + Start: time.Unix(201, 0), + TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 60/5 (using 59s so we always return 6 samples + // as if we run a query on 00 looking back 60 seconds we will return 7 samples; + // see next test). + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 24, + }, + }, + { + Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])[20s:5s]", + Start: time.Unix(201, 0), + TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) + 2 as + // max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples. + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 26, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds[60s] @ 30", + Start: time.Unix(201, 0), + TotalSamples: 4, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 1 series + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 4, + }, + }, + { + Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))", + Start: time.Unix(201, 0), + TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + }, + }, + { + Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))", + Start: time.Unix(201, 0), + TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds[60s] offset 10s", + Start: time.Unix(201, 0), + TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 6, + }, + }, + { + Query: "metricWith3SampleEvery10Seconds[60s]", + Start: time.Unix(201, 0), + TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 18, + }, + }, + { + Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])", + Start: time.Unix(201, 0), + TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 6, + }, + }, + { + Query: "absent_over_time(metricWith1SampleEvery10Seconds[60s])", + Start: time.Unix(201, 0), + TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 6, + }, + }, + { + Query: "max_over_time(metricWith3SampleEvery10Seconds[60s])", + Start: time.Unix(201, 0), + TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 18, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds[60s:5s]", + Start: time.Unix(201, 0), + TotalSamples: 12, // 1 sample per query * 12 queries (60/5) + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + }, + }, + { + Query: "metricWith1SampleEvery10Seconds[60s:5s] offset 10s", + Start: time.Unix(201, 0), + TotalSamples: 12, // 1 sample per query * 12 queries (60/5) + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + }, + }, + { + Query: "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])", + Start: time.Unix(201, 0), + TotalSamples: 36, // 3 sample per query * 12 queries (60/5) + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 36, + }, + }, + { + Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))", + Start: time.Unix(201, 0), + TotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5)) + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 72, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds{a="1"}`, + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 4, // 1 sample per query * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 1, + 206000: 1, + 211000: 1, + 216000: 1, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds{a="1"}`, + Start: time.Unix(204, 0), + End: time.Unix(223, 0), + Interval: 5 * time.Second, + TotalSamples: 4, // 1 sample per query * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 204000: 1, // aligned to the step time, not the sample time + 209000: 1, + 214000: 1, + 219000: 1, + }, + }, + { + // timestamp function as a special handling + Query: "timestamp(metricWith1SampleEvery10Seconds)", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 4, // (1 sample / 10 seconds) * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 1, + 206000: 1, + 211000: 1, + 216000: 1, + }, + }, + { + Query: `max_over_time(metricWith3SampleEvery10Seconds{a="1"}[10s])`, + Start: time.Unix(991, 0), + End: time.Unix(1021, 0), + Interval: 10 * time.Second, + TotalSamples: 2, // 1 sample per query * 2 steps with data + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 991000: 1, + 1001000: 1, + 1011000: 0, + 1021000: 0, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds{a="1"} offset 10s`, + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 4, // 1 sample per query * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 1, + 206000: 1, + 211000: 1, + 216000: 1, + }, + }, + { + Query: "max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30)", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 48, // @ modifier force the evaluation timestamp at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + 206000: 12, + 211000: 12, + 216000: 12, + }, + }, + { + Query: `metricWith3SampleEvery10Seconds`, + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 12, // 3 sample per query * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 3, + 206000: 3, + 211000: 3, + 216000: 3, + }, + }, + { + Query: `max_over_time(metricWith3SampleEvery10Seconds[60s])`, + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 72, // (3 sample / 10 seconds * 60 seconds) * 4 steps = 72 + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 18, + 206000: 18, + 211000: 18, + 216000: 18, + }, + }, + { + Query: "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 144, // 3 sample per query * 12 queries (60/5) * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 36, + 206000: 36, + 211000: 36, + 216000: 36, + }, + }, + { + Query: "max_over_time(metricWith1SampleEvery10Seconds[60s:5s])", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + 206000: 12, + 211000: 12, + 216000: 12, + }, + }, + { + Query: "sum by (b) (max_over_time(metricWith1SampleEvery10Seconds[60s:5s]))", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 12, + 206000: 12, + 211000: 12, + 216000: 12, + }, + }, + { + Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps) + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 72, + 206000: 72, + 211000: 72, + 216000: 72, + }, + }, + { + Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith1SampleEvery10Seconds[60s:5s]))", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + TotalSamples: 192, // (1 sample per query * 12 queries (60/5) + 3 sample per query * 12 queries (60/5)) * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 48, + 206000: 48, + 211000: 48, + 216000: 48, + }, + }, + } + + engine := test.QueryEngine() + engine.enablePerStepStats = true + for _, c := range cases { + t.Run(c.Query, func(t *testing.T) { + opts := &QueryOpts{EnablePerStepStats: true} + + var err error + var qry Query + if c.Interval == 0 { + qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start) + } else { + qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval) + } + require.NoError(t, err) + + res := qry.Exec(test.Context()) + require.Nil(t, res.Err) + + stats := qry.Stats() + require.Equal(t, c.TotalSamples, stats.Samples.TotalSamples, "Total samples mismatch") + require.Equal(t, &c.TotalSamplesPerStep, stats.Samples.TotalSamplesPerStepMap(), "Total samples per time mismatch") + }) + } +} + func TestMaxQuerySamples(t *testing.T) { test, err := NewTest(t, ` load 10s @@ -892,14 +1327,16 @@ load 10s var err error var qry Query if c.Interval == 0 { - qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start) + qry, err = engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) } else { - qry, err = engine.NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) + qry, err = engine.NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) } require.NoError(t, err) res := qry.Exec(test.Context()) + stats := qry.Stats() require.Equal(t, expError, res.Err) + require.NotNil(t, stats) } // Within limit. @@ -1128,9 +1565,9 @@ load 1ms var err error var qry Query if c.end == 0 { - qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.query, start) + qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.query, start) } else { - qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.query, start, end, interval) + qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.query, start, end, interval) } require.NoError(t, err) @@ -1451,7 +1888,7 @@ func TestSubquerySelector(t *testing.T) { engine := test.QueryEngine() for _, c := range tst.cases { t.Run(c.Query, func(t *testing.T) { - qry, err := engine.NewInstantQuery(test.Queryable(), c.Query, c.Start) + qry, err := engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start) require.NoError(t, err) res := qry.Exec(test.Context()) @@ -2458,8 +2895,8 @@ func TestEngineOptsValidation(t *testing.T) { for _, c := range cases { eng := NewEngine(c.opts) - _, err1 := eng.NewInstantQuery(nil, c.query, time.Unix(10, 0)) - _, err2 := eng.NewRangeQuery(nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) + _, err1 := eng.NewInstantQuery(nil, nil, c.query, time.Unix(10, 0)) + _, err2 := eng.NewRangeQuery(nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) if c.fail { require.Equal(t, c.expError, err1) require.Equal(t, c.expError, err2) @@ -2606,7 +3043,7 @@ func TestRangeQuery(t *testing.T) { err = test.Run() require.NoError(t, err) - qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval) + qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval) require.NoError(t, err) res := qry.Exec(test.Context()) diff --git a/promql/functions_test.go b/promql/functions_test.go index cde14461fd..f24787af1e 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -55,7 +55,7 @@ func TestDeriv(t *testing.T) { require.NoError(t, a.Commit()) - query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939)) + query, err := engine.NewInstantQuery(storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939)) require.NoError(t, err) result := query.Exec(context.Background()) diff --git a/promql/test.go b/promql/test.go index cd734c76d0..a9408bc4c5 100644 --- a/promql/test.go +++ b/promql/test.go @@ -533,7 +533,7 @@ func (t *Test) exec(tc testCommand) error { } queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...) for _, iq := range queries { - q, err := t.QueryEngine().NewInstantQuery(t.storage, iq.expr, iq.evalTime) + q, err := t.QueryEngine().NewInstantQuery(t.storage, nil, iq.expr, iq.evalTime) if err != nil { return err } @@ -555,7 +555,7 @@ func (t *Test) exec(tc testCommand) error { // Check query returns same result in range mode, // by checking against the middle step. - q, err = t.queryEngine.NewRangeQuery(t.storage, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute) + q, err = t.queryEngine.NewRangeQuery(t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute) if err != nil { return err } @@ -614,6 +614,7 @@ func (t *Test) clear() { NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) }, EnableAtModifier: true, EnableNegativeOffset: true, + EnablePerStepStats: true, } t.queryEngine = NewEngine(opts) diff --git a/rules/manager.go b/rules/manager.go index 60c5b66043..ca6e363c5e 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -185,7 +185,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, // It converts scalar into vector results. func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { - q, err := engine.NewInstantQuery(q, qs, t) + q, err := engine.NewInstantQuery(q, nil, qs, t) if err != nil { return nil, err } diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go index 8f2f37e546..7a6c0f18ea 100644 --- a/util/stats/query_stats.go +++ b/util/stats/query_stats.go @@ -15,6 +15,9 @@ package stats import ( "context" + "encoding/json" + "fmt" + "strconv" "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel" @@ -75,6 +78,23 @@ func (s QueryTiming) SpanOperation() string { } } +// stepStat represents a single statistic for a given step timestamp. +type stepStat struct { + T int64 + V float64 +} + +func (s stepStat) String() string { + v := strconv.FormatFloat(s.V, 'f', -1, 64) + return fmt.Sprintf("%v @[%v]", v, s.T) +} + +// MarshalJSON implements json.Marshaler. +func (s stepStat) MarshalJSON() ([]byte, error) { + v := strconv.FormatFloat(s.V, 'f', -1, 64) + return json.Marshal([...]interface{}{float64(s.T) / 1000, v}) +} + // queryTimings with all query timers mapped to durations. type queryTimings struct { EvalTotalTime float64 `json:"evalTotalTime"` @@ -85,15 +105,26 @@ type queryTimings struct { ExecTotalTime float64 `json:"execTotalTime"` } +type querySamples struct { + TotalQueryableSamplesPerStep []stepStat `json:"totalQueryableSamplesPerStep,omitempty"` + TotalQueryableSamples int `json:"totalQueryableSamples"` +} + // QueryStats currently only holding query timings. type QueryStats struct { - Timings queryTimings `json:"timings,omitempty"` + Timings queryTimings `json:"timings,omitempty"` + Samples *querySamples `json:"samples,omitempty"` } // NewQueryStats makes a QueryStats struct with all QueryTimings found in the // given TimerGroup. -func NewQueryStats(tg *QueryTimers) *QueryStats { - var qt queryTimings +func NewQueryStats(s *Statistics) *QueryStats { + var ( + qt queryTimings + samples *querySamples + tg = s.Timers + sp = s.Samples + ) for s, timer := range tg.TimerGroup.timers { switch s { @@ -112,10 +143,41 @@ func NewQueryStats(tg *QueryTimers) *QueryStats { } } - qs := QueryStats{Timings: qt} + if sp != nil { + samples = &querySamples{ + TotalQueryableSamples: sp.TotalSamples, + } + samples.TotalQueryableSamplesPerStep = sp.totalSamplesPerStepPoints() + } + + qs := QueryStats{Timings: qt, Samples: samples} return &qs } +func (qs *QuerySamples) TotalSamplesPerStepMap() *TotalSamplesPerStep { + if !qs.EnablePerStepStats { + return nil + } + + ts := TotalSamplesPerStep{} + for _, s := range qs.totalSamplesPerStepPoints() { + ts[s.T] = int(s.V) + } + return &ts +} + +func (qs *QuerySamples) totalSamplesPerStepPoints() []stepStat { + if !qs.EnablePerStepStats { + return nil + } + + ts := make([]stepStat, len(qs.TotalSamplesPerStep)) + for i, c := range qs.TotalSamplesPerStep { + ts[i] = stepStat{T: qs.startTimestamp + int64(i)*qs.interval, V: float64(c)} + } + return ts +} + // SpanTimer unifies tracing and timing, to reduce repetition. type SpanTimer struct { timer *Timer @@ -145,14 +207,89 @@ func (s *SpanTimer) Finish() { } } +type Statistics struct { + Timers *QueryTimers + Samples *QuerySamples +} + type QueryTimers struct { *TimerGroup } +type TotalSamplesPerStep map[int64]int + +type QuerySamples struct { + // TotalSamples represents the total number of samples scanned + // while evaluating a query. + TotalSamples int + + // TotalSamplesPerStep represents the total number of samples scanned + // per step while evaluating a query. Each step should be identical to the + // TotalSamples when a step is run as an instant query, which means + // we intentionally do not account for optimizations that happen inside the + // range query engine that reduce the actual work that happens. + TotalSamplesPerStep []int + + EnablePerStepStats bool + startTimestamp int64 + interval int64 +} + +type Stats struct { + TimerStats *QueryTimers + SampleStats *QuerySamples +} + +func (qs *QuerySamples) InitStepTracking(start, end, interval int64) { + if !qs.EnablePerStepStats { + return + } + + numSteps := int((end-start)/interval) + 1 + qs.TotalSamplesPerStep = make([]int, numSteps) + qs.startTimestamp = start + qs.interval = interval +} + +// IncrementSamplesAtStep increments the total samples count. Use this if you know the step index. +func (qs *QuerySamples) IncrementSamplesAtStep(i, samples int) { + if qs == nil { + return + } + qs.TotalSamples += samples + + if qs.TotalSamplesPerStep != nil { + qs.TotalSamplesPerStep[i] += samples + } +} + +// IncrementSamplesAtTimestamp increments the total samples count. Use this if you only have the corresponding step +// timestamp. +func (qs *QuerySamples) IncrementSamplesAtTimestamp(t int64, samples int) { + if qs == nil { + return + } + qs.TotalSamples += samples + + if qs.TotalSamplesPerStep != nil { + i := int((t - qs.startTimestamp) / qs.interval) + qs.TotalSamplesPerStep[i] += samples + } +} + func NewQueryTimers() *QueryTimers { return &QueryTimers{NewTimerGroup()} } +func NewQuerySamples(enablePerStepStats bool) *QuerySamples { + qs := QuerySamples{EnablePerStepStats: enablePerStepStats} + return &qs +} + +func (qs *QuerySamples) NewChild() *QuerySamples { + return NewQuerySamples(false) +} + func (qs *QueryTimers) GetSpanTimer(ctx context.Context, qt QueryTiming, observers ...prometheus.Observer) (*SpanTimer, context.Context) { return NewSpanTimer(ctx, qt.SpanOperation(), qs.TimerGroup.GetTimer(qt), observers...) } diff --git a/util/stats/stats_test.go b/util/stats/stats_test.go index af0adaaf71..860775280e 100644 --- a/util/stats/stats_test.go +++ b/util/stats/stats_test.go @@ -41,30 +41,38 @@ func TestTimerGroupNewTimer(t *testing.T) { "Expected elapsed time to be greater than time slept.") } -func TestQueryStatsWithTimers(t *testing.T) { +func TestQueryStatsWithTimersAndSamples(t *testing.T) { qt := NewQueryTimers() + qs := NewQuerySamples(true) + qs.InitStepTracking(20001000, 25001000, 1000000) timer := qt.GetTimer(ExecTotalTime) timer.Start() time.Sleep(2 * time.Millisecond) timer.Stop() + qs.IncrementSamplesAtTimestamp(20001000, 5) + qs.IncrementSamplesAtTimestamp(25001000, 5) - qs := NewQueryStats(qt) - actual, err := json.Marshal(qs) + qstats := NewQueryStats(&Statistics{Timers: qt, Samples: qs}) + actual, err := json.Marshal(qstats) require.NoError(t, err, "unexpected error during serialization") // Timing value is one of multiple fields, unit is seconds (float). match, err := regexp.MatchString(`[,{]"execTotalTime":\d+\.\d+[,}]`, string(actual)) require.NoError(t, err, "unexpected error while matching string") require.True(t, match, "Expected timings with one non-zero entry.") + + require.Regexpf(t, `[,{]"totalQueryableSamples":10[,}]`, string(actual), "expected totalQueryableSamples") + require.Regexpf(t, `[,{]"totalQueryableSamplesPerStep":\[\[20001,"5"\],\[21001,"0"\],\[22001,"0"\],\[23001,"0"\],\[24001,"0"\],\[25001,"5"\]\]`, string(actual), "expected totalQueryableSamplesPerStep") } func TestQueryStatsWithSpanTimers(t *testing.T) { qt := NewQueryTimers() + qs := NewQuerySamples(false) ctx := &testutil.MockContext{DoneCh: make(chan struct{})} qst, _ := qt.GetSpanTimer(ctx, ExecQueueTime, prometheus.NewSummary(prometheus.SummaryOpts{})) time.Sleep(5 * time.Millisecond) qst.Finish() - qs := NewQueryStats(qt) - actual, err := json.Marshal(qs) + qstats := NewQueryStats(&Statistics{Timers: qt, Samples: qs}) + actual, err := json.Marshal(qstats) require.NoError(t, err, "unexpected error during serialization") // Timing value is one of multiple fields, unit is seconds (float). match, err := regexp.MatchString(`[,{]"execQueueTime":\d+\.\d+[,}]`, string(actual)) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 0faae93f30..67e01a777d 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -157,8 +157,8 @@ type TSDBAdminStats interface { // QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked. type QueryEngine interface { SetQueryLogger(l promql.QueryLogger) - NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (promql.Query, error) - NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) + NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) + NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) } // API can register a set of endpoints in a router and handle @@ -376,7 +376,8 @@ func (api *API) query(r *http.Request) (result apiFuncResult) { defer cancel() } - qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts) + opts := extractQueryOpts(r) + qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, opts, r.FormValue("query"), ts) if err != nil { return invalidParamError(err, "query") } @@ -410,6 +411,12 @@ func (api *API) query(r *http.Request) (result apiFuncResult) { }, nil, res.Warnings, qry.Close} } +func extractQueryOpts(r *http.Request) *promql.QueryOpts { + return &promql.QueryOpts{ + EnablePerStepStats: r.FormValue("stats") == "all", + } +} + func (api *API) queryRange(r *http.Request) (result apiFuncResult) { start, err := parseTime(r.FormValue("start")) if err != nil { @@ -451,7 +458,8 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { defer cancel() } - qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step) + opts := extractQueryOpts(r) + qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, opts, r.FormValue("query"), start, end, step) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 5de0d60f89..6db0495929 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -522,6 +522,99 @@ func TestLabelNames(t *testing.T) { } } +func TestStats(t *testing.T) { + suite, err := promql.NewTest(t, ``) + require.NoError(t, err) + defer suite.Close() + require.NoError(t, suite.Run()) + + api := &API{ + Queryable: suite.Storage(), + QueryEngine: suite.QueryEngine(), + now: func() time.Time { + return time.Unix(123, 0) + }, + } + request := func(method string, param string) (*http.Request, error) { + u, err := url.Parse("http://example.com") + require.NoError(t, err) + q := u.Query() + q.Add("stats", param) + q.Add("query", "up") + q.Add("start", "0") + q.Add("end", "100") + q.Add("step", "10") + u.RawQuery = q.Encode() + + r, err := http.NewRequest(method, u.String(), nil) + if method == http.MethodPost { + r.Header.Set("Content-Type", "application/x-www-form-urlencoded") + } + return r, err + } + + for _, tc := range []struct { + name string + param string + expected func(*testing.T, interface{}) + }{ + { + name: "stats is blank", + param: "", + expected: func(t *testing.T, i interface{}) { + require.IsType(t, i, &queryData{}) + qd := i.(*queryData) + require.Nil(t, qd.Stats) + }, + }, + { + name: "stats is true", + param: "true", + expected: func(t *testing.T, i interface{}) { + require.IsType(t, i, &queryData{}) + qd := i.(*queryData) + require.NotNil(t, qd.Stats) + qs := qd.Stats + require.NotNil(t, qs.Timings) + require.Greater(t, qs.Timings.EvalTotalTime, float64(0)) + require.NotNil(t, qs.Samples) + require.NotNil(t, qs.Samples.TotalQueryableSamples) + require.Nil(t, qs.Samples.TotalQueryableSamplesPerStep) + }, + }, + { + name: "stats is all", + param: "all", + expected: func(t *testing.T, i interface{}) { + require.IsType(t, i, &queryData{}) + qd := i.(*queryData) + require.NotNil(t, qd.Stats) + qs := qd.Stats + require.NotNil(t, qs.Timings) + require.Greater(t, qs.Timings.EvalTotalTime, float64(0)) + require.NotNil(t, qs.Samples) + require.NotNil(t, qs.Samples.TotalQueryableSamples) + require.NotNil(t, qs.Samples.TotalQueryableSamplesPerStep) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + for _, method := range []string{http.MethodGet, http.MethodPost} { + ctx := context.Background() + req, err := request(method, tc.param) + require.NoError(t, err) + res := api.query(req.WithContext(ctx)) + assertAPIError(t, res.err, "") + tc.expected(t, res.data) + + res = api.queryRange(req.WithContext(ctx)) + assertAPIError(t, res.err, "") + tc.expected(t, res.data) + } + }) + } +} + func setupTestTargetRetriever(t *testing.T) *testTargetRetriever { t.Helper()