diff --git a/promql/engine.go b/promql/engine.go index e7032b903e..5e24b05cf3 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -363,8 +363,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() // Instant evaluation. if s.Start == s.End && s.Interval == 0 { + start := timeMilliseconds(s.Start) evaluator := &evaluator{ - Timestamp: timeMilliseconds(s.Start), + Timestamp: start, ctx: ctx, } val, err := evaluator.Eval(s.Expr) @@ -374,6 +375,16 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( evalTimer.Stop() queryInnerEval.Observe(evalTimer.ElapsedTime().Seconds()) + // Point might have a different timestamp, force it to the evaluation + // timestamp as that is when we ran the evaluation. + switch v := val.(type) { + case Scalar: + v.T = start + case Vector: + for i := range v { + v[i].Point.T = start + } + } return val, nil } @@ -387,8 +398,9 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return nil, err } + t := timeMilliseconds(ts) evaluator := &evaluator{ - Timestamp: timeMilliseconds(ts), + Timestamp: t, ctx: ctx, } val, err := evaluator.Eval(s.Expr) @@ -405,7 +417,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( ss = Series{Points: make([]Point, 0, numSteps)} Seriess[0] = ss } - ss.Points = append(ss.Points, Point(v)) + ss.Points = append(ss.Points, Point{V: v.V, T: t}) Seriess[0] = ss case Vector: for _, sample := range v { @@ -418,6 +430,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( } Seriess[h] = ss } + sample.Point.T = t ss.Points = append(ss.Points, sample.Point) Seriess[h] = ss } diff --git a/promql/engine_test.go b/promql/engine_test.go index 134f417f34..e6794f455b 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -15,10 +15,13 @@ package promql import ( "fmt" + "reflect" "testing" "time" "golang.org/x/net/context" + + "github.com/prometheus/prometheus/pkg/labels" ) func TestQueryConcurrency(t *testing.T) { @@ -194,6 +197,93 @@ func TestEngineShutdown(t *testing.T) { } } +func TestEngineEvalStmtTimestamps(t *testing.T) { + test, err := NewTest(t, ` +load 10s + metric 1 2 +`) + if err != nil { + t.Fatalf("unexpected error creating test: %q", err) + } + err = test.Run() + if err != nil { + t.Fatalf("unexpected error initializing test: %q", err) + } + + cases := []struct { + Query string + Result Value + Start time.Time + End time.Time + Interval time.Duration + }{ + // Instant queries. + { + Query: "1", + Result: Scalar{V: 1, T: 1000}, + Start: time.Unix(1, 0), + }, + { + Query: "metric", + Result: Vector{ + Sample{Point: Point{V: 1, T: 1000}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + Start: time.Unix(1, 0), + }, + { + Query: "metric[20s]", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 2, T: 10000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + Start: time.Unix(10, 0), + }, + // Range queries. + { + Query: "1", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 1, T: 2000}}, + Metric: labels.FromStrings()}, + }, + Start: time.Unix(0, 0), + End: time.Unix(2, 0), + Interval: time.Second, + }, + { + Query: "metric", + Result: Matrix{Series{ + Points: []Point{{V: 1, T: 0}, {V: 1, T: 1000}, {V: 2, T: 2000}}, + Metric: labels.FromStrings("__name__", "metric")}, + }, + Start: time.Unix(0, 0), + End: time.Unix(2, 0), + Interval: time.Second, + }, + } + + for _, c := range cases { + var err error + var qry Query + if c.Interval == 0 { + qry, err = test.QueryEngine().NewInstantQuery(c.Query, c.Start) + } else { + qry, err = test.QueryEngine().NewRangeQuery(c.Query, c.Start, c.End, c.Interval) + } + if err != nil { + t.Fatalf("unexpected error creating query: %q", err) + } + res := qry.Exec(test.Context()) + if res.Err != nil { + t.Fatalf("unexpected error running query: %q", res.Err) + } + if !reflect.DeepEqual(res.Value, c.Result) { + t.Fatalf("unexpected result for query %q: got %q wanted %q", c.Query, res.Value.String(), c.Result.String()) + } + } + +} + func TestRecoverEvaluatorRuntime(t *testing.T) { var ev *evaluator var err error