diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index fc52c6dc7..e2fb6b26f 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -30,6 +30,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -334,7 +335,8 @@ func main() { } } - promql.SetDefaultEvaluationInterval(time.Duration(config.DefaultGlobalConfig.EvaluationInterval)) + noStepSubqueryInterval := &safePromQLNoStepSubqueryInterval{} + noStepSubqueryInterval.Set(config.DefaultGlobalConfig.EvaluationInterval) // Above level 6, the k8s client would log bearer tokens in clear-text. klog.ClampLevel(6) @@ -367,12 +369,13 @@ func main() { scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage) opts = promql.EngineOpts{ - Logger: log.With(logger, "component", "query engine"), - Reg: prometheus.DefaultRegisterer, - MaxSamples: cfg.queryMaxSamples, - Timeout: time.Duration(cfg.queryTimeout), - ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), - LookbackDelta: time.Duration(cfg.lookbackDelta), + Logger: log.With(logger, "component", "query engine"), + Reg: prometheus.DefaultRegisterer, + MaxSamples: cfg.queryMaxSamples, + Timeout: time.Duration(cfg.queryTimeout), + ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), + LookbackDelta: time.Duration(cfg.lookbackDelta), + NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, } queryEngine = promql.NewEngine(opts) @@ -606,11 +609,11 @@ func main() { for { select { case <-hup: - if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } case rc := <-webHandler.Reload(): - if err := reloadConfig(cfg.configFile, 
logger, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) rc <- err } else { @@ -642,7 +645,7 @@ func main() { return nil } - if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil { + if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil { return errors.Wrapf(err, "error loading config from %q", cfg.configFile) } @@ -797,7 +800,22 @@ func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, return db, nil } -func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) { +type safePromQLNoStepSubqueryInterval struct { + value int64 +} + +func durationToInt64Millis(d time.Duration) int64 { + return int64(d / time.Millisecond) +} +func (i *safePromQLNoStepSubqueryInterval) Set(ev model.Duration) { + atomic.StoreInt64(&i.value, durationToInt64Millis(time.Duration(ev))) +} + +func (i *safePromQLNoStepSubqueryInterval) Get(int64) int64 { + return atomic.LoadInt64(&i.value) +} + +func reloadConfig(filename string, logger log.Logger, noStepSubqueryInterval *safePromQLNoStepSubqueryInterval, rls ...func(*config.Config) error) (err error) { level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) defer func() { @@ -825,7 +843,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config return errors.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename) } - promql.SetDefaultEvaluationInterval(time.Duration(conf.GlobalConfig.EvaluationInterval)) + noStepSubqueryInterval.Set(conf.GlobalConfig.EvaluationInterval) level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename) return nil } diff --git a/promql/engine.go b/promql/engine.go index de4e968c4..f6bf6a61a 100644 ---
a/promql/engine.go +++ b/promql/engine.go @@ -24,7 +24,6 @@ import ( "sort" "strconv" "sync" - "sync/atomic" "time" "github.com/go-kit/kit/log" @@ -56,22 +55,6 @@ const ( minInt64 = -9223372036854775808 ) -var ( - // DefaultEvaluationInterval is the default evaluation interval of - // a subquery in milliseconds. - DefaultEvaluationInterval int64 -) - -// SetDefaultEvaluationInterval sets DefaultEvaluationInterval. -func SetDefaultEvaluationInterval(ev time.Duration) { - atomic.StoreInt64(&DefaultEvaluationInterval, durationToInt64Millis(ev)) -} - -// GetDefaultEvaluationInterval returns the DefaultEvaluationInterval as time.Duration. -func GetDefaultEvaluationInterval() int64 { - return atomic.LoadInt64(&DefaultEvaluationInterval) -} - type engineMetrics struct { currentQueries prometheus.Gauge maxConcurrentQueries prometheus.Gauge @@ -221,19 +204,24 @@ type EngineOpts struct { // LookbackDelta determines the time since the last sample after which a time // series is considered stale. LookbackDelta time.Duration + + // NoStepSubqueryIntervalFn returns the default evaluation interval of + // a subquery in milliseconds when no step is specified in the range vector, e.g. `[30m:]`. + NoStepSubqueryIntervalFn func(rangeMillis int64) int64 } // Engine handles the lifetime of queries from beginning to end. // It is connected to a querier. type Engine struct { - logger log.Logger - metrics *engineMetrics - timeout time.Duration - maxSamplesPerQuery int - activeQueryTracker *ActiveQueryTracker - queryLogger QueryLogger - queryLoggerLock sync.RWMutex - lookbackDelta time.Duration + logger log.Logger + metrics *engineMetrics + timeout time.Duration + maxSamplesPerQuery int + activeQueryTracker *ActiveQueryTracker + queryLogger QueryLogger + queryLoggerLock sync.RWMutex + lookbackDelta time.Duration + noStepSubqueryIntervalFn func(rangeMillis int64) int64 } // NewEngine returns a new engine. 
@@ -328,12 +316,13 @@ func NewEngine(opts EngineOpts) *Engine { } return &Engine{ - timeout: opts.Timeout, - logger: opts.Logger, - metrics: metrics, - maxSamplesPerQuery: opts.MaxSamples, - activeQueryTracker: opts.ActiveQueryTracker, - lookbackDelta: opts.LookbackDelta, + timeout: opts.Timeout, + logger: opts.Logger, + metrics: metrics, + maxSamplesPerQuery: opts.MaxSamples, + activeQueryTracker: opts.ActiveQueryTracker, + lookbackDelta: opts.LookbackDelta, + noStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, } } @@ -525,14 +514,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval if s.Start == s.End && s.Interval == 0 { start := timeMilliseconds(s.Start) evaluator := &evaluator{ - startTimestamp: start, - endTimestamp: start, - interval: 1, - ctx: ctxInnerEval, - maxSamples: ng.maxSamplesPerQuery, - defaultEvalInterval: GetDefaultEvaluationInterval(), - logger: ng.logger, - lookbackDelta: ng.lookbackDelta, + startTimestamp: start, + endTimestamp: start, + interval: 1, + ctx: ctxInnerEval, + maxSamples: ng.maxSamplesPerQuery, + logger: ng.logger, + lookbackDelta: ng.lookbackDelta, + noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, } val, warnings, err := evaluator.Eval(s.Expr) @@ -575,14 +564,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval // Range evaluation. 
evaluator := &evaluator{ - startTimestamp: timeMilliseconds(s.Start), - endTimestamp: timeMilliseconds(s.End), - interval: durationMilliseconds(s.Interval), - ctx: ctxInnerEval, - maxSamples: ng.maxSamplesPerQuery, - defaultEvalInterval: GetDefaultEvaluationInterval(), - logger: ng.logger, - lookbackDelta: ng.lookbackDelta, + startTimestamp: timeMilliseconds(s.Start), + endTimestamp: timeMilliseconds(s.End), + interval: durationMilliseconds(s.Interval), + ctx: ctxInnerEval, + maxSamples: ng.maxSamplesPerQuery, + logger: ng.logger, + lookbackDelta: ng.lookbackDelta, + noStepSubqueryIntervalFn: ng.noStepSubqueryIntervalFn, } val, warnings, err := evaluator.Eval(s.Expr) if err != nil { @@ -657,7 +646,7 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) { hints := &storage.SelectHints{ Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End), - Step: durationToInt64Millis(s.Interval), + Step: durationMilliseconds(s.Interval), } // We need to make sure we select the timerange selected by the subquery. @@ -769,11 +758,11 @@ type evaluator struct { endTimestamp int64 // End time in milliseconds. interval int64 // Interval in milliseconds. - maxSamples int - currentSamples int - defaultEvalInterval int64 - logger log.Logger - lookbackDelta time.Duration + maxSamples int + currentSamples int + logger log.Logger + lookbackDelta time.Duration + noStepSubqueryIntervalFn func(rangeMillis int64) int64 } // errorf causes a panic with the input formatted into an error. 
@@ -1333,21 +1322,22 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { return ev.matrixSelector(e) case *parser.SubqueryExpr: - offsetMillis := durationToInt64Millis(e.Offset) - rangeMillis := durationToInt64Millis(e.Range) + offsetMillis := durationMilliseconds(e.Offset) + rangeMillis := durationMilliseconds(e.Range) newEv := &evaluator{ - endTimestamp: ev.endTimestamp - offsetMillis, - interval: ev.defaultEvalInterval, - ctx: ev.ctx, - currentSamples: ev.currentSamples, - maxSamples: ev.maxSamples, - defaultEvalInterval: ev.defaultEvalInterval, - logger: ev.logger, - lookbackDelta: ev.lookbackDelta, + endTimestamp: ev.endTimestamp - offsetMillis, + ctx: ev.ctx, + currentSamples: ev.currentSamples, + maxSamples: ev.maxSamples, + logger: ev.logger, + lookbackDelta: ev.lookbackDelta, + noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, } if e.Step != 0 { - newEv.interval = durationToInt64Millis(e.Step) + newEv.interval = durationMilliseconds(e.Step) + } else { + newEv.interval = ev.noStepSubqueryIntervalFn(rangeMillis) } // Start with the first timestamp after (ev.startTimestamp - offset - range) @@ -1367,10 +1357,6 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) { panic(errors.Errorf("unhandled expression of type: %T", expr)) } -func durationToInt64Millis(d time.Duration) int64 { - return int64(d / time.Millisecond) -} - // vectorSelector evaluates a *parser.VectorSelector expression. 
func (ev *evaluator) vectorSelector(node *parser.VectorSelector, ts int64) (Vector, storage.Warnings) { ws, err := checkAndExpandSeriesSet(ev.ctx, node) diff --git a/promql/engine_test.go b/promql/engine_test.go index c9c046318..8c4bc89fc 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" @@ -1114,30 +1113,27 @@ func TestSubquerySelector(t *testing.T) { }, } - SetDefaultEvaluationInterval(1 * time.Minute) for _, tst := range tests { - test, err := NewTest(t, tst.loadString) - testutil.Ok(t, err) - - defer test.Close() - - err = test.Run() - testutil.Ok(t, err) - - engine := test.QueryEngine() - for _, c := range tst.cases { - var err error - var qry Query - - qry, err = engine.NewInstantQuery(test.Queryable(), c.Query, c.Start) + t.Run("", func(t *testing.T) { + test, err := NewTest(t, tst.loadString) testutil.Ok(t, err) + defer test.Close() - res := qry.Exec(test.Context()) - testutil.Equals(t, c.Result.Err, res.Err) - mat := res.Value.(Matrix) - sort.Sort(mat) - testutil.Equals(t, c.Result.Value, mat) - } + testutil.Ok(t, test.Run()) + engine := test.QueryEngine() + for _, c := range tst.cases { + t.Run(c.Query, func(t *testing.T) { + qry, err := engine.NewInstantQuery(test.Queryable(), c.Query, c.Start) + testutil.Ok(t, err) + + res := qry.Exec(test.Context()) + testutil.Equals(t, c.Result.Err, res.Err, "errors do not match for query %s", c.Query) + mat := res.Value.(Matrix) + sort.Sort(mat) + testutil.Equals(t, c.Result.Value, mat) + }) + } + }) } } diff --git a/promql/test.go b/promql/test.go index e3d99e83f..ff2549144 100644 --- a/promql/test.go +++ b/promql/test.go @@ -518,10 +518,11 @@ func (t *Test) clear() { t.storage = teststorage.New(t) opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, + Logger: 
nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return durationMilliseconds(1 * time.Minute) }, } t.queryEngine = NewEngine(opts)