mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #12579 from charleskorn/timestamp
Don't recreate iterator for each series on each timestep when evaluating a query with `timestamp()`
This commit is contained in:
commit
8d47b3d497
|
@ -186,6 +186,10 @@ func rangeQueryCases() []benchCase {
|
||||||
expr: "count({__name__!=\"\",l=\"\"})",
|
expr: "count({__name__!=\"\",l=\"\"})",
|
||||||
steps: 1,
|
steps: 1,
|
||||||
},
|
},
|
||||||
|
// Functions which have special handling inside eval()
|
||||||
|
{
|
||||||
|
expr: "timestamp(a_X)",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// X in an expr will be replaced by different metric sizes.
|
// X in an expr will be replaced by different metric sizes.
|
||||||
|
|
|
@ -1387,15 +1387,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
||||||
unwrapParenExpr(&arg)
|
unwrapParenExpr(&arg)
|
||||||
vs, ok := arg.(*parser.VectorSelector)
|
vs, ok := arg.(*parser.VectorSelector)
|
||||||
if ok {
|
if ok {
|
||||||
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
|
return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e)
|
||||||
if vs.Timestamp != nil {
|
|
||||||
// This is a special case only for "timestamp" since the offset
|
|
||||||
// needs to be adjusted for every point.
|
|
||||||
vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond
|
|
||||||
}
|
|
||||||
val, ws := ev.vectorSelector(vs, enh.Ts)
|
|
||||||
return call([]parser.Value{val}, e.Args, enh), ws
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1833,38 +1825,48 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
|
||||||
panic(fmt.Errorf("unhandled expression of type: %T", expr))
|
panic(fmt.Errorf("unhandled expression of type: %T", expr))
|
||||||
}
|
}
|
||||||
|
|
||||||
// vectorSelector evaluates a *parser.VectorSelector expression.
|
func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, storage.Warnings) {
|
||||||
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) {
|
ws, err := checkAndExpandSeriesSet(ev.ctx, vs)
|
||||||
ws, err := checkAndExpandSeriesSet(ev.ctx, node)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
|
ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws})
|
||||||
}
|
}
|
||||||
vec := make(Vector, 0, len(node.Series))
|
|
||||||
it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta))
|
|
||||||
var chkIter chunkenc.Iterator
|
|
||||||
for i, s := range node.Series {
|
|
||||||
chkIter = s.Iterator(chkIter)
|
|
||||||
it.Reset(chkIter)
|
|
||||||
|
|
||||||
t, f, h, ok := ev.vectorSelectorSingle(it, node, ts)
|
seriesIterators := make([]*storage.MemoizedSeriesIterator, len(vs.Series))
|
||||||
if ok {
|
for i, s := range vs.Series {
|
||||||
vec = append(vec, Sample{
|
it := s.Iterator(nil)
|
||||||
Metric: node.Series[i].Labels(),
|
seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta))
|
||||||
T: t,
|
}
|
||||||
F: f,
|
|
||||||
H: h,
|
|
||||||
})
|
|
||||||
|
|
||||||
ev.currentSamples++
|
return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, storage.Warnings) {
|
||||||
ev.samplesStats.IncrementSamplesAtTimestamp(ts, 1)
|
if vs.Timestamp != nil {
|
||||||
if ev.currentSamples > ev.maxSamples {
|
// This is a special case for "timestamp()" when the @ modifier is used, to ensure that
|
||||||
ev.error(ErrTooManySamples(env))
|
// we return a point for each time step in this case.
|
||||||
}
|
// See https://github.com/prometheus/prometheus/issues/8433.
|
||||||
|
vs.Offset = time.Duration(enh.Ts-*vs.Timestamp) * time.Millisecond
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
vec := make(Vector, 0, len(vs.Series))
|
||||||
ev.samplesStats.UpdatePeak(ev.currentSamples)
|
for i, s := range vs.Series {
|
||||||
return vec, ws
|
it := seriesIterators[i]
|
||||||
|
t, f, h, ok := ev.vectorSelectorSingle(it, vs, enh.Ts)
|
||||||
|
if ok {
|
||||||
|
vec = append(vec, Sample{
|
||||||
|
Metric: s.Labels(),
|
||||||
|
T: t,
|
||||||
|
F: f,
|
||||||
|
H: h,
|
||||||
|
})
|
||||||
|
|
||||||
|
ev.currentSamples++
|
||||||
|
ev.samplesStats.IncrementSamplesAtTimestamp(enh.Ts, 1)
|
||||||
|
if ev.currentSamples > ev.maxSamples {
|
||||||
|
ev.error(ErrTooManySamples(env))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ev.samplesStats.UpdatePeak(ev.currentSamples)
|
||||||
|
return call([]parser.Value{vec}, e.Args, enh), ws
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// vectorSelectorSingle evaluates an instant vector for the iterator of one time series.
|
// vectorSelectorSingle evaluates an instant vector for the iterator of one time series.
|
||||||
|
|
|
@ -1977,6 +1977,50 @@ func TestSubquerySelector(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) {
|
||||||
|
test, err := NewTest(t, `
|
||||||
|
load 1m
|
||||||
|
metric 0+1x1000
|
||||||
|
`)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer test.Close()
|
||||||
|
|
||||||
|
err = test.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
query := "timestamp(metric)"
|
||||||
|
start := time.Unix(0, 0)
|
||||||
|
end := time.Unix(61, 0)
|
||||||
|
interval := time.Second
|
||||||
|
|
||||||
|
// We expect the value to be 0 for t=0s to t=59s (inclusive), then 60 for t=60s and t=61s.
|
||||||
|
expectedPoints := []FPoint{}
|
||||||
|
|
||||||
|
for t := 0; t <= 59; t++ {
|
||||||
|
expectedPoints = append(expectedPoints, FPoint{F: 0, T: int64(t * 1000)})
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedPoints = append(
|
||||||
|
expectedPoints,
|
||||||
|
FPoint{F: 60, T: 60_000},
|
||||||
|
FPoint{F: 60, T: 61_000},
|
||||||
|
)
|
||||||
|
|
||||||
|
expectedResult := Matrix{
|
||||||
|
Series{
|
||||||
|
Floats: expectedPoints,
|
||||||
|
Metric: labels.EmptyLabels(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, query, start, end, interval)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
res := qry.Exec(test.Context())
|
||||||
|
require.NoError(t, res.Err)
|
||||||
|
require.Equal(t, expectedResult, res.Value)
|
||||||
|
}
|
||||||
|
|
||||||
type FakeQueryLogger struct {
|
type FakeQueryLogger struct {
|
||||||
closed bool
|
closed bool
|
||||||
logs []interface{}
|
logs []interface{}
|
||||||
|
|
Loading…
Reference in a new issue