Merge pull request #506 from grafana/charleskorn/streaming-timestamp

Don't recreate iterator for each series on each timestep when evaluating a query with `timestamp()`
Charles Korn 2023-06-29 14:17:23 +10:00 committed by GitHub
commit 5c270e23ea
3 changed files with 133 additions and 34 deletions
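
The optimisation is a hoisting change: previously the per-step callback recreated an iterator for every selected series at every evaluation step, whereas now one memoized iterator per series is built once, before the step loop, and reused for every step. The sketch below shows that pattern in isolation; iterator, newIterator, evalBefore and evalAfter are simplified stand-ins for illustration, not the Prometheus types touched in this diff.

package main

import "fmt"

// iterator is a stand-in for a per-series sample iterator that is costly to
// construct (in the diff below: a chunk iterator wrapped by
// storage.NewMemoizedIterator).
type iterator struct{ series string }

func newIterator(series string) *iterator {
    fmt.Println("building iterator for", series) // make the construction cost visible
    return &iterator{series: series}
}

// valueAt stands in for "find the most recent sample at or before ts".
func (it *iterator) valueAt(ts int64) float64 { return float64(ts) }

// evalBefore mirrors the old shape: every series gets a fresh iterator at
// every step, i.e. O(steps * series) constructions.
func evalBefore(series []string, steps []int64) {
    for _, ts := range steps {
        for _, s := range series {
            it := newIterator(s)
            _ = it.valueAt(ts)
        }
    }
}

// evalAfter mirrors the new shape: iterators are built once up front and
// reused by every step, i.e. O(series) constructions.
func evalAfter(series []string, steps []int64) {
    iterators := make([]*iterator, len(series))
    for i, s := range series {
        iterators[i] = newIterator(s)
    }
    for _, ts := range steps {
        for i := range series {
            _ = iterators[i].valueAt(ts)
        }
    }
}

func main() {
    series := []string{"a", "b"}
    steps := []int64{0, 1000, 2000}
    evalBefore(series, steps) // 6 constructions
    evalAfter(series, steps)  // 2 constructions
}

With two series and three steps, the first shape constructs six iterators and the second only two; the real change additionally keeps each memoized iterator's lookback state across steps, which the sketch does not model.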


@@ -186,6 +186,10 @@ func rangeQueryCases() []benchCase {
             expr: "count({__name__!=\"\",l=\"\"})",
             steps: 1,
         },
+        // timestamp() function
+        {
+            expr: "timestamp(a_X)",
+        },
     }
     // X in an expr will be replaced by different metric sizes.
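
The comment above notes that X in an expression is replaced by different metric sizes when the benchmark cases are generated, so the single timestamp(a_X) entry presumably fans out into one case per series-count variant. A minimal sketch of that substitution, using a hypothetical expandCases helper and illustrative size names rather than the benchmark's actual ones:

package main

import (
    "fmt"
    "strings"
)

// benchCase is a simplified stand-in for the case struct in bench_test.go;
// only the fields visible in this hunk are included.
type benchCase struct {
    expr  string
    steps int
}

// expandCases substitutes the X placeholder in every expression with each
// metric-size suffix, yielding one concrete case per size. The suffixes used
// here are illustrative, not the benchmark's real values.
func expandCases(cases []benchCase, sizes []string) []benchCase {
    out := make([]benchCase, 0, len(cases)*len(sizes))
    for _, c := range cases {
        for _, size := range sizes {
            out = append(out, benchCase{
                expr:  strings.ReplaceAll(c.expr, "X", size),
                steps: c.steps,
            })
        }
    }
    return out
}

func main() {
    cases := []benchCase{{expr: "timestamp(a_X)"}}
    for _, c := range expandCases(cases, []string{"one", "hundred"}) {
        fmt.Println(c.expr) // timestamp(a_one), timestamp(a_hundred)
    }
}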


@@ -1353,15 +1353,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
             unwrapParenExpr(&arg)
             vs, ok := arg.(*parser.VectorSelector)
             if ok {
-                return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
-                    if vs.Timestamp != nil {
-                        // This is a special case only for "timestamp" since the offset
-                        // needs to be adjusted for every point.
-                        vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond
-                    }
-                    val, ws := ev.vectorSelector(vs, enh.Ts)
-                    return call([]parser.Value{val}, e.Args, enh), ws
-                })
+                return ev.evalTimestampFunctionOverVectorSelector(vs, call, e)
             }
         }
@@ -1799,38 +1791,47 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
     panic(fmt.Errorf("unhandled expression of type: %T", expr))
 }
-// vectorSelector evaluates a *parser.VectorSelector expression.
-func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) {
-    ws, err := checkAndExpandSeriesSet(ev.ctx, node)
+func (ev *evaluator) evalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) {
+    ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
     if err != nil {
         ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
     }
-    vec := make(Vector, 0, len(node.Series))
-    it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
-    var chkIter chunkenc.Iterator
-    for i, s := range node.Series {
-        chkIter = s.Iterator(chkIter)
-        it.Reset(chkIter)
-        t, f, h, ok := ev.vectorSelectorSingle(it, node, ts)
+    seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series))
+    for i, s := range vs.Series {
+        it := s.Iterator(nil)
+        seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta))
+    }
+    return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
+        if vs.Timestamp != nil {
+            // This is a special case only for "timestamp" since the offset
+            // needs to be adjusted for every point.
+            vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond
+        }
+        vec := make(Vector, 0, len(vs.Series))
+        for i, s := range vs.Series {
+            it := seriesIterators[i]
+            t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts)
             if ok {
                 vec = append(vec, Sample{
-                    Metric: node.Series[i].Labels(),
+                    Metric: s.Labels(),
                     T: t,
                     F: f,
                     H: h,
                 })
                 ev.currentSamples++
-                ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1)
+                ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1)
                 if ev.currentSamples > ev.maxSamples {
                     ev.error(ErrTooManySamples(env))
                 }
             }
         }
         ev.samplesStats.UpdatePeak(ev.currentSamples)
-    return vec, ws
+        return call([]parser.Value{vec}, e.Args, enh), ws
+    })
 }
 // vectorSelectorSingle evaluates an instant vector for the iterator of one time series.


@@ -1976,6 +1976,100 @@ func TestSubquerySelector(t *testing.T) {
}
}
func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) {
test, err := NewTest(t, `
load 1m
metric 0+1x1000
`)
require.NoError(t, err)
defer test.Close()
err = test.Run()
require.NoError(t, err)
query := "timestamp(metric)"
start := time.Unix(0, 0)
end := time.Unix(61, 0)
interval := time.Second
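// Samples are loaded once a minute (values 0, 1, 2, ...) but the query steps once a second.
// timestamp() returns the timestamp, in seconds, of the most recent sample at or before each
// step, so every step from 0s to 59s sees the sample at t=0 and the steps at 60s and 61s see
// the sample at t=60s.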
expectedResult := Matrix{
Series{
Floats: []FPoint{
{F: 0, T: 0},
{F: 0, T: 1_000},
{F: 0, T: 2_000},
{F: 0, T: 3_000},
{F: 0, T: 4_000},
{F: 0, T: 5_000},
{F: 0, T: 6_000},
{F: 0, T: 7_000},
{F: 0, T: 8_000},
{F: 0, T: 9_000},
{F: 0, T: 10_000},
{F: 0, T: 11_000},
{F: 0, T: 12_000},
{F: 0, T: 13_000},
{F: 0, T: 14_000},
{F: 0, T: 15_000},
{F: 0, T: 16_000},
{F: 0, T: 17_000},
{F: 0, T: 18_000},
{F: 0, T: 19_000},
{F: 0, T: 20_000},
{F: 0, T: 21_000},
{F: 0, T: 22_000},
{F: 0, T: 23_000},
{F: 0, T: 24_000},
{F: 0, T: 25_000},
{F: 0, T: 26_000},
{F: 0, T: 27_000},
{F: 0, T: 28_000},
{F: 0, T: 29_000},
{F: 0, T: 30_000},
{F: 0, T: 31_000},
{F: 0, T: 32_000},
{F: 0, T: 33_000},
{F: 0, T: 34_000},
{F: 0, T: 35_000},
{F: 0, T: 36_000},
{F: 0, T: 37_000},
{F: 0, T: 38_000},
{F: 0, T: 39_000},
{F: 0, T: 40_000},
{F: 0, T: 41_000},
{F: 0, T: 42_000},
{F: 0, T: 43_000},
{F: 0, T: 44_000},
{F: 0, T: 45_000},
{F: 0, T: 46_000},
{F: 0, T: 47_000},
{F: 0, T: 48_000},
{F: 0, T: 49_000},
{F: 0, T: 50_000},
{F: 0, T: 51_000},
{F: 0, T: 52_000},
{F: 0, T: 53_000},
{F: 0, T: 54_000},
{F: 0, T: 55_000},
{F: 0, T: 56_000},
{F: 0, T: 57_000},
{F: 0, T: 58_000},
{F: 0, T: 59_000},
{F: 60, T: 60_000},
{F: 60, T: 61_000},
},
Metric: labels.EmptyLabels(),
},
}
qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, query, start, end, interval)
require.NoError(t, err)
res := qry.Exec(test.Context())
require.NoError(t, res.Err)
require.Equal(t, expectedResult, res.Value)
}
type FakeQueryLogger struct {
closed bool
logs []interface{}