Report PeakSamples in query statistics

This corresponds exactly to the statistic that is compared against
MaxSamples during query execution, so users can see how close their
queries come to that limit.

Co-authored-by: Harkishen Singh <harkishensingh@hotmail.com>
Co-authored-by: Andrew Bloomgarden <blmgrdn@amazon.com>
Signed-off-by: Andrew Bloomgarden <blmgrdn@amazon.com>
Authored by Andrew Bloomgarden on 2022-02-22 15:30:39 -05:00; committed by Julien Pivotto.
parent ed091a1fb9
commit a64b9fe323
3 changed files with 117 additions and 14 deletions
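As context for the change, a minimal sketch (not part of the commit) of how an engine caller reads the new statistic. It uses the same constructors and QueryOpts that the tests below use; the engine, queryable, and context setup are assumed.

package example

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

// peakSamples runs an instant query and returns the new PeakSamples
// statistic: the high-water mark of the evaluator's currentSamples
// counter, i.e. the value that was checked against MaxSamples.
func peakSamples(ctx context.Context, engine *promql.Engine, q storage.Queryable, expr string, ts time.Time) (int, error) {
	opts := &promql.QueryOpts{EnablePerStepStats: true}
	qry, err := engine.NewInstantQuery(q, opts, expr, ts)
	if err != nil {
		return 0, err
	}
	if res := qry.Exec(ctx); res.Err != nil {
		return 0, res.Err
	}
	return qry.Stats().Samples.PeakSamples, nil
}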

promql/engine.go

@@ -1115,6 +1115,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
}
}
args[i] = vectors[i]
ev.samplesStats.UpdatePeak(ev.currentSamples)
}
// Make the function call.
@@ -1130,10 +1131,12 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
// When we reset currentSamples to tempNumSamples during the next iteration of the loop it also
// needs to include the samples from the result here, as they're still in memory.
tempNumSamples += len(result)
ev.samplesStats.UpdatePeak(ev.currentSamples)
if ev.currentSamples > ev.maxSamples {
ev.error(ErrTooManySamples(env))
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
// If this could be an instant query, shortcut so as not to change sort order.
if ev.endTimestamp == ev.startTimestamp {
@@ -1143,6 +1146,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
mat[i] = Series{Metric: s.Metric, Points: []Point{s.Point}}
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
@@ -1175,17 +1179,20 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
mat = append(mat, ss)
}
ev.currentSamples = originalNumSamples + mat.TotalSamples()
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, warnings
}
// evalSubquery evaluates given SubqueryExpr and returns an equivalent
// evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, storage.Warnings) {
- sampleStats := ev.samplesStats
+ samplesStats := ev.samplesStats
// Avoid double counting samples when running a subquery, those samples will be counted in a later stage.
- ev.samplesStats = nil
+ ev.samplesStats = ev.samplesStats.NewChild()
val, ws := ev.eval(subq)
- ev.samplesStats = sampleStats
+ // But do incorporate the peak from the subquery.
+ samplesStats.UpdatePeakFromSubquery(ev.samplesStats)
+ ev.samplesStats = samplesStats
mat := val.(Matrix)
vs := &parser.VectorSelector{
OriginalOffset: subq.OriginalOffset,
@@ -1409,7 +1416,9 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
} else {
putPointSlice(ss.Points)
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
ev.currentSamples -= len(points)
putPointSlice(points)
@@ -1563,6 +1572,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
putPointSlice(ss.Points)
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
return mat, ws
case *parser.MatrixSelector:
@@ -1607,6 +1617,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
res, ws := newEv.eval(e.Expr)
ev.currentSamples = newEv.currentSamples
ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples)
return res, ws
case *parser.StepInvariantExpr:
@@ -1629,6 +1640,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
}
res, ws := newEv.eval(e.Expr)
ev.currentSamples = newEv.currentSamples
ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats)
for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts = ts + ev.interval {
step++
ev.samplesStats.IncrementSamplesAtStep(step, newEv.samplesStats.TotalSamples)
@@ -1662,6 +1674,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
}
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
return res, ws
}
@@ -1694,6 +1707,7 @@ func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vect
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
return vec, ws
}
@@ -1841,6 +1855,7 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
ev.currentSamples++
}
}
ev.samplesStats.UpdatePeak(ev.currentSamples)
return out
}
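Taken together, the engine changes above follow one pattern: update the peak tracker whenever currentSamples changes, and have subqueries evaluate against a child stats object so their totals are not double-counted while their peak still folds back into the parent. A toy model of that bookkeeping (not Prometheus code; newChild mimics the assumed behavior of QuerySamples.NewChild, returning a fresh, empty tracker):

package main

import "fmt"

// querySamples is a stand-in for stats.QuerySamples.
type querySamples struct {
	TotalSamples int
	PeakSamples  int
}

// newChild returns a fresh tracker so a subquery's totals don't
// pollute the parent's.
func (qs *querySamples) newChild() *querySamples { return &querySamples{} }

// updatePeak records a new high-water mark of in-memory samples.
func (qs *querySamples) updatePeak(current int) {
	if qs != nil && current > qs.PeakSamples {
		qs.PeakSamples = current
	}
}

// updatePeakFromSubquery folds a child's peak into the parent.
func (qs *querySamples) updatePeakFromSubquery(child *querySamples) {
	if qs != nil && child != nil && child.PeakSamples > qs.PeakSamples {
		qs.PeakSamples = child.PeakSamples
	}
}

func main() {
	parent := &querySamples{TotalSamples: 10}
	parent.updatePeak(10) // parent held 10 samples at some point

	child := parent.newChild() // as in evalSubquery above
	child.TotalSamples = 40    // scanned by the subquery; counted later by the caller
	child.updatePeak(25)       // the subquery briefly held 25 samples in memory

	parent.updatePeakFromSubquery(child)
	fmt.Println(parent.PeakSamples)  // 25: the subquery's peak propagates up
	fmt.Println(parent.TotalSamples) // 10: totals are not double-counted here
}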

promql/engine_test.go

@@ -756,14 +756,17 @@ load 10s
cases := []struct {
Query string
SkipMaxCheck bool
TotalSamples int
TotalSamplesPerStep stats.TotalSamplesPerStep
PeakSamples int
Start time.Time
End time.Time
Interval time.Duration
}{
{
Query: `"literal string"`,
SkipMaxCheck: true, // This can't fail from a max samples limit.
Start: time.Unix(21, 0),
TotalSamples: 0,
TotalSamplesPerStep: stats.TotalSamplesPerStep{
@@ -774,6 +777,7 @@ load 10s
Query: "1",
Start: time.Unix(21, 0),
TotalSamples: 0,
PeakSamples: 1,
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 0,
},
@@ -781,6 +785,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds",
Start: time.Unix(21, 0),
PeakSamples: 1,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
@@ -790,6 +795,7 @@ load 10s
// timestamp function has a special handling.
Query: "timestamp(metricWith1SampleEvery10Seconds)",
Start: time.Unix(21, 0),
PeakSamples: 2,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
@@ -798,6 +804,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds",
Start: time.Unix(22, 0),
PeakSamples: 1,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
22000: 1, // Aligned to the step time, not the sample time.
@@ -806,6 +813,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds offset 10s",
Start: time.Unix(21, 0),
PeakSamples: 1,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
@@ -814,6 +822,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds @ 15",
Start: time.Unix(21, 0),
PeakSamples: 1,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
@@ -822,6 +831,7 @@ load 10s
{
Query: `metricWith3SampleEvery10Seconds{a="1"}`,
Start: time.Unix(21, 0),
PeakSamples: 1,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
@@ -830,6 +840,7 @@ load 10s
{
Query: `metricWith3SampleEvery10Seconds{a="1"} @ 19`,
Start: time.Unix(21, 0),
PeakSamples: 1,
TotalSamples: 1, // 1 sample / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 1,
@@ -838,6 +849,7 @@ load 10s
{
Query: `metricWith3SampleEvery10Seconds{a="1"}[20s] @ 19`,
Start: time.Unix(21, 0),
PeakSamples: 2,
TotalSamples: 2, // (1 sample / 10 seconds) * 20s
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 2,
@@ -846,6 +858,7 @@ load 10s
{
Query: "metricWith3SampleEvery10Seconds",
Start: time.Unix(21, 0),
PeakSamples: 3,
TotalSamples: 3, // 3 samples / 10 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
21000: 3,
@@ -854,6 +867,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds[60s]",
Start: time.Unix(201, 0),
PeakSamples: 6,
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 6,
@@ -862,6 +876,7 @@ load 10s
{
Query: "max_over_time(metricWith1SampleEvery10Seconds[59s])[20s:5s]",
Start: time.Unix(201, 0),
PeakSamples: 10,
TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 20/5 (using 59s so we always return 6 samples
// as if we run a query on 00 looking back 60 seconds we will return 7 samples;
// see next test).
@@ -872,6 +887,7 @@ load 10s
{
Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])[20s:5s]",
Start: time.Unix(201, 0),
PeakSamples: 11,
TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) * 20/5 + 2, as
// max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples.
TotalSamplesPerStep: stats.TotalSamplesPerStep{
@@ -881,6 +897,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds[60s] @ 30",
Start: time.Unix(201, 0),
PeakSamples: 4,
TotalSamples: 4, // The @ modifier forces the evaluation at 30 seconds, so it brings 4 datapoints (0, 10, 20, 30 seconds) * 1 series
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 4,
@@ -889,6 +906,7 @@ load 10s
{
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
Start: time.Unix(201, 0),
PeakSamples: 7,
TotalSamples: 12, // The @ modifier forces the evaluation at 30 seconds, so it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -897,6 +915,7 @@ load 10s
{
Query: "sum by (b) (max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))",
Start: time.Unix(201, 0),
PeakSamples: 8,
TotalSamples: 12, // The @ modifier forces the evaluation at 30 seconds, so it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -905,6 +924,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds[60s] offset 10s",
Start: time.Unix(201, 0),
PeakSamples: 6,
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 6,
@@ -913,6 +933,7 @@ load 10s
{
Query: "metricWith3SampleEvery10Seconds[60s]",
Start: time.Unix(201, 0),
PeakSamples: 18,
TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 18,
@@ -921,6 +942,7 @@ load 10s
{
Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])",
Start: time.Unix(201, 0),
PeakSamples: 7,
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 6,
@@ -929,6 +951,7 @@ load 10s
{
Query: "absent_over_time(metricWith1SampleEvery10Seconds[60s])",
Start: time.Unix(201, 0),
PeakSamples: 7,
TotalSamples: 6, // 1 sample / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 6,
@@ -937,6 +960,7 @@ load 10s
{
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s])",
Start: time.Unix(201, 0),
PeakSamples: 9,
TotalSamples: 18, // 3 sample / 10 seconds * 60 seconds
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 18,
@@ -945,6 +969,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds[60s:5s]",
Start: time.Unix(201, 0),
PeakSamples: 12,
TotalSamples: 12, // 1 sample per query * 12 queries (60/5)
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -953,6 +978,7 @@ load 10s
{
Query: "metricWith1SampleEvery10Seconds[60s:5s] offset 10s",
Start: time.Unix(201, 0),
PeakSamples: 12,
TotalSamples: 12, // 1 sample per query * 12 queries (60/5)
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -961,6 +987,7 @@ load 10s
{
Query: "max_over_time(metricWith3SampleEvery10Seconds[60s:5s])",
Start: time.Unix(201, 0),
PeakSamples: 51,
TotalSamples: 36, // 3 sample per query * 12 queries (60/5)
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 36,
@@ -969,6 +996,7 @@ load 10s
{
Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
Start: time.Unix(201, 0),
PeakSamples: 52,
TotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5))
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 72,
@@ -979,6 +1007,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 4,
TotalSamples: 4, // 1 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 1,
@@ -992,6 +1021,7 @@ load 10s
Start: time.Unix(204, 0),
End: time.Unix(223, 0),
Interval: 5 * time.Second,
PeakSamples: 4,
TotalSamples: 4, // 1 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
204000: 1, // aligned to the step time, not the sample time
@@ -1006,6 +1036,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 5,
TotalSamples: 4, // (1 sample / 10 seconds) * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 1,
@@ -1019,6 +1050,7 @@ load 10s
Start: time.Unix(991, 0),
End: time.Unix(1021, 0),
Interval: 10 * time.Second,
PeakSamples: 2,
TotalSamples: 2, // 1 sample per query * 2 steps with data
TotalSamplesPerStep: stats.TotalSamplesPerStep{
991000: 1,
@@ -1032,6 +1064,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 4,
TotalSamples: 4, // 1 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 1,
@@ -1045,6 +1078,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 12,
TotalSamples: 48, // The @ modifier forces the evaluation timestamp to 30 seconds, so it brings 4 datapoints (0, 10, 20, 30 seconds) * 3 series * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -1057,6 +1091,7 @@ load 10s
Query: `metricWith3SampleEvery10Seconds`,
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
PeakSamples: 12,
Interval: 5 * time.Second,
TotalSamples: 12, // 3 sample per query * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
@@ -1071,6 +1106,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 18,
TotalSamples: 72, // (3 sample / 10 seconds * 60 seconds) * 4 steps = 72
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 18,
@@ -1084,6 +1120,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 72,
TotalSamples: 144, // 3 sample per query * 12 queries (60/5) * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 36,
@@ -1097,6 +1134,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 32,
TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -1110,6 +1148,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 32,
TotalSamples: 48, // 1 sample per query * 12 queries (60/5) * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 12,
@@ -1123,6 +1162,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 76,
TotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps)
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 72,
@@ -1136,6 +1176,7 @@ load 10s
Start: time.Unix(201, 0),
End: time.Unix(220, 0),
Interval: 5 * time.Second,
PeakSamples: 72,
TotalSamples: 192, // (1 sample per query * 12 queries (60/5) + 3 sample per query * 12 queries (60/5)) * 4 steps
TotalSamplesPerStep: stats.TotalSamplesPerStep{
201000: 48,
@@ -1148,25 +1189,39 @@ load 10s
engine := test.QueryEngine()
engine.enablePerStepStats = true
+ origMaxSamples := engine.maxSamplesPerQuery
for _, c := range cases {
t.Run(c.Query, func(t *testing.T) {
opts := &QueryOpts{EnablePerStepStats: true}
+ engine.maxSamplesPerQuery = origMaxSamples
- var err error
- var qry Query
- if c.Interval == 0 {
- qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start)
- } else {
- qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
- }
+ runQuery := func(expErr error) *stats.Statistics {
+ var err error
+ var qry Query
+ if c.Interval == 0 {
+ qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start)
+ } else {
+ qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
+ }
+ require.NoError(t, err)
+ res := qry.Exec(test.Context())
+ require.Equal(t, expErr, res.Err)
+ return qry.Stats()
+ }
- require.NoError(t, err)
- res := qry.Exec(test.Context())
- require.Nil(t, res.Err)
- stats := qry.Stats()
+ stats := runQuery(nil)
require.Equal(t, c.TotalSamples, stats.Samples.TotalSamples, "Total samples mismatch")
require.Equal(t, &c.TotalSamplesPerStep, stats.Samples.TotalSamplesPerStepMap(), "Total samples per time mismatch")
+ require.Equal(t, c.PeakSamples, stats.Samples.PeakSamples, "Peak samples mismatch")
+
+ // Check that the peak is correct by setting the max to one less.
+ if c.SkipMaxCheck {
+ return
+ }
+ engine.maxSamplesPerQuery = stats.Samples.PeakSamples - 1
+ runQuery(ErrTooManySamples(env))
})
}
}
@@ -1337,6 +1392,9 @@ load 10s
stats := qry.Stats()
require.Equal(t, expError, res.Err)
require.NotNil(t, stats)
if expError == nil {
require.Equal(t, c.MaxSamples, stats.Samples.PeakSamples, "peak samples mismatch for query %q", c.Query)
}
}
// Within limit.

util/stats/query_stats.go

@@ -108,6 +108,7 @@ type queryTimings struct {
type querySamples struct {
TotalQueryableSamplesPerStep []stepStat `json:"totalQueryableSamplesPerStep,omitempty"`
TotalQueryableSamples int `json:"totalQueryableSamples"`
PeakSamples int `json:"peakSamples"`
}
// BuiltinStats holds the statistics that Prometheus's core gathers.
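The struct tags above fix the wire format. As a quick sketch (not part of the commit), marshaling the two scalar fields yields the JSON that a stats-enabled API response now carries; the omitempty per-step array is left out:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirror of the querySamples struct above, minus the omitempty per-step field.
type querySamples struct {
	TotalQueryableSamples int `json:"totalQueryableSamples"`
	PeakSamples           int `json:"peakSamples"`
}

func main() {
	b, _ := json.Marshal(querySamples{TotalQueryableSamples: 26, PeakSamples: 11})
	fmt.Println(string(b)) // {"totalQueryableSamples":26,"peakSamples":11}
}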
@@ -156,6 +157,7 @@ func NewQueryStats(s *Statistics) QueryStats {
if sp != nil {
samples = &querySamples{
TotalQueryableSamples: sp.TotalSamples,
PeakSamples: sp.PeakSamples,
}
samples.TotalQueryableSamplesPerStep = sp.totalSamplesPerStepPoints()
}
@@ -229,6 +231,12 @@ type QueryTimers struct {
type TotalSamplesPerStep map[int64]int
type QuerySamples struct {
// PeakSamples represents the highest count of samples considered
// while evaluating a query. It corresponds to the peak value of
// currentSamples, which is in turn compared against the MaxSamples
// configured in the engine.
PeakSamples int
// TotalSamples represents the total number of samples scanned
// while evaluating a query.
TotalSamples int
@@ -287,6 +295,28 @@ func (qs *QuerySamples) IncrementSamplesAtTimestamp(t int64, samples int) {
}
}
// UpdatePeak updates the peak number of samples considered in
// the evaluation of a query as used with the MaxSamples limit.
func (qs *QuerySamples) UpdatePeak(samples int) {
if qs == nil {
return
}
if samples > qs.PeakSamples {
qs.PeakSamples = samples
}
}
// UpdatePeakFromSubquery updates the peak number of samples considered
// in a query from its evaluation of a subquery.
func (qs *QuerySamples) UpdatePeakFromSubquery(other *QuerySamples) {
if qs == nil || other == nil {
return
}
if other.PeakSamples > qs.PeakSamples {
qs.PeakSamples = other.PeakSamples
}
}
func NewQueryTimers() *QueryTimers {
return &QueryTimers{NewTimerGroup()}
}
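Both helpers are deliberately nil-safe (evaluators may carry a nil stats object, as evalSubquery used to force) and monotonic. A sketch of those semantics as a hypothetical unit test, not part of the commit:

package stats

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestUpdatePeakSketch(t *testing.T) {
	qs := &QuerySamples{}
	qs.UpdatePeak(5)
	qs.UpdatePeak(3) // lower value: peak is unchanged
	require.Equal(t, 5, qs.PeakSamples)

	// A subquery's higher peak folds into the parent.
	child := &QuerySamples{}
	child.UpdatePeak(9)
	qs.UpdatePeakFromSubquery(child)
	require.Equal(t, 9, qs.PeakSamples)

	// Nil receivers and nil arguments are no-ops rather than panics.
	var nilQS *QuerySamples
	nilQS.UpdatePeak(1)
	nilQS.UpdatePeakFromSubquery(qs)
	qs.UpdatePeakFromSubquery(nil)
}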