From a64b9fe323cf913c212c1829fe93d1a9dbc63348 Mon Sep 17 00:00:00 2001
From: Andrew Bloomgarden
Date: Tue, 22 Feb 2022 15:30:39 -0500
Subject: [PATCH] Report PeakSamples in query statistics

This corresponds exactly to the statistic that is compared against
MaxSamples during query execution, so users can see how close their
queries come to the limit.

Co-authored-by: Harkishen Singh
Co-authored-by: Andrew Bloomgarden
Signed-off-by: Andrew Bloomgarden
---
 promql/engine.go          | 21 ++++++++--
 promql/engine_test.go     | 80 +++++++++++++++++++++++++++++++++------
 util/stats/query_stats.go | 30 +++++++++++++++
 3 files changed, 117 insertions(+), 14 deletions(-)

diff --git a/promql/engine.go b/promql/engine.go
index 22ce7649b0..63671f0d9f 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -1115,6 +1115,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
 			}
 		}
 		args[i] = vectors[i]
+		ev.samplesStats.UpdatePeak(ev.currentSamples)
 	}
 
 	// Make the function call.
@@ -1130,10 +1131,12 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
 		// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
 		// needs to include the samples from the result here, as they're still in memory.
 		tempNumSamples += len(result)
+		ev.samplesStats.UpdatePeak(ev.currentSamples)
 
 		if ev.currentSamples > ev.maxSamples {
 			ev.error(ErrTooManySamples(env))
 		}
+		ev.samplesStats.UpdatePeak(ev.currentSamples)
 
 		// If this could be an instant query, shortcut so as not to change sort order.
 		if ev.endTimestamp == ev.startTimestamp {
@@ -1143,6 +1146,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
 				mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}}
 			}
 			ev.currentSamples = originalNumSamples + mat.TotalSamples()
+			ev.samplesStats.UpdatePeak(ev.currentSamples)
 			return mat, warnings
 		}
 
@@ -1175,17 +1179,20 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
 		mat = append(mat, ss)
 	}
 	ev.currentSamples = originalNumSamples + mat.TotalSamples()
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
 	return mat, warnings
 }
 
 // evalSubquery evaluates given SubqueryExpr and returns an equivalent
 // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
 func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, storage.Warnings) {
-	sampleStats := ev.samplesStats
+	samplesStats := ev.samplesStats
 	// Avoid double counting samples when running a subquery, those samples will be counted in later stage.
-	ev.samplesStats = nil
+	ev.samplesStats = ev.samplesStats.NewChild()
 	val, ws := ev.eval(subq)
-	ev.samplesStats = sampleStats
+	// But do incorporate the peak from the subquery.
+	samplesStats.UpdatePeakFromSubquery(ev.samplesStats)
+	ev.samplesStats = samplesStats
 	mat := val.(Matrix)
 	vs := &parser.VectorSelector{
 		OriginalOffset: subq.OriginalOffset,
@@ -1409,7 +1416,9 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 				} else {
 					putPointSlice(ss.Points)
 				}
+				ev.samplesStats.UpdatePeak(ev.currentSamples)
 			}
+			ev.samplesStats.UpdatePeak(ev.currentSamples)
 
 			ev.currentSamples -= len(points)
 			putPointSlice(points)
@@ -1563,6 +1572,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 				putPointSlice(ss.Points)
 			}
 		}
+		ev.samplesStats.UpdatePeak(ev.currentSamples)
 		return mat, ws
 
 	case *parser.MatrixSelector:
@@ -1607,6 +1617,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 		res, ws := newEv.eval(e.Expr)
 		ev.currentSamples = newEv.currentSamples
+		ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
 		ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples)
 		return res, ws
 	case *parser.StepInvariantExpr:
@@ -1629,6 +1640,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 		}
 		res, ws := newEv.eval(e.Expr)
 		ev.currentSamples = newEv.currentSamples
+		ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
 		for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts = ts + ev.interval {
 			step++
 			ev.samplesStats.IncrementSamplesAtStep(step, newEv.samplesStats.TotalSamples)
@@ -1662,6 +1674,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
 			}
 		}
 	}
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
 	return res, ws
 }
 
@@ -1694,6 +1707,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
 		}
 	}
 
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
 	return vec, ws
 }
 
@@ -1841,6 +1855,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
 			ev.currentSamples++
 		}
 	}
+	ev.samplesStats.UpdatePeak(ev.currentSamples)
 	return out
 }
 
diff --git a/promql/engine_test.go b/promql/engine_test.go
index 35a123f264..7a5f495548 100644
--- a/promql/engine_test.go
+++ b/promql/engine_test.go
@@ -756,14 +756,17 @@ load 10s
 	cases := []struct {
 		Query               string
+		SkipMaxCheck        bool
 		TotalSamples        int
 		TotalSamplesPerStep stats.TotalSamplesPerStep
+		PeakSamples         int
 		Start               time.Time
 		End                 time.Time
 		Interval            time.Duration
 	}{
 		{
 			Query:        `"literal string"`,
+			SkipMaxCheck: true, // This can't fail from a max samples limit.
 			Start:        time.Unix(21, 0),
 			TotalSamples: 0,
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
@@ -774,6 +777,7 @@ load 10s
 			Query:        "1",
 			Start:        time.Unix(21, 0),
 			TotalSamples: 0,
+			PeakSamples:  1,
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				21000: 0,
 			},
@@ -781,6 +785,7 @@ load 10s
 		{
 			Query:        "metricWith1SampleEvery10Seconds",
 			Start:        time.Unix(21, 0),
+			PeakSamples:  1,
 			TotalSamples: 1, // 1 sample / 10 seconds
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				21000: 1,
@@ -790,6 +795,7 @@ load 10s
 			// timestamp function has a special handling.
Query: "timestamp(metricWith1SampleEvery10Seconds)", Start: time.Unix(21, 0), + PeakSamples: 2, TotalSamples: 1, // 1 sample / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 1, @@ -798,6 +804,7 @@ load 10s { Query: "metricWith1SampleEvery10Seconds", Start: time.Unix(22, 0), + PeakSamples: 1, TotalSamples: 1, // 1 sample / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 22000: 1, // Aligned to the step time, not the sample time. @@ -806,6 +813,7 @@ load 10s { Query: "metricWith1SampleEvery10Seconds offset 10s", Start: time.Unix(21, 0), + PeakSamples: 1, TotalSamples: 1, // 1 sample / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 1, @@ -814,6 +822,7 @@ load 10s { Query: "metricWith1SampleEvery10Seconds @ 15", Start: time.Unix(21, 0), + PeakSamples: 1, TotalSamples: 1, // 1 sample / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 1, @@ -822,6 +831,7 @@ load 10s { Query: `metricWith3SampleEvery10Seconds{a="1"}`, Start: time.Unix(21, 0), + PeakSamples: 1, TotalSamples: 1, // 1 sample / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 1, @@ -830,6 +840,7 @@ load 10s { Query: `metricWith3SampleEvery10Seconds{a="1"} @ 19`, Start: time.Unix(21, 0), + PeakSamples: 1, TotalSamples: 1, // 1 sample / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 1, @@ -838,6 +849,7 @@ load 10s { Query: `metricWith3SampleEvery10Seconds{a="1"}[20s] @ 19`, Start: time.Unix(21, 0), + PeakSamples: 2, TotalSamples: 2, // (1 sample / 10 seconds) * 20s TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 2, @@ -846,6 +858,7 @@ load 10s { Query: "metricWith3SampleEvery10Seconds", Start: time.Unix(21, 0), + PeakSamples: 3, TotalSamples: 3, // 3 samples / 10 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 21000: 3, @@ -854,6 +867,7 @@ load 10s { Query: "metricWith1SampleEvery10Seconds[60s]", Start: time.Unix(201, 0), + PeakSamples: 6, TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 6, @@ -862,6 +876,7 @@ load 10s { Query: "max_over_time(metricWith1SampleEvery10Seconds[59s])[20s:5s]", Start: time.Unix(201, 0), + PeakSamples: 10, TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 60/5 (using 59s so we always return 6 samples // as if we run a query on 00 looking back 60 seconds we will return 7 samples; // see next test). @@ -872,6 +887,7 @@ load 10s { Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])[20s:5s]", Start: time.Unix(201, 0), + PeakSamples: 11, TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) + 2 as // max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples. 
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
@@ -881,6 +897,7 @@ load 10s
 		{
 			Query:        "metricWith1SampleEvery10Seconds[60s] @ 30",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  4,
 			TotalSamples: 4, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 1 series
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 4,
@@ -889,6 +906,7 @@ load 10s
 		{
 			Query:        "sum(max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  7,
 			TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -897,6 +915,7 @@ load 10s
 		{
 			Query:        "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  8,
 			TotalSamples: 12, // @ modifier force the evaluation to at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -905,6 +924,7 @@ load 10s
 		{
 			Query:        "metricWith1SampleEvery10Seconds[60s] offset 10s",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  6,
 			TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 6,
@@ -913,6 +933,7 @@ load 10s
 		{
 			Query:        "metricWith3SampleEvery10Seconds[60s]",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  18,
 			TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 18,
@@ -921,6 +942,7 @@ load 10s
 		{
 			Query:        "max_over_time(metricWith1SampleEvery10Seconds[60s])",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  7,
 			TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 6,
@@ -929,6 +951,7 @@ load 10s
 		{
 			Query:        "absent_over_time(metricWith1SampleEvery10Seconds[60s])",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  7,
 			TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 6,
@@ -937,6 +960,7 @@ load 10s
 		{
 			Query:        "max_over_time(metricWith3SampleEvery10Seconds[60s])",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  9,
 			TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 18,
@@ -945,6 +969,7 @@ load 10s
 		{
 			Query:        "metricWith1SampleEvery10Seconds[60s:5s]",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  12,
 			TotalSamples: 12, // 1 sample per query * 12 queries (60/5)
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -953,6 +978,7 @@ load 10s
 		{
 			Query:        "metricWith1SampleEvery10Seconds[60s:5s] offset 10s",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  12,
 			TotalSamples: 12, // 1 sample per query * 12 queries (60/5)
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -961,6 +987,7 @@ load 10s
 		{
 			Query:        "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  51,
 			TotalSamples: 36, // 3 sample per query * 12 queries (60/5)
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 36,
@@ -969,6 +996,7 @@ load 10s
 		{
 			Query:        "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
 			Start:        time.Unix(201, 0),
+			PeakSamples:  52,
 			TotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5))
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 72,
@@ -979,6 +1007,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  4,
 			TotalSamples: 4, // 1 sample per query * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 1,
@@ -992,6 +1021,7 @@ load 10s
 			Start:    time.Unix(204, 0),
 			End:      time.Unix(223, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  4,
 			TotalSamples: 4, // 1 sample per query * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				204000: 1, // aligned to the step time, not the sample time
@@ -1006,6 +1036,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  5,
 			TotalSamples: 4, // (1 sample / 10 seconds) * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 1,
@@ -1019,6 +1050,7 @@ load 10s
 			Start:    time.Unix(991, 0),
 			End:      time.Unix(1021, 0),
 			Interval: 10 * time.Second,
+			PeakSamples:  2,
 			TotalSamples: 2, // 1 sample per query * 2 steps with data
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				991000: 1,
@@ -1032,6 +1064,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  4,
 			TotalSamples: 4, // 1 sample per query * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 1,
@@ -1045,6 +1078,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  12,
 			TotalSamples: 48, // @ modifier force the evaluation timestamp at 30 seconds - So it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -1057,6 +1091,7 @@ load 10s
 			Query:       `metricWith3SampleEvery10Seconds`,
 			Start:       time.Unix(201, 0),
 			End:         time.Unix(220, 0),
+			PeakSamples: 12,
 			Interval:    5 * time.Second,
 			TotalSamples: 12, // 3 sample per query * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
@@ -1071,6 +1106,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  18,
 			TotalSamples: 72, // (3 sample / 10 seconds * 60 seconds) * 4 steps = 72
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 18,
@@ -1084,6 +1120,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  72,
 			TotalSamples: 144, // 3 sample per query * 12 queries (60/5) * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 36,
@@ -1097,6 +1134,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  32,
 			TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -1110,6 +1148,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  32,
 			TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 12,
@@ -1123,6 +1162,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  76,
 			TotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps)
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 72,
@@ -1136,6 +1176,7 @@ load 10s
 			Start:    time.Unix(201, 0),
 			End:      time.Unix(220, 0),
 			Interval: 5 * time.Second,
+			PeakSamples:  72,
 			TotalSamples: 192, // (1 sample per query * 12 queries (60/5) + 3 sample per query * 12 queries (60/5)) * 4 steps
 			TotalSamplesPerStep: stats.TotalSamplesPerStep{
 				201000: 48,
@@ -1148,25 +1189,39 @@ load 10s
 	engine := test.QueryEngine()
 	engine.enablePerStepStats = true
+	origMaxSamples := engine.maxSamplesPerQuery
 	for _, c := range cases {
 		t.Run(c.Query, func(t *testing.T) {
 			opts := &QueryOpts{EnablePerStepStats: true}
+			engine.maxSamplesPerQuery = origMaxSamples
 
-			var err error
-			var qry Query
-			if c.Interval == 0 {
-				qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start)
-			} else {
-				qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
+			runQuery := func(expErr error) *stats.Statistics {
+				var err error
+				var qry Query
+				if c.Interval == 0 {
+					qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start)
+				} else {
+					qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
+				}
+				require.NoError(t, err)
+
+				res := qry.Exec(test.Context())
+				require.Equal(t, expErr, res.Err)
+
+				return qry.Stats()
 			}
-			require.NoError(t, err)
 
-			res := qry.Exec(test.Context())
-			require.Nil(t, res.Err)
-
-			stats := qry.Stats()
+			stats := runQuery(nil)
 			require.Equal(t, c.TotalSamples, stats.Samples.TotalSamples, "Total samples mismatch")
 			require.Equal(t, &c.TotalSamplesPerStep, stats.Samples.TotalSamplesPerStepMap(), "Total samples per time mismatch")
+			require.Equal(t, c.PeakSamples, stats.Samples.PeakSamples, "Peak samples mismatch")
+
+			// Check that the peak is correct by setting the max to one less.
+			if c.SkipMaxCheck {
+				return
+			}
+			engine.maxSamplesPerQuery = stats.Samples.PeakSamples - 1
+			runQuery(ErrTooManySamples(env))
 		})
 	}
 }
@@ -1337,6 +1392,9 @@ load 10s
 		stats := qry.Stats()
 		require.Equal(t, expError, res.Err)
 		require.NotNil(t, stats)
+		if expError == nil {
+			require.Equal(t, c.MaxSamples, stats.Samples.PeakSamples, "peak samples mismatch for query %q", c.Query)
+		}
 	}
 
 	// Within limit.
diff --git a/util/stats/query_stats.go b/util/stats/query_stats.go
index fc31445808..85f9a99c3b 100644
--- a/util/stats/query_stats.go
+++ b/util/stats/query_stats.go
@@ -108,6 +108,7 @@ type queryTimings struct {
 type querySamples struct {
 	TotalQueryableSamplesPerStep []stepStat `json:"totalQueryableSamplesPerStep,omitempty"`
 	TotalQueryableSamples        int        `json:"totalQueryableSamples"`
+	PeakSamples                  int        `json:"peakSamples"`
 }
 
 // BuiltinStats holds the statistics that Prometheus's core gathers.
@@ -156,6 +157,7 @@ func NewQueryStats(s *Statistics) QueryStats {
 	if sp != nil {
 		samples = &querySamples{
 			TotalQueryableSamples: sp.TotalSamples,
+			PeakSamples:           sp.PeakSamples,
 		}
 		samples.TotalQueryableSamplesPerStep = sp.totalSamplesPerStepPoints()
 	}
@@ -229,6 +231,12 @@ type QueryTimers struct {
 type TotalSamplesPerStep map[int64]int
 
 type QuerySamples struct {
+	// PeakSamples represents the highest count of samples considered
+	// while evaluating a query. It corresponds to the peak value of
+	// currentSamples, which is in turn compared against the MaxSamples
+	// configured in the engine.
+	PeakSamples int
+
 	// TotalSamples represents the total number of samples scanned
 	// while evaluating a query.
 	TotalSamples int
@@ -287,6 +295,28 @@ func (qs *QuerySamples) IncrementSamplesAtTimestamp(t int64, samples int) {
 	}
 }
 
+// UpdatePeak updates the peak number of samples considered in
+// the evaluation of a query, as used with the MaxSamples limit.
+func (qs *QuerySamples) UpdatePeak(samples int) {
+	if qs == nil {
+		return
+	}
+	if samples > qs.PeakSamples {
+		qs.PeakSamples = samples
+	}
+}
+
+// UpdatePeakFromSubquery updates the peak number of samples considered
+// in a query from its evaluation of a subquery.
+func (qs *QuerySamples) UpdatePeakFromSubquery(other *QuerySamples) {
+	if qs == nil || other == nil {
+		return
+	}
+	if other.PeakSamples > qs.PeakSamples {
+		qs.PeakSamples = other.PeakSamples
+	}
+}
+
 func NewQueryTimers() *QueryTimers {
 	return &QueryTimers{NewTimerGroup()}
 }
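
Note for reviewers (not part of the patch to apply): a minimal sketch of how the
new statistic surfaces to callers of the engine. The engine/queryable setup and
the query string are illustrative assumptions; only QueryOpts, Query.Stats(),
and the new Samples.PeakSamples field come from the patched API.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

// reportPeak runs a single instant query and prints the new PeakSamples
// statistic next to the total. Construction of the engine and queryable is
// assumed to happen elsewhere (e.g. as in the engine tests).
func reportPeak(engine *promql.Engine, q storage.Queryable) error {
	qry, err := engine.NewInstantQuery(q,
		&promql.QueryOpts{EnablePerStepStats: true},
		`sum(rate(http_requests_total[5m]))`, // illustrative query
		time.Now())
	if err != nil {
		return err
	}
	defer qry.Close()

	if res := qry.Exec(context.Background()); res.Err != nil {
		return res.Err
	}

	s := qry.Stats()
	// PeakSamples is the high-water mark of samples held in memory that the
	// engine compared against MaxSamples during evaluation, so it shows how
	// close the query came to ErrTooManySamples.
	fmt.Printf("scanned %d samples, peak of %d in memory\n",
		s.Samples.TotalSamples, s.Samples.PeakSamples)
	return nil
}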