mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Track and report Samples Queried per query
We always track total samples queried and add those to the standard set of stats queries can report. We also allow optionally tracking per-step samples queried. This must be enabled both at the engine and query level to be tracked and rendered. The engine flag is exposed via a Prometheus feature flag, while the query flag is set when stats=all. Co-authored-by: Alan Protasio <approtas@amazon.com> Co-authored-by: Andrew Bloomgarden <blmgrdn@amazon.com> Co-authored-by: Harkishen Singh <harkishensingh@hotmail.com> Signed-off-by: Andrew Bloomgarden <blmgrdn@amazon.com>
This commit is contained in:
parent
cd739214dd
commit
606ef33d91
|
@ -148,6 +148,7 @@ type flagConfig struct {
|
|||
// for ease of use.
|
||||
enableExpandExternalLabels bool
|
||||
enableNewSDManager bool
|
||||
enablePerStepStats bool
|
||||
|
||||
prometheusURL string
|
||||
corsRegexString string
|
||||
|
@ -182,6 +183,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
|
|||
case "agent":
|
||||
agentMode = true
|
||||
level.Info(logger).Log("msg", "Experimental agent mode enabled.")
|
||||
case "promql-per-step-stats":
|
||||
c.enablePerStepStats = true
|
||||
level.Info(logger).Log("msg", "Experimental per-step statistics reporting")
|
||||
case "":
|
||||
continue
|
||||
case "promql-at-modifier", "promql-negative-offset":
|
||||
|
@ -378,7 +382,7 @@ func main() {
|
|||
serverOnlyFlag(a, "query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
|
||||
Default("50000000").IntVar(&cfg.queryMaxSamples)
|
||||
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
|
||||
Default("").StringsVar(&cfg.featureList)
|
||||
|
||||
promlogflag.AddFlags(a, &cfg.promlogConfig)
|
||||
|
@ -573,6 +577,7 @@ func main() {
|
|||
// always on for regular PromQL as of Prometheus v2.33.
|
||||
EnableAtModifier: true,
|
||||
EnableNegativeOffset: true,
|
||||
EnablePerStepStats: cfg.enablePerStepStats,
|
||||
}
|
||||
|
||||
queryEngine = promql.NewEngine(opts)
|
||||
|
|
|
@ -435,7 +435,7 @@ func (tg *testGroup) maxEvalTime() time.Duration {
|
|||
}
|
||||
|
||||
func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) {
|
||||
q, err := engine.NewInstantQuery(qu, qs, t)
|
||||
q, err := engine.NewInstantQuery(qu, nil, qs, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -78,3 +78,13 @@ discovery, scrape and remote write.
|
|||
|
||||
This is useful when you do not need to query the Prometheus data locally, but
|
||||
only from a central [remote endpoint](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
|
||||
|
||||
## Per-step stats
|
||||
|
||||
`--enable-feature=promql-per-step-stats`
|
||||
|
||||
When enabled, passing `stats=all` in a query request returns per-step
|
||||
statistics. Currently this is limited to totalQueryableSamples.
|
||||
|
||||
When disabled in either the engine or the query, per-step statistics are not
|
||||
computed at all.
|
||||
|
|
|
@ -217,7 +217,7 @@ func BenchmarkRangeQuery(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
qry, err := engine.NewRangeQuery(
|
||||
stor, c.expr,
|
||||
stor, nil, c.expr,
|
||||
time.Unix(int64((numIntervals-c.steps)*10), 0),
|
||||
time.Unix(int64(numIntervals*10), 0), time.Second*10)
|
||||
if err != nil {
|
||||
|
|
|
@ -118,13 +118,18 @@ type Query interface {
|
|||
// Statement returns the parsed statement of the query.
|
||||
Statement() parser.Statement
|
||||
// Stats returns statistics about the lifetime of the query.
|
||||
Stats() *stats.QueryTimers
|
||||
Stats() *stats.Statistics
|
||||
// Cancel signals that a running query execution should be aborted.
|
||||
Cancel()
|
||||
// String returns the original query string.
|
||||
String() string
|
||||
}
|
||||
|
||||
type QueryOpts struct {
|
||||
// Enables recording per-step statistics if the engine has it enabled as well. Disabled by default.
|
||||
EnablePerStepStats bool
|
||||
}
|
||||
|
||||
// query implements the Query interface.
|
||||
type query struct {
|
||||
// Underlying data provider.
|
||||
|
@ -135,6 +140,8 @@ type query struct {
|
|||
stmt parser.Statement
|
||||
// Timer stats for the query execution.
|
||||
stats *stats.QueryTimers
|
||||
// Sample stats for the query execution.
|
||||
sampleStats *stats.QuerySamples
|
||||
// Result matrix for reuse.
|
||||
matrix Matrix
|
||||
// Cancellation function for the query.
|
||||
|
@ -159,8 +166,11 @@ func (q *query) String() string {
|
|||
}
|
||||
|
||||
// Stats implements the Query interface.
|
||||
func (q *query) Stats() *stats.QueryTimers {
|
||||
return q.stats
|
||||
func (q *query) Stats() *stats.Statistics {
|
||||
return &stats.Statistics{
|
||||
Timers: q.stats,
|
||||
Samples: q.sampleStats,
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel implements the Query interface.
|
||||
|
@ -254,6 +264,9 @@ type EngineOpts struct {
|
|||
// is still provided here for those using the Engine outside of
|
||||
// Prometheus.
|
||||
EnableNegativeOffset bool
|
||||
|
||||
// EnablePerStepStats if true allows for per-step stats to be computed on request. Disabled otherwise.
|
||||
EnablePerStepStats bool
|
||||
}
|
||||
|
||||
// Engine handles the lifetime of queries from beginning to end.
|
||||
|
@ -270,6 +283,7 @@ type Engine struct {
|
|||
noStepSubqueryIntervalFn func(rangeMillis int64) int64
|
||||
enableAtModifier bool
|
||||
enableNegativeOffset bool
|
||||
enablePerStepStats bool
|
||||
}
|
||||
|
||||
// NewEngine returns a new engine.
|
||||
|
@ -352,6 +366,7 @@ func NewEngine(opts EngineOpts) *Engine {
|
|||
noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn,
|
||||
enableAtModifier: opts.EnableAtModifier,
|
||||
enableNegativeOffset: opts.EnableNegativeOffset,
|
||||
enablePerStepStats: opts.EnablePerStepStats,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -379,12 +394,12 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) {
|
|||
}
|
||||
|
||||
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
||||
func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (Query, error) {
|
||||
func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) {
|
||||
expr, err := parser.ParseExpr(qs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
qry, err := ng.newQuery(q, expr, ts, ts, 0)
|
||||
qry, err := ng.newQuery(q, opts, expr, ts, ts, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -395,7 +410,7 @@ func (ng *Engine) NewInstantQuery(q storage.Queryable, qs string, ts time.Time)
|
|||
|
||||
// NewRangeQuery returns an evaluation query for the given time range and with
|
||||
// the resolution set by the interval.
|
||||
func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (Query, error) {
|
||||
func (ng *Engine) NewRangeQuery(q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) {
|
||||
expr, err := parser.ParseExpr(qs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -403,7 +418,7 @@ func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.
|
|||
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
|
||||
return nil, errors.Errorf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
|
||||
}
|
||||
qry, err := ng.newQuery(q, expr, start, end, interval)
|
||||
qry, err := ng.newQuery(q, opts, expr, start, end, interval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -412,11 +427,16 @@ func (ng *Engine) NewRangeQuery(q storage.Queryable, qs string, start, end time.
|
|||
return qry, nil
|
||||
}
|
||||
|
||||
func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) {
|
||||
func (ng *Engine) newQuery(q storage.Queryable, opts *QueryOpts, expr parser.Expr, start, end time.Time, interval time.Duration) (*query, error) {
|
||||
if err := ng.validateOpts(expr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Default to empty QueryOpts if not provided.
|
||||
if opts == nil {
|
||||
opts = &QueryOpts{}
|
||||
}
|
||||
|
||||
es := &parser.EvalStmt{
|
||||
Expr: PreprocessExpr(expr, start, end),
|
||||
Start: start,
|
||||
|
@ -424,10 +444,11 @@ func (ng *Engine) newQuery(q storage.Queryable, expr parser.Expr, start, end tim
|
|||
Interval: interval,
|
||||
}
|
||||
qry := &query{
|
||||
stmt: es,
|
||||
ng: ng,
|
||||
stats: stats.NewQueryTimers(),
|
||||
queryable: q,
|
||||
stmt: es,
|
||||
ng: ng,
|
||||
stats: stats.NewQueryTimers(),
|
||||
sampleStats: stats.NewQuerySamples(ng.enablePerStepStats && opts.EnablePerStepStats),
|
||||
queryable: q,
|
||||
}
|
||||
return qry, nil
|
||||
}
|
||||
|
@ -490,10 +511,11 @@ func (ng *Engine) validateOpts(expr parser.Expr) error {
|
|||
|
||||
func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
|
||||
qry := &query{
|
||||
q: "test statement",
|
||||
stmt: parser.TestStmt(f),
|
||||
ng: ng,
|
||||
stats: stats.NewQueryTimers(),
|
||||
q: "test statement",
|
||||
stmt: parser.TestStmt(f),
|
||||
ng: ng,
|
||||
stats: stats.NewQueryTimers(),
|
||||
sampleStats: stats.NewQuerySamples(ng.enablePerStepStats),
|
||||
}
|
||||
return qry
|
||||
}
|
||||
|
@ -615,8 +637,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
|
|||
maxSamples: ng.maxSamplesPerQuery,
|
||||
logger: ng.logger,
|
||||
lookbackDelta: ng.lookbackDelta,
|
||||
samplesStats: query.sampleStats,
|
||||
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
|
||||
}
|
||||
query.sampleStats.InitStepTracking(start, start, 1)
|
||||
|
||||
val, warnings, err := evaluator.Eval(s.Expr)
|
||||
if err != nil {
|
||||
|
@ -665,8 +689,10 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
|
|||
maxSamples: ng.maxSamplesPerQuery,
|
||||
logger: ng.logger,
|
||||
lookbackDelta: ng.lookbackDelta,
|
||||
samplesStats: query.sampleStats,
|
||||
noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn,
|
||||
}
|
||||
query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval)
|
||||
val, warnings, err := evaluator.Eval(s.Expr)
|
||||
if err != nil {
|
||||
return nil, warnings, err
|
||||
|
@ -896,6 +922,7 @@ type evaluator struct {
|
|||
currentSamples int
|
||||
logger log.Logger
|
||||
lookbackDelta time.Duration
|
||||
samplesStats *stats.QuerySamples
|
||||
noStepSubqueryIntervalFn func(rangeMillis int64) int64
|
||||
}
|
||||
|
||||
|
@ -1154,7 +1181,11 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
|
|||
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
|
||||
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
|
||||
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, storage.Warnings) {
|
||||
sampleStats := ev.samplesStats
|
||||
// Avoid double counting samples when running a subquery, those samples will be counted in later stage.
|
||||
ev.samplesStats = nil
|
||||
val, ws := ev.eval(subq)
|
||||
ev.samplesStats = sampleStats
|
||||
mat := val.(Matrix)
|
||||
vs := &parser.VectorSelector{
|
||||
OriginalOffset: subq.OriginalOffset,
|
||||
|
@ -1360,6 +1391,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
enh.Ts = ts
|
||||
// Make the function call.
|
||||
outVec := call(inArgs, e.Args, enh)
|
||||
ev.samplesStats.IncrementSamplesAtStep(step, len(points))
|
||||
enh.Out = outVec[:0]
|
||||
if len(outVec) > 0 {
|
||||
ss.Points = append(ss.Points, Point{V: outVec[0].Point.V, T: ts})
|
||||
|
@ -1511,11 +1543,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
Points: getPointSlice(numSteps),
|
||||
}
|
||||
|
||||
for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval {
|
||||
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval {
|
||||
step++
|
||||
_, v, ok := ev.vectorSelectorSingle(it, e, ts)
|
||||
if ok {
|
||||
if ev.currentSamples < ev.maxSamples {
|
||||
ss.Points = append(ss.Points, Point{V: v, T: ts})
|
||||
ev.samplesStats.IncrementSamplesAtStep(step, 1)
|
||||
ev.currentSamples++
|
||||
} else {
|
||||
ev.error(ErrTooManySamples(env))
|
||||
|
@ -1547,6 +1581,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
maxSamples: ev.maxSamples,
|
||||
logger: ev.logger,
|
||||
lookbackDelta: ev.lookbackDelta,
|
||||
samplesStats: ev.samplesStats.NewChild(),
|
||||
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
|
||||
}
|
||||
|
||||
|
@ -1572,6 +1607,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
|
||||
res, ws := newEv.eval(e.Expr)
|
||||
ev.currentSamples = newEv.currentSamples
|
||||
ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples)
|
||||
return res, ws
|
||||
case *parser.StepInvariantExpr:
|
||||
switch ce := e.Expr.(type) {
|
||||
|
@ -1588,10 +1624,15 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
|||
maxSamples: ev.maxSamples,
|
||||
logger: ev.logger,
|
||||
lookbackDelta: ev.lookbackDelta,
|
||||
samplesStats: ev.samplesStats.NewChild(),
|
||||
noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn,
|
||||
}
|
||||
res, ws := newEv.eval(e.Expr)
|
||||
ev.currentSamples = newEv.currentSamples
|
||||
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts = ts + ev.interval {
|
||||
step++
|
||||
ev.samplesStats.IncrementSamplesAtStep(step, newEv.samplesStats.TotalSamples)
|
||||
}
|
||||
switch e.Expr.(type) {
|
||||
case *parser.MatrixSelector, *parser.SubqueryExpr:
|
||||
// We do not duplicate results for range selectors since result is a matrix
|
||||
|
@ -1646,6 +1687,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
|
|||
})
|
||||
|
||||
ev.currentSamples++
|
||||
ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1)
|
||||
if ev.currentSamples > ev.maxSamples {
|
||||
ev.error(ErrTooManySamples(env))
|
||||
}
|
||||
|
@ -1727,6 +1769,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, storag
|
|||
}
|
||||
|
||||
ss.Points = ev.matrixIterSlice(it, mint, maxt, getPointSlice(16))
|
||||
ev.samplesStats.IncrementSamplesAtTimestamp(ev.startTimestamp, len(ss.Points))
|
||||
|
||||
if len(ss.Points) > 0 {
|
||||
matrix = append(matrix, ss)
|
||||
|
|
|
@ -23,6 +23,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-kit/log"
|
||||
|
||||
"github.com/prometheus/prometheus/util/stats"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
|
@ -225,14 +228,14 @@ func TestQueryError(t *testing.T) {
|
|||
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||
defer cancelCtx()
|
||||
|
||||
vectorQuery, err := engine.NewInstantQuery(queryable, "foo", time.Unix(1, 0))
|
||||
vectorQuery, err := engine.NewInstantQuery(queryable, nil, "foo", time.Unix(1, 0))
|
||||
require.NoError(t, err)
|
||||
|
||||
res := vectorQuery.Exec(ctx)
|
||||
require.Error(t, res.Err, "expected error on failed select but got none")
|
||||
require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
|
||||
|
||||
matrixQuery, err := engine.NewInstantQuery(queryable, "foo[1m]", time.Unix(1, 0))
|
||||
matrixQuery, err := engine.NewInstantQuery(queryable, nil, "foo[1m]", time.Unix(1, 0))
|
||||
require.NoError(t, err)
|
||||
|
||||
res = matrixQuery.Exec(ctx)
|
||||
|
@ -559,9 +562,9 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
|
|||
err error
|
||||
)
|
||||
if tc.end == 0 {
|
||||
query, err = engine.NewInstantQuery(hintsRecorder, tc.query, timestamp.Time(tc.start))
|
||||
query, err = engine.NewInstantQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start))
|
||||
} else {
|
||||
query, err = engine.NewRangeQuery(hintsRecorder, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second)
|
||||
query, err = engine.NewRangeQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -719,9 +722,9 @@ load 10s
|
|||
var err error
|
||||
var qry Query
|
||||
if c.Interval == 0 {
|
||||
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
||||
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.Query, c.Start)
|
||||
} else {
|
||||
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
|
||||
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -736,6 +739,438 @@ load 10s
|
|||
}
|
||||
}
|
||||
|
||||
func TestQueryStatistics(t *testing.T) {
|
||||
test, err := NewTest(t, `
|
||||
load 10s
|
||||
metricWith1SampleEvery10Seconds 1+1x100
|
||||
metricWith3SampleEvery10Seconds{a="1",b="1"} 1+1x100
|
||||
metricWith3SampleEvery10Seconds{a="2",b="2"} 1+1x100
|
||||
metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100
|
||||
`)
|
||||
|
||||
require.NoError(t, err)
|
||||
defer test.Close()
|
||||
|
||||
err = test.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
cases := []struct {
|
||||
Query string
|
||||
TotalSamples int
|
||||
TotalSamplesPerStep stats.TotalSamplesPerStep
|
||||
Start time.Time
|
||||
End time.Time
|
||||
Interval time.Duration
|
||||
}{
|
||||
{
|
||||
Query: `"literal string"`,
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 0,
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "1",
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 0,
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds",
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
// timestamp function has a special handling.
|
||||
Query: "timestamp(metricWith1SampleEvery10Seconds)",
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds",
|
||||
Start: time.Unix(22, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
22000: 1, // Aligned to the step time, not the sample time.
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds offset 10s",
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds @ 15",
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds{a="1"}`,
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds{a="1"} @ 19`,
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 1, // 1 sample / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds{a="1"}[20s] @ 19`,
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 2, // (1 sample / 10 seconds) * 20s
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith3SampleEvery10Seconds",
|
||||
Start: time.Unix(21, 0),
|
||||
TotalSamples: 3, // 3 samples / 10 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
21000: 3,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds[60s]",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 6,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith1SampleEvery10Seconds[59s])[20s:5s]",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 60/5 (using 59s so we always return 6 samples
|
||||
// as if we run a query on 00 looking back 60 seconds we will return 7 samples;
|
||||
// see next test).
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 24,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])[20s:5s]",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) + 2 as
|
||||
// max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples.
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 26,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds[60s] @ 30",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 4, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 1 series
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 4,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds[60s] offset 10s",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 6,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith3SampleEvery10Seconds[60s]",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 18,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 6,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "absent_over_time(metricWith1SampleEvery10Seconds[60s])",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 6,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s])",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 18,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds[60s:5s]",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 12, // 1 sample per query * 12 queries (60/5)
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "metricWith1SampleEvery10Seconds[60s:5s] offset 10s",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 12, // 1 sample per query * 12 queries (60/5)
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 36, // 3 sample per query * 12 queries (60/5)
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 36,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
|
||||
Start: time.Unix(201, 0),
|
||||
TotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5))
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 72,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds{a="1"}`,
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 4, // 1 sample per query * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 1,
|
||||
206000: 1,
|
||||
211000: 1,
|
||||
216000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds{a="1"}`,
|
||||
Start: time.Unix(204, 0),
|
||||
End: time.Unix(223, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 4, // 1 sample per query * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
204000: 1, // aligned to the step time, not the sample time
|
||||
209000: 1,
|
||||
214000: 1,
|
||||
219000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
// timestamp function as a special handling
|
||||
Query: "timestamp(metricWith1SampleEvery10Seconds)",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 4, // (1 sample / 10 seconds) * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 1,
|
||||
206000: 1,
|
||||
211000: 1,
|
||||
216000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `max_over_time(metricWith3SampleEvery10Seconds{a="1"}[10s])`,
|
||||
Start: time.Unix(991, 0),
|
||||
End: time.Unix(1021, 0),
|
||||
Interval: 10 * time.Second,
|
||||
TotalSamples: 2, // 1 sample per query * 2 steps with data
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
991000: 1,
|
||||
1001000: 1,
|
||||
1011000: 0,
|
||||
1021000: 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds{a="1"} offset 10s`,
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 4, // 1 sample per query * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 1,
|
||||
206000: 1,
|
||||
211000: 1,
|
||||
216000: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30)",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 48, // @ modifier force the evaluation timestamp at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
206000: 12,
|
||||
211000: 12,
|
||||
216000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `metricWith3SampleEvery10Seconds`,
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 12, // 3 sample per query * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 3,
|
||||
206000: 3,
|
||||
211000: 3,
|
||||
216000: 3,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: `max_over_time(metricWith3SampleEvery10Seconds[60s])`,
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 72, // (3 sample / 10 seconds * 60 seconds) * 4 steps = 72
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 18,
|
||||
206000: 18,
|
||||
211000: 18,
|
||||
216000: 18,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 144, // 3 sample per query * 12 queries (60/5) * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 36,
|
||||
206000: 36,
|
||||
211000: 36,
|
||||
216000: 36,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "max_over_time(metricWith1SampleEvery10Seconds[60s:5s])",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
206000: 12,
|
||||
211000: 12,
|
||||
216000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "sum by (b) (max_over_time(metricWith1SampleEvery10Seconds[60s:5s]))",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 12,
|
||||
206000: 12,
|
||||
211000: 12,
|
||||
216000: 12,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps)
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 72,
|
||||
206000: 72,
|
||||
211000: 72,
|
||||
216000: 72,
|
||||
},
|
||||
},
|
||||
{
|
||||
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith1SampleEvery10Seconds[60s:5s]))",
|
||||
Start: time.Unix(201, 0),
|
||||
End: time.Unix(220, 0),
|
||||
Interval: 5 * time.Second,
|
||||
TotalSamples: 192, // (1 sample per query * 12 queries (60/5) + 3 sample per query * 12 queries (60/5)) * 4 steps
|
||||
TotalSamplesPerStep: stats.TotalSamplesPerStep{
|
||||
201000: 48,
|
||||
206000: 48,
|
||||
211000: 48,
|
||||
216000: 48,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
engine := test.QueryEngine()
|
||||
engine.enablePerStepStats = true
|
||||
for _, c := range cases {
|
||||
t.Run(c.Query, func(t *testing.T) {
|
||||
opts := &QueryOpts{EnablePerStepStats: true}
|
||||
|
||||
var err error
|
||||
var qry Query
|
||||
if c.Interval == 0 {
|
||||
qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start)
|
||||
} else {
|
||||
qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
require.Nil(t, res.Err)
|
||||
|
||||
stats := qry.Stats()
|
||||
require.Equal(t, c.TotalSamples, stats.Samples.TotalSamples, "Total samples mismatch")
|
||||
require.Equal(t, &c.TotalSamplesPerStep, stats.Samples.TotalSamplesPerStepMap(), "Total samples per time mismatch")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMaxQuerySamples(t *testing.T) {
|
||||
test, err := NewTest(t, `
|
||||
load 10s
|
||||
|
@ -892,14 +1327,16 @@ load 10s
|
|||
var err error
|
||||
var qry Query
|
||||
if c.Interval == 0 {
|
||||
qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
||||
qry, err = engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start)
|
||||
} else {
|
||||
qry, err = engine.NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
|
||||
qry, err = engine.NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
stats := qry.Stats()
|
||||
require.Equal(t, expError, res.Err)
|
||||
require.NotNil(t, stats)
|
||||
}
|
||||
|
||||
// Within limit.
|
||||
|
@ -1128,9 +1565,9 @@ load 1ms
|
|||
var err error
|
||||
var qry Query
|
||||
if c.end == 0 {
|
||||
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), c.query, start)
|
||||
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.query, start)
|
||||
} else {
|
||||
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), c.query, start, end, interval)
|
||||
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.query, start, end, interval)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -1451,7 +1888,7 @@ func TestSubquerySelector(t *testing.T) {
|
|||
engine := test.QueryEngine()
|
||||
for _, c := range tst.cases {
|
||||
t.Run(c.Query, func(t *testing.T) {
|
||||
qry, err := engine.NewInstantQuery(test.Queryable(), c.Query, c.Start)
|
||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start)
|
||||
require.NoError(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
|
@ -2458,8 +2895,8 @@ func TestEngineOptsValidation(t *testing.T) {
|
|||
|
||||
for _, c := range cases {
|
||||
eng := NewEngine(c.opts)
|
||||
_, err1 := eng.NewInstantQuery(nil, c.query, time.Unix(10, 0))
|
||||
_, err2 := eng.NewRangeQuery(nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
|
||||
_, err1 := eng.NewInstantQuery(nil, nil, c.query, time.Unix(10, 0))
|
||||
_, err2 := eng.NewRangeQuery(nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
|
||||
if c.fail {
|
||||
require.Equal(t, c.expError, err1)
|
||||
require.Equal(t, c.expError, err2)
|
||||
|
@ -2606,7 +3043,7 @@ func TestRangeQuery(t *testing.T) {
|
|||
err = test.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), c.Query, c.Start, c.End, c.Interval)
|
||||
qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
||||
require.NoError(t, err)
|
||||
|
||||
res := qry.Exec(test.Context())
|
||||
|
|
|
@ -55,7 +55,7 @@ func TestDeriv(t *testing.T) {
|
|||
|
||||
require.NoError(t, a.Commit())
|
||||
|
||||
query, err := engine.NewInstantQuery(storage, "deriv(foo[30m])", timestamp.Time(1493712846939))
|
||||
query, err := engine.NewInstantQuery(storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939))
|
||||
require.NoError(t, err)
|
||||
|
||||
result := query.Exec(context.Background())
|
||||
|
|
|
@ -533,7 +533,7 @@ func (t *Test) exec(tc testCommand) error {
|
|||
}
|
||||
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
|
||||
for _, iq := range queries {
|
||||
q, err := t.QueryEngine().NewInstantQuery(t.storage, iq.expr, iq.evalTime)
|
||||
q, err := t.QueryEngine().NewInstantQuery(t.storage, nil, iq.expr, iq.evalTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -555,7 +555,7 @@ func (t *Test) exec(tc testCommand) error {
|
|||
|
||||
// Check query returns same result in range mode,
|
||||
// by checking against the middle step.
|
||||
q, err = t.queryEngine.NewRangeQuery(t.storage, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
|
||||
q, err = t.queryEngine.NewRangeQuery(t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -614,6 +614,7 @@ func (t *Test) clear() {
|
|||
NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) },
|
||||
EnableAtModifier: true,
|
||||
EnableNegativeOffset: true,
|
||||
EnablePerStepStats: true,
|
||||
}
|
||||
|
||||
t.queryEngine = NewEngine(opts)
|
||||
|
|
|
@ -185,7 +185,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector,
|
|||
// It converts scalar into vector results.
|
||||
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
|
||||
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
|
||||
q, err := engine.NewInstantQuery(q, qs, t)
|
||||
q, err := engine.NewInstantQuery(q, nil, qs, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -15,6 +15,9 @@ package stats
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
@ -75,6 +78,23 @@ func (s QueryTiming) SpanOperation() string {
|
|||
}
|
||||
}
|
||||
|
||||
// stepStat represents a single statistic for a given step timestamp.
|
||||
type stepStat struct {
|
||||
T int64
|
||||
V float64
|
||||
}
|
||||
|
||||
func (s stepStat) String() string {
|
||||
v := strconv.FormatFloat(s.V, 'f', -1, 64)
|
||||
return fmt.Sprintf("%v @[%v]", v, s.T)
|
||||
}
|
||||
|
||||
// MarshalJSON implements json.Marshaler.
|
||||
func (s stepStat) MarshalJSON() ([]byte, error) {
|
||||
v := strconv.FormatFloat(s.V, 'f', -1, 64)
|
||||
return json.Marshal([...]interface{}{float64(s.T) / 1000, v})
|
||||
}
|
||||
|
||||
// queryTimings with all query timers mapped to durations.
|
||||
type queryTimings struct {
|
||||
EvalTotalTime float64 `json:"evalTotalTime"`
|
||||
|
@ -85,15 +105,26 @@ type queryTimings struct {
|
|||
ExecTotalTime float64 `json:"execTotalTime"`
|
||||
}
|
||||
|
||||
type querySamples struct {
|
||||
TotalQueryableSamplesPerStep []stepStat `json:"totalQueryableSamplesPerStep,omitempty"`
|
||||
TotalQueryableSamples int `json:"totalQueryableSamples"`
|
||||
}
|
||||
|
||||
// QueryStats currently only holding query timings.
|
||||
type QueryStats struct {
|
||||
Timings queryTimings `json:"timings,omitempty"`
|
||||
Timings queryTimings `json:"timings,omitempty"`
|
||||
Samples *querySamples `json:"samples,omitempty"`
|
||||
}
|
||||
|
||||
// NewQueryStats makes a QueryStats struct with all QueryTimings found in the
|
||||
// given TimerGroup.
|
||||
func NewQueryStats(tg *QueryTimers) *QueryStats {
|
||||
var qt queryTimings
|
||||
func NewQueryStats(s *Statistics) *QueryStats {
|
||||
var (
|
||||
qt queryTimings
|
||||
samples *querySamples
|
||||
tg = s.Timers
|
||||
sp = s.Samples
|
||||
)
|
||||
|
||||
for s, timer := range tg.TimerGroup.timers {
|
||||
switch s {
|
||||
|
@ -112,10 +143,41 @@ func NewQueryStats(tg *QueryTimers) *QueryStats {
|
|||
}
|
||||
}
|
||||
|
||||
qs := QueryStats{Timings: qt}
|
||||
if sp != nil {
|
||||
samples = &querySamples{
|
||||
TotalQueryableSamples: sp.TotalSamples,
|
||||
}
|
||||
samples.TotalQueryableSamplesPerStep = sp.totalSamplesPerStepPoints()
|
||||
}
|
||||
|
||||
qs := QueryStats{Timings: qt, Samples: samples}
|
||||
return &qs
|
||||
}
|
||||
|
||||
func (qs *QuerySamples) TotalSamplesPerStepMap() *TotalSamplesPerStep {
|
||||
if !qs.EnablePerStepStats {
|
||||
return nil
|
||||
}
|
||||
|
||||
ts := TotalSamplesPerStep{}
|
||||
for _, s := range qs.totalSamplesPerStepPoints() {
|
||||
ts[s.T] = int(s.V)
|
||||
}
|
||||
return &ts
|
||||
}
|
||||
|
||||
func (qs *QuerySamples) totalSamplesPerStepPoints() []stepStat {
|
||||
if !qs.EnablePerStepStats {
|
||||
return nil
|
||||
}
|
||||
|
||||
ts := make([]stepStat, len(qs.TotalSamplesPerStep))
|
||||
for i, c := range qs.TotalSamplesPerStep {
|
||||
ts[i] = stepStat{T: qs.startTimestamp + int64(i)*qs.interval, V: float64(c)}
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
// SpanTimer unifies tracing and timing, to reduce repetition.
|
||||
type SpanTimer struct {
|
||||
timer *Timer
|
||||
|
@ -145,14 +207,89 @@ func (s *SpanTimer) Finish() {
|
|||
}
|
||||
}
|
||||
|
||||
type Statistics struct {
|
||||
Timers *QueryTimers
|
||||
Samples *QuerySamples
|
||||
}
|
||||
|
||||
type QueryTimers struct {
|
||||
*TimerGroup
|
||||
}
|
||||
|
||||
type TotalSamplesPerStep map[int64]int
|
||||
|
||||
type QuerySamples struct {
|
||||
// TotalSamples represents the total number of samples scanned
|
||||
// while evaluating a query.
|
||||
TotalSamples int
|
||||
|
||||
// TotalSamplesPerStep represents the total number of samples scanned
|
||||
// per step while evaluating a query. Each step should be identical to the
|
||||
// TotalSamples when a step is run as an instant query, which means
|
||||
// we intentionally do not account for optimizations that happen inside the
|
||||
// range query engine that reduce the actual work that happens.
|
||||
TotalSamplesPerStep []int
|
||||
|
||||
EnablePerStepStats bool
|
||||
startTimestamp int64
|
||||
interval int64
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
TimerStats *QueryTimers
|
||||
SampleStats *QuerySamples
|
||||
}
|
||||
|
||||
func (qs *QuerySamples) InitStepTracking(start, end, interval int64) {
|
||||
if !qs.EnablePerStepStats {
|
||||
return
|
||||
}
|
||||
|
||||
numSteps := int((end-start)/interval) + 1
|
||||
qs.TotalSamplesPerStep = make([]int, numSteps)
|
||||
qs.startTimestamp = start
|
||||
qs.interval = interval
|
||||
}
|
||||
|
||||
// IncrementSamplesAtStep increments the total samples count. Use this if you know the step index.
|
||||
func (qs *QuerySamples) IncrementSamplesAtStep(i, samples int) {
|
||||
if qs == nil {
|
||||
return
|
||||
}
|
||||
qs.TotalSamples += samples
|
||||
|
||||
if qs.TotalSamplesPerStep != nil {
|
||||
qs.TotalSamplesPerStep[i] += samples
|
||||
}
|
||||
}
|
||||
|
||||
// IncrementSamplesAtTimestamp increments the total samples count. Use this if you only have the corresponding step
|
||||
// timestamp.
|
||||
func (qs *QuerySamples) IncrementSamplesAtTimestamp(t int64, samples int) {
|
||||
if qs == nil {
|
||||
return
|
||||
}
|
||||
qs.TotalSamples += samples
|
||||
|
||||
if qs.TotalSamplesPerStep != nil {
|
||||
i := int((t - qs.startTimestamp) / qs.interval)
|
||||
qs.TotalSamplesPerStep[i] += samples
|
||||
}
|
||||
}
|
||||
|
||||
func NewQueryTimers() *QueryTimers {
|
||||
return &QueryTimers{NewTimerGroup()}
|
||||
}
|
||||
|
||||
func NewQuerySamples(enablePerStepStats bool) *QuerySamples {
|
||||
qs := QuerySamples{EnablePerStepStats: enablePerStepStats}
|
||||
return &qs
|
||||
}
|
||||
|
||||
func (qs *QuerySamples) NewChild() *QuerySamples {
|
||||
return NewQuerySamples(false)
|
||||
}
|
||||
|
||||
func (qs *QueryTimers) GetSpanTimer(ctx context.Context, qt QueryTiming, observers ...prometheus.Observer) (*SpanTimer, context.Context) {
|
||||
return NewSpanTimer(ctx, qt.SpanOperation(), qs.TimerGroup.GetTimer(qt), observers...)
|
||||
}
|
||||
|
|
|
@ -41,30 +41,38 @@ func TestTimerGroupNewTimer(t *testing.T) {
|
|||
"Expected elapsed time to be greater than time slept.")
|
||||
}
|
||||
|
||||
func TestQueryStatsWithTimers(t *testing.T) {
|
||||
func TestQueryStatsWithTimersAndSamples(t *testing.T) {
|
||||
qt := NewQueryTimers()
|
||||
qs := NewQuerySamples(true)
|
||||
qs.InitStepTracking(20001000, 25001000, 1000000)
|
||||
timer := qt.GetTimer(ExecTotalTime)
|
||||
timer.Start()
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
timer.Stop()
|
||||
qs.IncrementSamplesAtTimestamp(20001000, 5)
|
||||
qs.IncrementSamplesAtTimestamp(25001000, 5)
|
||||
|
||||
qs := NewQueryStats(qt)
|
||||
actual, err := json.Marshal(qs)
|
||||
qstats := NewQueryStats(&Statistics{Timers: qt, Samples: qs})
|
||||
actual, err := json.Marshal(qstats)
|
||||
require.NoError(t, err, "unexpected error during serialization")
|
||||
// Timing value is one of multiple fields, unit is seconds (float).
|
||||
match, err := regexp.MatchString(`[,{]"execTotalTime":\d+\.\d+[,}]`, string(actual))
|
||||
require.NoError(t, err, "unexpected error while matching string")
|
||||
require.True(t, match, "Expected timings with one non-zero entry.")
|
||||
|
||||
require.Regexpf(t, `[,{]"totalQueryableSamples":10[,}]`, string(actual), "expected totalQueryableSamples")
|
||||
require.Regexpf(t, `[,{]"totalQueryableSamplesPerStep":\[\[20001,"5"\],\[21001,"0"\],\[22001,"0"\],\[23001,"0"\],\[24001,"0"\],\[25001,"5"\]\]`, string(actual), "expected totalQueryableSamplesPerStep")
|
||||
}
|
||||
|
||||
func TestQueryStatsWithSpanTimers(t *testing.T) {
|
||||
qt := NewQueryTimers()
|
||||
qs := NewQuerySamples(false)
|
||||
ctx := &testutil.MockContext{DoneCh: make(chan struct{})}
|
||||
qst, _ := qt.GetSpanTimer(ctx, ExecQueueTime, prometheus.NewSummary(prometheus.SummaryOpts{}))
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
qst.Finish()
|
||||
qs := NewQueryStats(qt)
|
||||
actual, err := json.Marshal(qs)
|
||||
qstats := NewQueryStats(&Statistics{Timers: qt, Samples: qs})
|
||||
actual, err := json.Marshal(qstats)
|
||||
require.NoError(t, err, "unexpected error during serialization")
|
||||
// Timing value is one of multiple fields, unit is seconds (float).
|
||||
match, err := regexp.MatchString(`[,{]"execQueueTime":\d+\.\d+[,}]`, string(actual))
|
||||
|
|
|
@ -157,8 +157,8 @@ type TSDBAdminStats interface {
|
|||
// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked.
|
||||
type QueryEngine interface {
|
||||
SetQueryLogger(l promql.QueryLogger)
|
||||
NewInstantQuery(q storage.Queryable, qs string, ts time.Time) (promql.Query, error)
|
||||
NewRangeQuery(q storage.Queryable, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
|
||||
NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
|
||||
NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
|
||||
}
|
||||
|
||||
// API can register a set of endpoints in a router and handle
|
||||
|
@ -376,7 +376,8 @@ func (api *API) query(r *http.Request) (result apiFuncResult) {
|
|||
defer cancel()
|
||||
}
|
||||
|
||||
qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, r.FormValue("query"), ts)
|
||||
opts := extractQueryOpts(r)
|
||||
qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, opts, r.FormValue("query"), ts)
|
||||
if err != nil {
|
||||
return invalidParamError(err, "query")
|
||||
}
|
||||
|
@ -410,6 +411,12 @@ func (api *API) query(r *http.Request) (result apiFuncResult) {
|
|||
}, nil, res.Warnings, qry.Close}
|
||||
}
|
||||
|
||||
func extractQueryOpts(r *http.Request) *promql.QueryOpts {
|
||||
return &promql.QueryOpts{
|
||||
EnablePerStepStats: r.FormValue("stats") == "all",
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) queryRange(r *http.Request) (result apiFuncResult) {
|
||||
start, err := parseTime(r.FormValue("start"))
|
||||
if err != nil {
|
||||
|
@ -451,7 +458,8 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) {
|
|||
defer cancel()
|
||||
}
|
||||
|
||||
qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, r.FormValue("query"), start, end, step)
|
||||
opts := extractQueryOpts(r)
|
||||
qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, opts, r.FormValue("query"), start, end, step)
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
||||
}
|
||||
|
|
|
@ -522,6 +522,99 @@ func TestLabelNames(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStats(t *testing.T) {
|
||||
suite, err := promql.NewTest(t, ``)
|
||||
require.NoError(t, err)
|
||||
defer suite.Close()
|
||||
require.NoError(t, suite.Run())
|
||||
|
||||
api := &API{
|
||||
Queryable: suite.Storage(),
|
||||
QueryEngine: suite.QueryEngine(),
|
||||
now: func() time.Time {
|
||||
return time.Unix(123, 0)
|
||||
},
|
||||
}
|
||||
request := func(method string, param string) (*http.Request, error) {
|
||||
u, err := url.Parse("http://example.com")
|
||||
require.NoError(t, err)
|
||||
q := u.Query()
|
||||
q.Add("stats", param)
|
||||
q.Add("query", "up")
|
||||
q.Add("start", "0")
|
||||
q.Add("end", "100")
|
||||
q.Add("step", "10")
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
r, err := http.NewRequest(method, u.String(), nil)
|
||||
if method == http.MethodPost {
|
||||
r.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
param string
|
||||
expected func(*testing.T, interface{})
|
||||
}{
|
||||
{
|
||||
name: "stats is blank",
|
||||
param: "",
|
||||
expected: func(t *testing.T, i interface{}) {
|
||||
require.IsType(t, i, &queryData{})
|
||||
qd := i.(*queryData)
|
||||
require.Nil(t, qd.Stats)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "stats is true",
|
||||
param: "true",
|
||||
expected: func(t *testing.T, i interface{}) {
|
||||
require.IsType(t, i, &queryData{})
|
||||
qd := i.(*queryData)
|
||||
require.NotNil(t, qd.Stats)
|
||||
qs := qd.Stats
|
||||
require.NotNil(t, qs.Timings)
|
||||
require.Greater(t, qs.Timings.EvalTotalTime, float64(0))
|
||||
require.NotNil(t, qs.Samples)
|
||||
require.NotNil(t, qs.Samples.TotalQueryableSamples)
|
||||
require.Nil(t, qs.Samples.TotalQueryableSamplesPerStep)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "stats is all",
|
||||
param: "all",
|
||||
expected: func(t *testing.T, i interface{}) {
|
||||
require.IsType(t, i, &queryData{})
|
||||
qd := i.(*queryData)
|
||||
require.NotNil(t, qd.Stats)
|
||||
qs := qd.Stats
|
||||
require.NotNil(t, qs.Timings)
|
||||
require.Greater(t, qs.Timings.EvalTotalTime, float64(0))
|
||||
require.NotNil(t, qs.Samples)
|
||||
require.NotNil(t, qs.Samples.TotalQueryableSamples)
|
||||
require.NotNil(t, qs.Samples.TotalQueryableSamplesPerStep)
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
for _, method := range []string{http.MethodGet, http.MethodPost} {
|
||||
ctx := context.Background()
|
||||
req, err := request(method, tc.param)
|
||||
require.NoError(t, err)
|
||||
res := api.query(req.WithContext(ctx))
|
||||
assertAPIError(t, res.err, "")
|
||||
tc.expected(t, res.data)
|
||||
|
||||
res = api.queryRange(req.WithContext(ctx))
|
||||
assertAPIError(t, res.err, "")
|
||||
tc.expected(t, res.data)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func setupTestTargetRetriever(t *testing.T) *testTargetRetriever {
|
||||
t.Helper()
|
||||
|
||||
|
|
Loading…
Reference in a new issue