diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 6a627ad38..b2ace5f59 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -999,12 +999,18 @@ func main() { listeners, err := webHandler.Listeners() if err != nil { level.Error(logger).Log("msg", "Unable to start web listeners", "err", err) + if err := queryEngine.Close(); err != nil { + level.Warn(logger).Log("msg", "Closing query engine failed", "err", err) + } os.Exit(1) } err = toolkit_web.Validate(*webConfig) if err != nil { level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err) + if err := queryEngine.Close(); err != nil { + level.Warn(logger).Log("msg", "Closing query engine failed", "err", err) + } os.Exit(1) } @@ -1026,6 +1032,9 @@ func main() { case <-cancel: reloadReady.Close() } + if err := queryEngine.Close(); err != nil { + level.Warn(logger).Log("msg", "Closing query engine failed", "err", err) + } return nil }, func(err error) { diff --git a/promql/bench_test.go b/promql/bench_test.go index 33523b2db..74e85b054 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/promqltest" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/prometheus/prometheus/util/teststorage" @@ -274,7 +275,7 @@ func BenchmarkRangeQuery(b *testing.B) { MaxSamples: 50000000, Timeout: 100 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(b, opts) const interval = 10000 // 10s interval. // A day of data plus 10k steps. @@ -365,7 +366,7 @@ func BenchmarkNativeHistograms(b *testing.B) { for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { - ng := promql.NewEngine(opts) + ng := promqltest.NewTestEngineWithOpts(b, opts) for i := 0; i < b.N; i++ { qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step) if err != nil { diff --git a/promql/engine.go b/promql/engine.go index b54ce2d6d..3edb30a97 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "io" "math" "reflect" "runtime" @@ -271,6 +272,8 @@ func contextErr(err error, env string) error { // // 2) Enforcement of the maximum number of concurrent queries. type QueryTracker interface { + io.Closer + // GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker. GetMaxConcurrent() int @@ -430,6 +433,14 @@ func NewEngine(opts EngineOpts) *Engine { } } +// Close closes ng. +func (ng *Engine) Close() error { + if ng.activeQueryTracker != nil { + return ng.activeQueryTracker.Close() + } + return nil +} + // SetQueryLogger sets the query logger. func (ng *Engine) SetQueryLogger(l QueryLogger) { ng.queryLoggerLock.Lock() diff --git a/promql/engine_test.go b/promql/engine_test.go index 923d1264d..947c0e1ed 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -17,7 +17,6 @@ import ( "context" "errors" "fmt" - "os" "sort" "strconv" "sync" @@ -55,14 +54,7 @@ func TestMain(m *testing.M) { func TestQueryConcurrency(t *testing.T) { maxConcurrency := 10 - dir, err := os.MkdirTemp("", "test_concurrency") - require.NoError(t, err) - defer os.RemoveAll(dir) - queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil) - t.Cleanup(func() { - require.NoError(t, queryTracker.Close()) - }) - + queryTracker := promql.NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil) opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -70,15 +62,17 @@ func TestQueryConcurrency(t *testing.T) { Timeout: 100 * time.Second, ActiveQueryTracker: queryTracker, } + engine := promqltest.NewTestEngineWithOpts(t, opts) - engine := promql.NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) - defer cancelCtx() + t.Cleanup(cancelCtx) block := make(chan struct{}) processing := make(chan struct{}) done := make(chan int) - defer close(done) + t.Cleanup(func() { + close(done) + }) f := func(context.Context) error { select { @@ -163,7 +157,7 @@ func TestQueryTimeout(t *testing.T) { MaxSamples: 10, Timeout: 5 * time.Millisecond, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -188,7 +182,7 @@ func TestQueryCancel(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -262,7 +256,7 @@ func TestQueryError(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) errStorage := promql.ErrStorage{errors.New("storage error")} queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { return &errQuerier{err: errStorage}, nil @@ -596,7 +590,7 @@ func TestSelectHintsSetCorrectly(t *testing.T) { }, } { t.Run(tc.query, func(t *testing.T) { - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) hintsRecorder := &noopHintRecordingQueryable{} var ( @@ -627,7 +621,7 @@ func TestEngineShutdown(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) @@ -763,7 +757,7 @@ load 10s t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) { var err error var qry promql.Query - engine := newTestEngine() + engine := newTestEngine(t) if c.Interval == 0 { qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start) } else { @@ -1305,7 +1299,7 @@ load 10s for _, c := range cases { t.Run(c.Query, func(t *testing.T) { opts := promql.NewPrometheusQueryOpts(true, 0) - engine := promqltest.NewTestEngine(true, 0, promqltest.DefaultMaxSamplesPerQuery) + engine := promqltest.NewTestEngine(t, true, 0, promqltest.DefaultMaxSamplesPerQuery) runQuery := func(expErr error) *stats.Statistics { var err error @@ -1332,7 +1326,7 @@ load 10s if c.SkipMaxCheck { return } - engine = promqltest.NewTestEngine(true, 0, stats.Samples.PeakSamples-1) + engine = promqltest.NewTestEngine(t, true, 0, stats.Samples.PeakSamples-1) runQuery(promql.ErrTooManySamples(env)) }) } @@ -1485,7 +1479,7 @@ load 10s for _, c := range cases { t.Run(c.Query, func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) testFunc := func(expError error) { var err error var qry promql.Query @@ -1506,18 +1500,18 @@ load 10s } // Within limit. - engine = promqltest.NewTestEngine(false, 0, c.MaxSamples) + engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples) testFunc(nil) // Exceeding limit. - engine = promqltest.NewTestEngine(false, 0, c.MaxSamples-1) + engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples-1) testFunc(promql.ErrTooManySamples(env)) }) } } func TestAtModifier(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, ` load 10s metric{job="1"} 0+1x1000 @@ -2000,7 +1994,7 @@ func TestSubquerySelector(t *testing.T) { }, } { t.Run("", func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := promqltest.LoadedStorage(t, tst.loadString) t.Cleanup(func() { storage.Close() }) @@ -2049,7 +2043,7 @@ func TestQueryLogger_basic(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) queryExec := func() { ctx, cancelCtx := context.WithCancel(context.Background()) @@ -2100,7 +2094,7 @@ func TestQueryLogger_fields(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) f1 := NewFakeQueryLogger() engine.SetQueryLogger(f1) @@ -2129,7 +2123,7 @@ func TestQueryLogger_error(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) f1 := NewFakeQueryLogger() engine.SetQueryLogger(f1) @@ -3012,7 +3006,7 @@ func TestEngineOptsValidation(t *testing.T) { } for _, c := range cases { - eng := promql.NewEngine(c.opts) + eng := promqltest.NewTestEngineWithOpts(t, c.opts) _, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0)) _, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second) if c.fail { @@ -3026,7 +3020,7 @@ func TestEngineOptsValidation(t *testing.T) { } func TestInstantQueryWithRangeVectorSelector(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) baseT := timestamp.Time(0) storage := promqltest.LoadedStorage(t, ` @@ -3280,7 +3274,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) { for _, c := range cases { for _, floatHisto := range []bool{true, false} { t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) { - engine := newTestEngine() + engine := newTestEngine(t) storage := teststorage.New(t) t.Cleanup(func() { storage.Close() }) @@ -3399,7 +3393,7 @@ metric 0 1 2 for _, c := range cases { c := c t.Run(c.name, func(t *testing.T) { - engine := promqltest.NewTestEngine(false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery) + engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery) storage := promqltest.LoadedStorage(t, load) t.Cleanup(func() { storage.Close() }) @@ -3436,7 +3430,7 @@ histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum: ` storage := promqltest.LoadedStorage(t, load) t.Cleanup(func() { storage.Close() }) - engine := promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery) + engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery) verify := func(t *testing.T, qry promql.Query, expected []histogram.FloatHistogram) { res := qry.Exec(context.Background()) diff --git a/promql/functions_test.go b/promql/functions_test.go index aef59c837..9ee0ba51d 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -24,6 +24,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/promqltest" "github.com/prometheus/prometheus/util/teststorage" ) @@ -39,7 +40,7 @@ func TestDeriv(t *testing.T) { MaxSamples: 10000, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) a := storage.Appender(context.Background()) diff --git a/promql/promql_test.go b/promql/promql_test.go index a423f90ee..345ecab5e 100644 --- a/promql/promql_test.go +++ b/promql/promql_test.go @@ -28,12 +28,12 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -func newTestEngine() *promql.Engine { - return promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery) +func newTestEngine(t *testing.T) *promql.Engine { + return promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery) } func TestEvaluations(t *testing.T) { - promqltest.RunBuiltinTests(t, newTestEngine()) + promqltest.RunBuiltinTests(t, newTestEngine(t)) } // Run a lot of queries at the same time, to check for race conditions. @@ -48,7 +48,7 @@ func TestConcurrentRangeQueries(t *testing.T) { } // Enable experimental functions testing parser.EnableExperimentalFunctions = true - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) const interval = 10000 // 10s interval. // A day of data plus 10k steps. diff --git a/promql/promqltest/test.go b/promql/promqltest/test.go index 065e52e33..ff709e442 100644 --- a/promql/promqltest/test.go +++ b/promql/promqltest/test.go @@ -79,8 +79,9 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage { return test.storage } -func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine { - return promql.NewEngine(promql.EngineOpts{ +// NewTestEngine creates a promql.Engine with enablePerStepStats, lookbackDelta and maxSamples, and returns it. +func NewTestEngine(tb testing.TB, enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine { + return NewTestEngineWithOpts(tb, promql.EngineOpts{ Logger: nil, Reg: nil, MaxSamples: maxSamples, @@ -94,6 +95,16 @@ func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamp }) } +// NewTestEngineWithOpts creates a promql.Engine with opts and returns it. +func NewTestEngineWithOpts(tb testing.TB, opts promql.EngineOpts) *promql.Engine { + tb.Helper() + ng := promql.NewEngine(opts) + tb.Cleanup(func() { + require.NoError(tb, ng.Close()) + }) + return ng +} + // RunBuiltinTests runs an acceptance test suite against the provided engine. func RunBuiltinTests(t TBRun, engine promql.QueryEngine) { t.Cleanup(func() { parser.EnableExperimentalFunctions = false }) @@ -1436,7 +1447,11 @@ func (ll *LazyLoader) Storage() storage.Storage { // Close closes resources associated with the LazyLoader. func (ll *LazyLoader) Close() error { ll.cancelCtx() - return ll.storage.Close() + err := ll.queryEngine.Close() + if sErr := ll.storage.Close(); sErr != nil { + return errors.Join(sErr, err) + } + return err } func makeInt64Pointer(val int64) *int64 { diff --git a/promql/promqltest/test_test.go b/promql/promqltest/test_test.go index 49b43eb12..5aff71fb1 100644 --- a/promql/promqltest/test_test.go +++ b/promql/promqltest/test_test.go @@ -595,7 +595,7 @@ eval range from 0 to 5m step 5m testmetric for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - err := runTest(t, testCase.input, NewTestEngine(false, 0, DefaultMaxSamplesPerQuery)) + err := runTest(t, testCase.input, NewTestEngine(t, false, 0, DefaultMaxSamplesPerQuery)) if testCase.expectedError == "" { require.NoError(t, err) diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 406332946..67d683c85 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -36,16 +36,19 @@ import ( "github.com/prometheus/prometheus/util/testutil" ) -var testEngine = promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, - NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: true, -}) +func testEngine(tb testing.TB) *promql.Engine { + tb.Helper() + return promqltest.NewTestEngineWithOpts(tb, promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: true, + }) +} func TestAlertingRuleState(t *testing.T) { tests := []struct { @@ -225,12 +228,14 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { }, } + ng := testEngine(t) + baseTime := time.Unix(0, 0) for i, result := range results { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -247,7 +252,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) { testutil.RequireEqual(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } @@ -309,13 +314,15 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { }, } + ng := testEngine(t) + evalTime := time.Unix(0, 0) result[0].T = timestamp.FromTime(evalTime) result[1].T = timestamp.FromTime(evalTime) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalLabels.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -329,7 +336,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) { } res, err = ruleWithExternalLabels.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -406,9 +413,11 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { result[0].T = timestamp.FromTime(evalTime) result[1].T = timestamp.FromTime(evalTime) + ng := testEngine(t) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := ruleWithoutExternalURL.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -422,7 +431,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) { } res, err = ruleWithExternalURL.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -475,9 +484,11 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) { evalTime := time.Unix(0, 0) result[0].T = timestamp.FromTime(evalTime) + ng := testEngine(t) + var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. res, err := rule.Eval( - context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, + context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0, ) require.NoError(t, err) for _, smpl := range res { @@ -520,6 +531,8 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; ) evalTime := time.Unix(0, 0) + ng := testEngine(t) + startQueryCh := make(chan struct{}) getDoneCh := make(chan struct{}) slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) { @@ -533,7 +546,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }}; require.Fail(t, "unexpected blocking when template expanding.") } } - return EngineQueryFunc(testEngine, storage)(ctx, q, ts) + return EngineQueryFunc(ng, storage)(ctx, q, ts) } go func() { <-startQueryCh @@ -578,7 +591,7 @@ func TestAlertingRuleDuplicate(t *testing.T) { Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -642,9 +655,9 @@ func TestAlertingRuleLimit(t *testing.T) { ) evalTime := time.Unix(0, 0) - + ng := testEngine(t) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, test.limit); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": @@ -866,12 +879,13 @@ func TestKeepFiringFor(t *testing.T) { }, } + ng := testEngine(t) baseTime := time.Unix(0, 0) for i, result := range results { t.Logf("case %d", i) evalTime := baseTime.Add(time.Duration(i) * time.Minute) result[0].T = timestamp.FromTime(evalTime) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -888,7 +902,7 @@ func TestKeepFiringFor(t *testing.T) { testutil.RequireEqual(t, result, filteredRes) } evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } @@ -923,9 +937,10 @@ func TestPendingAndKeepFiringFor(t *testing.T) { F: 1, } + ng := testEngine(t) baseTime := time.Unix(0, 0) result.T = timestamp.FromTime(baseTime) - res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Len(t, res, 2) @@ -940,7 +955,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) { } evalTime := baseTime.Add(time.Minute) - res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) require.Empty(t, res) } diff --git a/rules/manager_test.go b/rules/manager_test.go index 83ceca060..d658d3f8f 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -158,12 +158,13 @@ func TestAlertingRule(t *testing.T) { }, } + ng := testEngine(t) for i, test := range tests { t.Logf("case %d", i) evalTime := baseTime.Add(test.time) - res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples. @@ -299,6 +300,7 @@ func TestForStateAddSamples(t *testing.T) { }, } + ng := testEngine(t) var forState float64 for i, test := range tests { t.Logf("case %d", i) @@ -311,7 +313,7 @@ func TestForStateAddSamples(t *testing.T) { forState = float64(value.StaleNaN) } - res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(testEngine, storage), nil, 0) + res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) var filteredRes promql.Vector // After removing 'ALERTS' samples. @@ -366,8 +368,9 @@ func TestForStateRestore(t *testing.T) { expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) require.NoError(t, err) + ng := testEngine(t) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), @@ -538,7 +541,7 @@ func TestStaleness(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(engineOpts) + engine := promqltest.NewTestEngineWithOpts(t, engineOpts) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, st), Appendable: st, @@ -772,7 +775,7 @@ func TestUpdate(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ruleManager := NewManager(&ManagerOptions{ Appendable: st, Queryable: st, @@ -910,7 +913,7 @@ func TestNotify(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(engineOpts) + engine := promqltest.NewTestEngineWithOpts(t, engineOpts) var lastNotified []*Alert notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) { lastNotified = alerts @@ -985,7 +988,7 @@ func TestMetricsUpdate(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1059,7 +1062,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1136,7 +1139,7 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ruleManager := NewManager(&ManagerOptions{ Appendable: storage, Queryable: storage, @@ -1285,7 +1288,7 @@ func TestRuleHealthUpdates(t *testing.T) { MaxSamples: 10, Timeout: 10 * time.Second, } - engine := promql.NewEngine(engineOpts) + engine := promqltest.NewTestEngineWithOpts(t, engineOpts) opts := &ManagerOptions{ QueryFunc: EngineQueryFunc(engine, st), Appendable: st, @@ -1382,9 +1385,10 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) { }, } + ng := testEngine(t) testFunc := func(tst testInput) { opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), @@ -1468,8 +1472,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { } require.NoError(t, app.Commit()) + ng := testEngine(t) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(testEngine, storage), + QueryFunc: EngineQueryFunc(ng, storage), Appendable: storage, Queryable: storage, Context: context.Background(), diff --git a/rules/recording_test.go b/rules/recording_test.go index fdddd4e02..72c0764f9 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -123,10 +123,11 @@ func TestRuleEval(t *testing.T) { storage := setUpRuleEvalTest(t) t.Cleanup(func() { storage.Close() }) + ng := testEngine(t) for _, scenario := range ruleEvalTestScenarios { t.Run(scenario.name, func(t *testing.T) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) - result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0) require.NoError(t, err) testutil.RequireEqual(t, scenario.expected, result) }) @@ -137,6 +138,7 @@ func BenchmarkRuleEval(b *testing.B) { storage := setUpRuleEvalTest(b) b.Cleanup(func() { storage.Close() }) + ng := testEngine(b) for _, scenario := range ruleEvalTestScenarios { b.Run(scenario.name, func(b *testing.B) { rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels) @@ -144,7 +146,7 @@ func BenchmarkRuleEval(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0) + _, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0) if err != nil { require.NoError(b, err) } @@ -165,7 +167,7 @@ func TestRuleEvalDuplicate(t *testing.T) { Timeout: 10 * time.Second, } - engine := promql.NewEngine(opts) + engine := promqltest.NewTestEngineWithOpts(t, opts) ctx, cancelCtx := context.WithCancel(context.Background()) defer cancelCtx() @@ -212,10 +214,11 @@ func TestRecordingRuleLimit(t *testing.T) { labels.FromStrings("test", "test"), ) + ng := testEngine(t) evalTime := time.Unix(0, 0) for _, test := range tests { - switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); { + switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, test.limit); { case err != nil: require.EqualError(t, err, test.err) case test.err != "": diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index a5f123bc9..f18eb77da 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -974,6 +974,7 @@ func (it *floatHistogramIterator) Reset(b []byte) { if it.atFloatHistogramCalled { it.atFloatHistogramCalled = false it.pBuckets, it.nBuckets = nil, nil + it.pSpans, it.nSpans = nil, nil } else { it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0] } @@ -1069,7 +1070,7 @@ func (it *floatHistogramIterator) Next() ValueType { // The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code, // so we don't need a separate single delta logic for the 2nd sample. - // Recycle bucket slices that have not been returned yet. Otherwise, copy them. + // Recycle bucket and span slices that have not been returned yet. Otherwise, copy them. // We can always recycle the slices for leading and trailing bits as they are // never returned to the caller. if it.atFloatHistogramCalled { @@ -1088,6 +1089,20 @@ func (it *floatHistogramIterator) Next() ValueType { } else { it.nBuckets = nil } + if len(it.pSpans) > 0 { + newSpans := make([]histogram.Span, len(it.pSpans)) + copy(newSpans, it.pSpans) + it.pSpans = newSpans + } else { + it.pSpans = nil + } + if len(it.nSpans) > 0 { + newSpans := make([]histogram.Span, len(it.nSpans)) + copy(newSpans, it.nSpans) + it.nSpans = newSpans + } else { + it.nSpans = nil + } } tDod, err := readVarbitInt(&it.br) diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 689696f5a..6092c0f63 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -1306,3 +1306,54 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) { require.EqualError(t, err, "float histogram counter reset") }) } + +func TestFloatHistogramUniqueSpansAfterNext(t *testing.T) { + // Create two histograms with the same schema and spans. + h1 := &histogram.FloatHistogram{ + Schema: 1, + ZeroThreshold: 1e-100, + Count: 10, + ZeroCount: 2, + Sum: 15.0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []float64{1, 2, 3, 4}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 1}, + }, + NegativeBuckets: []float64{2}, + } + + h2 := h1.Copy() + + // Create a chunk and append both histograms. + c := NewFloatHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + + _, _, _, err = app.AppendFloatHistogram(nil, 0, h1, false) + require.NoError(t, err) + + _, _, _, err = app.AppendFloatHistogram(nil, 1, h2, false) + require.NoError(t, err) + + // Create an iterator and advance to the first histogram. + it := c.Iterator(nil) + require.Equal(t, ValFloatHistogram, it.Next()) + _, rh1 := it.AtFloatHistogram(nil) + + // Advance to the second histogram and retrieve it. + require.Equal(t, ValFloatHistogram, it.Next()) + _, rh2 := it.AtFloatHistogram(nil) + + require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected") + require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected") + require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected") + require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected") + + // Check that the spans for h1 and h2 are unique slices. + require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms") + require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms") +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index fafae48d3..f8796d64e 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -1073,6 +1073,7 @@ func (it *histogramIterator) Reset(b []byte) { if it.atHistogramCalled { it.atHistogramCalled = false it.pBuckets, it.nBuckets = nil, nil + it.pSpans, it.nSpans = nil, nil } else { it.pBuckets = it.pBuckets[:0] it.nBuckets = it.nBuckets[:0] @@ -1185,8 +1186,25 @@ func (it *histogramIterator) Next() ValueType { // The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code, // so we don't need a separate single delta logic for the 2nd sample. - // Recycle bucket slices that have not been returned yet. Otherwise, + // Recycle bucket and span slices that have not been returned yet. Otherwise, copy them. // copy them. + if it.atFloatHistogramCalled || it.atHistogramCalled { + if len(it.pSpans) > 0 { + newSpans := make([]histogram.Span, len(it.pSpans)) + copy(newSpans, it.pSpans) + it.pSpans = newSpans + } else { + it.pSpans = nil + } + if len(it.nSpans) > 0 { + newSpans := make([]histogram.Span, len(it.nSpans)) + copy(newSpans, it.nSpans) + it.nSpans = newSpans + } else { + it.nSpans = nil + } + } + if it.atHistogramCalled { it.atHistogramCalled = false if len(it.pBuckets) > 0 { @@ -1204,6 +1222,7 @@ func (it *histogramIterator) Next() ValueType { it.nBuckets = nil } } + // FloatBuckets are set from scratch, so simply create empty ones. if it.atFloatHistogramCalled { it.atFloatHistogramCalled = false diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 59187ed17..29b77b158 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -1487,6 +1487,108 @@ func TestHistogramAppendOnlyErrors(t *testing.T) { }) } +func TestHistogramUniqueSpansAfterNext(t *testing.T) { + // Create two histograms with the same schema and spans. + h1 := &histogram.Histogram{ + Schema: 1, + ZeroThreshold: 1e-100, + Count: 10, + ZeroCount: 2, + Sum: 15.0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, 3, 4}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 1}, + }, + NegativeBuckets: []int64{2}, + } + + h2 := h1.Copy() + + // Create a chunk and append both histograms. + c := NewHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + + _, _, _, err = app.AppendHistogram(nil, 0, h1, false) + require.NoError(t, err) + + _, _, _, err = app.AppendHistogram(nil, 1, h2, false) + require.NoError(t, err) + + // Create an iterator and advance to the first histogram. + it := c.Iterator(nil) + require.Equal(t, ValHistogram, it.Next()) + _, rh1 := it.AtHistogram(nil) + + // Advance to the second histogram and retrieve it. + require.Equal(t, ValHistogram, it.Next()) + _, rh2 := it.AtHistogram(nil) + + require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected") + require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected") + require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected") + require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected") + + // Check that the spans for h1 and h2 are unique slices. + require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms") + require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms") +} + +func TestHistogramUniqueSpansAfterNextWithAtFloatHistogram(t *testing.T) { + // Create two histograms with the same schema and spans. + h1 := &histogram.Histogram{ + Schema: 1, + ZeroThreshold: 1e-100, + Count: 10, + ZeroCount: 2, + Sum: 15.0, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 1, Length: 2}, + }, + PositiveBuckets: []int64{1, 2, 3, 4}, + NegativeSpans: []histogram.Span{ + {Offset: 1, Length: 1}, + }, + NegativeBuckets: []int64{2}, + } + + h2 := h1.Copy() + + // Create a chunk and append both histograms. + c := NewHistogramChunk() + app, err := c.Appender() + require.NoError(t, err) + + _, _, _, err = app.AppendHistogram(nil, 0, h1, false) + require.NoError(t, err) + + _, _, _, err = app.AppendHistogram(nil, 1, h2, false) + require.NoError(t, err) + + // Create an iterator and advance to the first histogram. + it := c.Iterator(nil) + require.Equal(t, ValHistogram, it.Next()) + _, rh1 := it.AtFloatHistogram(nil) + + // Advance to the second histogram and retrieve it. + require.Equal(t, ValHistogram, it.Next()) + _, rh2 := it.AtFloatHistogram(nil) + + require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected") + require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected") + require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected") + require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected") + + // Check that the spans for h1 and h2 are unique slices. + require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms") + require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms") +} + func BenchmarkAppendable(b *testing.B) { // Create a histogram with a bunch of spans and buckets. const ( diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 261ed6b61..ef9d53dd9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -59,16 +59,19 @@ import ( "github.com/prometheus/prometheus/util/teststorage" ) -var testEngine = promql.NewEngine(promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxSamples: 10000, - Timeout: 100 * time.Second, - NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, - EnableAtModifier: true, - EnableNegativeOffset: true, - EnablePerStepStats: true, -}) +func testEngine(t *testing.T) *promql.Engine { + t.Helper() + return promqltest.NewTestEngineWithOpts(t, promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, + NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 }, + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: true, + }) +} // testMetaStore satisfies the scrape.MetricMetadataStore interface. // It is used to inject specific metadata as part of a test case. @@ -306,8 +309,7 @@ func (m *rulesRetrieverMock) CreateRuleGroups() { MaxSamples: 10, Timeout: 100 * time.Second, } - - engine := promql.NewEngine(engineOpts) + engine := promqltest.NewTestEngineWithOpts(m.testing, engineOpts) opts := &rules.ManagerOptions{ QueryFunc: rules.EngineQueryFunc(engine, storage), Appendable: storage, @@ -431,9 +433,10 @@ func TestEndpoints(t *testing.T) { now := time.Now() + ng := testEngine(t) + t.Run("local", func(t *testing.T) { - algr := rulesRetrieverMock{} - algr.testing = t + algr := rulesRetrieverMock{testing: t} algr.CreateAlertingRules() algr.CreateRuleGroups() @@ -445,7 +448,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: ng, ExemplarQueryable: storage.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), @@ -496,8 +499,7 @@ func TestEndpoints(t *testing.T) { }) require.NoError(t, err) - algr := rulesRetrieverMock{} - algr.testing = t + algr := rulesRetrieverMock{testing: t} algr.CreateAlertingRules() algr.CreateRuleGroups() @@ -509,7 +511,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Queryable: remote, - QueryEngine: testEngine, + QueryEngine: ng, ExemplarQueryable: storage.ExemplarQueryable(), targetRetriever: testTargetRetriever.toFactory(), alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(), @@ -651,7 +653,7 @@ func TestQueryExemplars(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: testEngine(t), ExemplarQueryable: storage.ExemplarQueryable(), } @@ -870,7 +872,7 @@ func TestStats(t *testing.T) { api := &API{ Queryable: storage, - QueryEngine: testEngine, + QueryEngine: testEngine(t), now: func() time.Time { return time.Unix(123, 0) }, diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 99ef81018..7e1fc09d8 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/promqltest" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" @@ -86,7 +87,7 @@ func TestApiStatusCodes(t *testing.T) { "error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}}, } { t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) { - r := createPrometheusAPI(q) + r := createPrometheusAPI(t, q) rec := httptest.NewRecorder() req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil) @@ -100,8 +101,10 @@ func TestApiStatusCodes(t *testing.T) { } } -func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router { - engine := promql.NewEngine(promql.EngineOpts{ +func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route.Router { + t.Helper() + + engine := promqltest.NewTestEngineWithOpts(t, promql.EngineOpts{ Logger: log.NewNopLogger(), Reg: nil, ActiveQueryTracker: nil,