mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 05:04:05 -08:00
promql.Engine: Add Close method
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
a6316a5dcb
commit
0cc99e677a
|
@ -955,12 +955,18 @@ func main() {
|
||||||
listener, err := webHandler.Listener()
|
listener, err := webHandler.Listener()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(logger).Log("msg", "Unable to start web listener", "err", err)
|
level.Error(logger).Log("msg", "Unable to start web listener", "err", err)
|
||||||
|
if err := queryEngine.Close(); err != nil {
|
||||||
|
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
|
||||||
|
}
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = toolkit_web.Validate(*webConfig)
|
err = toolkit_web.Validate(*webConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
|
level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
|
||||||
|
if err := queryEngine.Close(); err != nil {
|
||||||
|
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
|
||||||
|
}
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -982,6 +988,9 @@ func main() {
|
||||||
case <-cancel:
|
case <-cancel:
|
||||||
reloadReady.Close()
|
reloadReady.Close()
|
||||||
}
|
}
|
||||||
|
if err := queryEngine.Close(); err != nil {
|
||||||
|
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
func(err error) {
|
func(err error) {
|
||||||
|
|
|
@ -21,6 +21,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/model/histogram"
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
@ -257,6 +259,9 @@ func BenchmarkRangeQuery(b *testing.B) {
|
||||||
Timeout: 100 * time.Second,
|
Timeout: 100 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
b.Cleanup(func() {
|
||||||
|
require.NoError(b, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
const interval = 10000 // 10s interval.
|
const interval = 10000 // 10s interval.
|
||||||
// A day of data plus 10k steps.
|
// A day of data plus 10k steps.
|
||||||
|
@ -340,6 +345,9 @@ func BenchmarkNativeHistograms(b *testing.B) {
|
||||||
for _, tc := range cases {
|
for _, tc := range cases {
|
||||||
b.Run(tc.name, func(b *testing.B) {
|
b.Run(tc.name, func(b *testing.B) {
|
||||||
ng := promql.NewEngine(opts)
|
ng := promql.NewEngine(opts)
|
||||||
|
b.Cleanup(func() {
|
||||||
|
require.NoError(b, ng.Close())
|
||||||
|
})
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step)
|
qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
@ -271,6 +272,8 @@ func contextErr(err error, env string) error {
|
||||||
//
|
//
|
||||||
// 2) Enforcement of the maximum number of concurrent queries.
|
// 2) Enforcement of the maximum number of concurrent queries.
|
||||||
type QueryTracker interface {
|
type QueryTracker interface {
|
||||||
|
io.Closer
|
||||||
|
|
||||||
// GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker.
|
// GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker.
|
||||||
GetMaxConcurrent() int
|
GetMaxConcurrent() int
|
||||||
|
|
||||||
|
@ -423,6 +426,14 @@ func NewEngine(opts EngineOpts) *Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close closes ng.
|
||||||
|
func (ng *Engine) Close() error {
|
||||||
|
if ng.activeQueryTracker != nil {
|
||||||
|
return ng.activeQueryTracker.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SetQueryLogger sets the query logger.
|
// SetQueryLogger sets the query logger.
|
||||||
func (ng *Engine) SetQueryLogger(l QueryLogger) {
|
func (ng *Engine) SetQueryLogger(l QueryLogger) {
|
||||||
ng.queryLoggerLock.Lock()
|
ng.queryLoggerLock.Lock()
|
||||||
|
|
|
@ -18,7 +18,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -55,14 +54,7 @@ func TestMain(m *testing.M) {
|
||||||
func TestQueryConcurrency(t *testing.T) {
|
func TestQueryConcurrency(t *testing.T) {
|
||||||
maxConcurrency := 10
|
maxConcurrency := 10
|
||||||
|
|
||||||
dir, err := os.MkdirTemp("", "test_concurrency")
|
queryTracker := promql.NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil)
|
||||||
require.NoError(t, err)
|
|
||||||
defer os.RemoveAll(dir)
|
|
||||||
queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil)
|
|
||||||
t.Cleanup(func() {
|
|
||||||
require.NoError(t, queryTracker.Close())
|
|
||||||
})
|
|
||||||
|
|
||||||
opts := promql.EngineOpts{
|
opts := promql.EngineOpts{
|
||||||
Logger: nil,
|
Logger: nil,
|
||||||
Reg: nil,
|
Reg: nil,
|
||||||
|
@ -72,13 +64,18 @@ func TestQueryConcurrency(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
defer cancelCtx()
|
t.Cleanup(cancelCtx)
|
||||||
|
|
||||||
block := make(chan struct{})
|
block := make(chan struct{})
|
||||||
processing := make(chan struct{})
|
processing := make(chan struct{})
|
||||||
done := make(chan int)
|
done := make(chan int)
|
||||||
defer close(done)
|
t.Cleanup(func() {
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
f := func(context.Context) error {
|
f := func(context.Context) error {
|
||||||
select {
|
select {
|
||||||
|
@ -164,6 +161,9 @@ func TestQueryTimeout(t *testing.T) {
|
||||||
Timeout: 5 * time.Millisecond,
|
Timeout: 5 * time.Millisecond,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
defer cancelCtx()
|
defer cancelCtx()
|
||||||
|
|
||||||
|
@ -263,6 +263,9 @@ func TestQueryError(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
errStorage := promql.ErrStorage{errors.New("storage error")}
|
errStorage := promql.ErrStorage{errors.New("storage error")}
|
||||||
queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
|
queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
|
||||||
return &errQuerier{err: errStorage}, nil
|
return &errQuerier{err: errStorage}, nil
|
||||||
|
@ -597,6 +600,9 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
|
||||||
} {
|
} {
|
||||||
t.Run(tc.query, func(t *testing.T) {
|
t.Run(tc.query, func(t *testing.T) {
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
hintsRecorder := &noopHintRecordingQueryable{}
|
hintsRecorder := &noopHintRecordingQueryable{}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -628,6 +634,9 @@ func TestEngineShutdown(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
|
||||||
block := make(chan struct{})
|
block := make(chan struct{})
|
||||||
|
@ -763,7 +772,7 @@ load 10s
|
||||||
t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) {
|
||||||
var err error
|
var err error
|
||||||
var qry promql.Query
|
var qry promql.Query
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
if c.Interval == 0 {
|
if c.Interval == 0 {
|
||||||
qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start)
|
qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start)
|
||||||
} else {
|
} else {
|
||||||
|
@ -1305,7 +1314,7 @@ load 10s
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
t.Run(c.Query, func(t *testing.T) {
|
t.Run(c.Query, func(t *testing.T) {
|
||||||
opts := promql.NewPrometheusQueryOpts(true, 0)
|
opts := promql.NewPrometheusQueryOpts(true, 0)
|
||||||
engine := promqltest.NewTestEngine(true, 0, promqltest.DefaultMaxSamplesPerQuery)
|
engine := promqltest.NewTestEngine(t, true, 0, promqltest.DefaultMaxSamplesPerQuery)
|
||||||
|
|
||||||
runQuery := func(expErr error) *stats.Statistics {
|
runQuery := func(expErr error) *stats.Statistics {
|
||||||
var err error
|
var err error
|
||||||
|
@ -1332,7 +1341,7 @@ load 10s
|
||||||
if c.SkipMaxCheck {
|
if c.SkipMaxCheck {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
engine = promqltest.NewTestEngine(true, 0, stats.Samples.PeakSamples-1)
|
engine = promqltest.NewTestEngine(t, true, 0, stats.Samples.PeakSamples-1)
|
||||||
runQuery(promql.ErrTooManySamples(env))
|
runQuery(promql.ErrTooManySamples(env))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1485,7 +1494,7 @@ load 10s
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
t.Run(c.Query, func(t *testing.T) {
|
t.Run(c.Query, func(t *testing.T) {
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
testFunc := func(expError error) {
|
testFunc := func(expError error) {
|
||||||
var err error
|
var err error
|
||||||
var qry promql.Query
|
var qry promql.Query
|
||||||
|
@ -1506,18 +1515,18 @@ load 10s
|
||||||
}
|
}
|
||||||
|
|
||||||
// Within limit.
|
// Within limit.
|
||||||
engine = promqltest.NewTestEngine(false, 0, c.MaxSamples)
|
engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples)
|
||||||
testFunc(nil)
|
testFunc(nil)
|
||||||
|
|
||||||
// Exceeding limit.
|
// Exceeding limit.
|
||||||
engine = promqltest.NewTestEngine(false, 0, c.MaxSamples-1)
|
engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples-1)
|
||||||
testFunc(promql.ErrTooManySamples(env))
|
testFunc(promql.ErrTooManySamples(env))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAtModifier(t *testing.T) {
|
func TestAtModifier(t *testing.T) {
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
storage := promqltest.LoadedStorage(t, `
|
storage := promqltest.LoadedStorage(t, `
|
||||||
load 10s
|
load 10s
|
||||||
metric{job="1"} 0+1x1000
|
metric{job="1"} 0+1x1000
|
||||||
|
@ -1995,7 +2004,7 @@ func TestSubquerySelector(t *testing.T) {
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
t.Run("", func(t *testing.T) {
|
t.Run("", func(t *testing.T) {
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
storage := promqltest.LoadedStorage(t, tst.loadString)
|
storage := promqltest.LoadedStorage(t, tst.loadString)
|
||||||
t.Cleanup(func() { storage.Close() })
|
t.Cleanup(func() { storage.Close() })
|
||||||
|
|
||||||
|
@ -2016,7 +2025,7 @@ func TestSubquerySelector(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) {
|
func TestTimestampFunction_StepsMoreOftenThanSamples(t *testing.T) {
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
storage := promqltest.LoadedStorage(t, `
|
storage := promqltest.LoadedStorage(t, `
|
||||||
load 1m
|
load 1m
|
||||||
metric 0+1x1000
|
metric 0+1x1000
|
||||||
|
@ -2086,6 +2095,9 @@ func TestQueryLogger_basic(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
queryExec := func() {
|
queryExec := func() {
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
@ -2137,6 +2149,9 @@ func TestQueryLogger_fields(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
f1 := NewFakeQueryLogger()
|
f1 := NewFakeQueryLogger()
|
||||||
engine.SetQueryLogger(f1)
|
engine.SetQueryLogger(f1)
|
||||||
|
@ -2166,6 +2181,9 @@ func TestQueryLogger_error(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
f1 := NewFakeQueryLogger()
|
f1 := NewFakeQueryLogger()
|
||||||
engine.SetQueryLogger(f1)
|
engine.SetQueryLogger(f1)
|
||||||
|
@ -3049,6 +3067,9 @@ func TestEngineOptsValidation(t *testing.T) {
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
eng := promql.NewEngine(c.opts)
|
eng := promql.NewEngine(c.opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, eng.Close())
|
||||||
|
})
|
||||||
_, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0))
|
_, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0))
|
||||||
_, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
|
_, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
|
||||||
if c.fail {
|
if c.fail {
|
||||||
|
@ -3208,7 +3229,7 @@ func TestRangeQuery(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
t.Run(c.Name, func(t *testing.T) {
|
t.Run(c.Name, func(t *testing.T) {
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
storage := promqltest.LoadedStorage(t, c.Load)
|
storage := promqltest.LoadedStorage(t, c.Load)
|
||||||
t.Cleanup(func() { storage.Close() })
|
t.Cleanup(func() { storage.Close() })
|
||||||
|
|
||||||
|
@ -3342,7 +3363,7 @@ func TestNativeHistogram_Sum_Count_Add_AvgOperator(t *testing.T) {
|
||||||
seriesName := "sparse_histogram_series"
|
seriesName := "sparse_histogram_series"
|
||||||
seriesNameOverTime := "sparse_histogram_series_over_time"
|
seriesNameOverTime := "sparse_histogram_series_over_time"
|
||||||
|
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
|
|
||||||
ts := idx0 * int64(10*time.Minute/time.Millisecond)
|
ts := idx0 * int64(10*time.Minute/time.Millisecond)
|
||||||
app := storage.Appender(context.Background())
|
app := storage.Appender(context.Background())
|
||||||
|
@ -3612,7 +3633,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) {
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
for _, floatHisto := range []bool{true, false} {
|
for _, floatHisto := range []bool{true, false} {
|
||||||
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
|
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
storage := teststorage.New(t)
|
storage := teststorage.New(t)
|
||||||
t.Cleanup(func() { storage.Close() })
|
t.Cleanup(func() { storage.Close() })
|
||||||
|
|
||||||
|
@ -3773,7 +3794,7 @@ func TestNativeHistogram_MulDivOperator(t *testing.T) {
|
||||||
seriesName := "sparse_histogram_series"
|
seriesName := "sparse_histogram_series"
|
||||||
floatSeriesName := "float_series"
|
floatSeriesName := "float_series"
|
||||||
|
|
||||||
engine := newTestEngine()
|
engine := newTestEngine(t)
|
||||||
|
|
||||||
ts := idx0 * int64(10*time.Minute/time.Millisecond)
|
ts := idx0 * int64(10*time.Minute/time.Millisecond)
|
||||||
app := storage.Appender(context.Background())
|
app := storage.Appender(context.Background())
|
||||||
|
@ -3896,7 +3917,7 @@ metric 0 1 2
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
c := c
|
c := c
|
||||||
t.Run(c.name, func(t *testing.T) {
|
t.Run(c.name, func(t *testing.T) {
|
||||||
engine := promqltest.NewTestEngine(false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
|
engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
|
||||||
storage := promqltest.LoadedStorage(t, load)
|
storage := promqltest.LoadedStorage(t, load)
|
||||||
t.Cleanup(func() { storage.Close() })
|
t.Cleanup(func() { storage.Close() })
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,9 @@ func TestDeriv(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
a := storage.Appender(context.Background())
|
a := storage.Appender(context.Background())
|
||||||
|
|
||||||
|
|
|
@ -27,12 +27,12 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/teststorage"
|
"github.com/prometheus/prometheus/util/teststorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTestEngine() *promql.Engine {
|
func newTestEngine(t *testing.T) *promql.Engine {
|
||||||
return promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
|
return promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEvaluations(t *testing.T) {
|
func TestEvaluations(t *testing.T) {
|
||||||
promqltest.RunBuiltinTests(t, newTestEngine())
|
promqltest.RunBuiltinTests(t, newTestEngine(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run a lot of queries at the same time, to check for race conditions.
|
// Run a lot of queries at the same time, to check for race conditions.
|
||||||
|
@ -46,6 +46,9 @@ func TestConcurrentRangeQueries(t *testing.T) {
|
||||||
Timeout: 100 * time.Second,
|
Timeout: 100 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
const interval = 10000 // 10s interval.
|
const interval = 10000 // 10s interval.
|
||||||
// A day of data plus 10k steps.
|
// A day of data plus 10k steps.
|
||||||
|
|
|
@ -72,8 +72,8 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage {
|
||||||
return test.storage
|
return test.storage
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
|
func NewTestEngine(t *testing.T, enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
|
||||||
return promql.NewEngine(promql.EngineOpts{
|
ng := promql.NewEngine(promql.EngineOpts{
|
||||||
Logger: nil,
|
Logger: nil,
|
||||||
Reg: nil,
|
Reg: nil,
|
||||||
MaxSamples: maxSamples,
|
MaxSamples: maxSamples,
|
||||||
|
@ -84,6 +84,10 @@ func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamp
|
||||||
EnablePerStepStats: enablePerStepStats,
|
EnablePerStepStats: enablePerStepStats,
|
||||||
LookbackDelta: lookbackDelta,
|
LookbackDelta: lookbackDelta,
|
||||||
})
|
})
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, ng.Close())
|
||||||
|
})
|
||||||
|
return ng
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunBuiltinTests runs an acceptance test suite against the provided engine.
|
// RunBuiltinTests runs an acceptance test suite against the provided engine.
|
||||||
|
@ -1073,7 +1077,11 @@ func (ll *LazyLoader) Storage() storage.Storage {
|
||||||
// Close closes resources associated with the LazyLoader.
|
// Close closes resources associated with the LazyLoader.
|
||||||
func (ll *LazyLoader) Close() error {
|
func (ll *LazyLoader) Close() error {
|
||||||
ll.cancelCtx()
|
ll.cancelCtx()
|
||||||
return ll.storage.Close()
|
err := ll.queryEngine.Close()
|
||||||
|
if sErr := ll.storage.Close(); sErr != nil {
|
||||||
|
return errors.Join(sErr, err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeInt64Pointer(val int64) *int64 {
|
func makeInt64Pointer(val int64) *int64 {
|
||||||
|
|
|
@ -451,7 +451,7 @@ eval range from 0 to 5m step 5m testmetric
|
||||||
|
|
||||||
for name, testCase := range testCases {
|
for name, testCase := range testCases {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
err := runTest(t, testCase.input, NewTestEngine(false, 0, DefaultMaxSamplesPerQuery))
|
err := runTest(t, testCase.input, NewTestEngine(t, false, 0, DefaultMaxSamplesPerQuery))
|
||||||
|
|
||||||
if testCase.expectedError == "" {
|
if testCase.expectedError == "" {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -36,16 +36,23 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
var testEngine = promql.NewEngine(promql.EngineOpts{
|
func testEngine(tb testing.TB) *promql.Engine {
|
||||||
Logger: nil,
|
tb.Helper()
|
||||||
Reg: nil,
|
e := promql.NewEngine(promql.EngineOpts{
|
||||||
MaxSamples: 10000,
|
Logger: nil,
|
||||||
Timeout: 100 * time.Second,
|
Reg: nil,
|
||||||
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
|
MaxSamples: 10000,
|
||||||
EnableAtModifier: true,
|
Timeout: 100 * time.Second,
|
||||||
EnableNegativeOffset: true,
|
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
|
||||||
EnablePerStepStats: true,
|
EnableAtModifier: true,
|
||||||
})
|
EnableNegativeOffset: true,
|
||||||
|
EnablePerStepStats: true,
|
||||||
|
})
|
||||||
|
tb.Cleanup(func() {
|
||||||
|
require.NoError(tb, e.Close())
|
||||||
|
})
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
func TestAlertingRuleState(t *testing.T) {
|
func TestAlertingRuleState(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -225,12 +232,14 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
|
|
||||||
baseTime := time.Unix(0, 0)
|
baseTime := time.Unix(0, 0)
|
||||||
for i, result := range results {
|
for i, result := range results {
|
||||||
t.Logf("case %d", i)
|
t.Logf("case %d", i)
|
||||||
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
|
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
|
||||||
result[0].T = timestamp.FromTime(evalTime)
|
result[0].T = timestamp.FromTime(evalTime)
|
||||||
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
||||||
|
@ -247,7 +256,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
|
||||||
testutil.RequireEqual(t, result, filteredRes)
|
testutil.RequireEqual(t, result, filteredRes)
|
||||||
}
|
}
|
||||||
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
|
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
|
||||||
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, res)
|
require.Empty(t, res)
|
||||||
}
|
}
|
||||||
|
@ -309,13 +318,15 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
|
|
||||||
evalTime := time.Unix(0, 0)
|
evalTime := time.Unix(0, 0)
|
||||||
result[0].T = timestamp.FromTime(evalTime)
|
result[0].T = timestamp.FromTime(evalTime)
|
||||||
result[1].T = timestamp.FromTime(evalTime)
|
result[1].T = timestamp.FromTime(evalTime)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
||||||
res, err := ruleWithoutExternalLabels.Eval(
|
res, err := ruleWithoutExternalLabels.Eval(
|
||||||
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
|
context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, smpl := range res {
|
for _, smpl := range res {
|
||||||
|
@ -329,7 +340,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = ruleWithExternalLabels.Eval(
|
res, err = ruleWithExternalLabels.Eval(
|
||||||
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
|
context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, smpl := range res {
|
for _, smpl := range res {
|
||||||
|
@ -406,9 +417,11 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
|
||||||
result[0].T = timestamp.FromTime(evalTime)
|
result[0].T = timestamp.FromTime(evalTime)
|
||||||
result[1].T = timestamp.FromTime(evalTime)
|
result[1].T = timestamp.FromTime(evalTime)
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
||||||
res, err := ruleWithoutExternalURL.Eval(
|
res, err := ruleWithoutExternalURL.Eval(
|
||||||
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
|
context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, smpl := range res {
|
for _, smpl := range res {
|
||||||
|
@ -422,7 +435,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = ruleWithExternalURL.Eval(
|
res, err = ruleWithExternalURL.Eval(
|
||||||
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
|
context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, smpl := range res {
|
for _, smpl := range res {
|
||||||
|
@ -475,9 +488,11 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
|
||||||
evalTime := time.Unix(0, 0)
|
evalTime := time.Unix(0, 0)
|
||||||
result[0].T = timestamp.FromTime(evalTime)
|
result[0].T = timestamp.FromTime(evalTime)
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
||||||
res, err := rule.Eval(
|
res, err := rule.Eval(
|
||||||
context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
|
context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
for _, smpl := range res {
|
for _, smpl := range res {
|
||||||
|
@ -520,6 +535,8 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
|
||||||
)
|
)
|
||||||
evalTime := time.Unix(0, 0)
|
evalTime := time.Unix(0, 0)
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
|
|
||||||
startQueryCh := make(chan struct{})
|
startQueryCh := make(chan struct{})
|
||||||
getDoneCh := make(chan struct{})
|
getDoneCh := make(chan struct{})
|
||||||
slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
|
slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
|
||||||
|
@ -533,7 +550,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
|
||||||
require.Fail(t, "unexpected blocking when template expanding.")
|
require.Fail(t, "unexpected blocking when template expanding.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return EngineQueryFunc(testEngine, storage)(ctx, q, ts)
|
return EngineQueryFunc(ng, storage)(ctx, q, ts)
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
<-startQueryCh
|
<-startQueryCh
|
||||||
|
@ -579,6 +596,9 @@ func TestAlertingRuleDuplicate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
defer cancelCtx()
|
defer cancelCtx()
|
||||||
|
|
||||||
|
@ -642,9 +662,9 @@ func TestAlertingRuleLimit(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
evalTime := time.Unix(0, 0)
|
evalTime := time.Unix(0, 0)
|
||||||
|
ng := testEngine(t)
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
|
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, test.limit); {
|
||||||
case err != nil:
|
case err != nil:
|
||||||
require.EqualError(t, err, test.err)
|
require.EqualError(t, err, test.err)
|
||||||
case test.err != "":
|
case test.err != "":
|
||||||
|
@ -866,12 +886,13 @@ func TestKeepFiringFor(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
baseTime := time.Unix(0, 0)
|
baseTime := time.Unix(0, 0)
|
||||||
for i, result := range results {
|
for i, result := range results {
|
||||||
t.Logf("case %d", i)
|
t.Logf("case %d", i)
|
||||||
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
|
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
|
||||||
result[0].T = timestamp.FromTime(evalTime)
|
result[0].T = timestamp.FromTime(evalTime)
|
||||||
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
||||||
|
@ -888,7 +909,7 @@ func TestKeepFiringFor(t *testing.T) {
|
||||||
testutil.RequireEqual(t, result, filteredRes)
|
testutil.RequireEqual(t, result, filteredRes)
|
||||||
}
|
}
|
||||||
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
|
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
|
||||||
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, res)
|
require.Empty(t, res)
|
||||||
}
|
}
|
||||||
|
@ -923,9 +944,10 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
|
||||||
F: 1,
|
F: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
baseTime := time.Unix(0, 0)
|
baseTime := time.Unix(0, 0)
|
||||||
result.T = timestamp.FromTime(baseTime)
|
result.T = timestamp.FromTime(baseTime)
|
||||||
res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), baseTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Len(t, res, 2)
|
require.Len(t, res, 2)
|
||||||
|
@ -940,7 +962,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
evalTime := baseTime.Add(time.Minute)
|
evalTime := baseTime.Add(time.Minute)
|
||||||
res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err = rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, res)
|
require.Empty(t, res)
|
||||||
}
|
}
|
||||||
|
|
|
@ -157,12 +157,13 @@ func TestAlertingRule(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
t.Logf("case %d", i)
|
t.Logf("case %d", i)
|
||||||
|
|
||||||
evalTime := baseTime.Add(test.time)
|
evalTime := baseTime.Add(test.time)
|
||||||
|
|
||||||
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
|
||||||
|
@ -296,6 +297,7 @@ func TestForStateAddSamples(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
var forState float64
|
var forState float64
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
t.Logf("case %d", i)
|
t.Logf("case %d", i)
|
||||||
|
@ -308,7 +310,7 @@ func TestForStateAddSamples(t *testing.T) {
|
||||||
forState = float64(value.StaleNaN)
|
forState = float64(value.StaleNaN)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
res, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var filteredRes promql.Vector // After removing 'ALERTS' samples.
|
var filteredRes promql.Vector // After removing 'ALERTS' samples.
|
||||||
|
@ -359,8 +361,9 @@ func TestForStateRestore(t *testing.T) {
|
||||||
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
opts := &ManagerOptions{
|
opts := &ManagerOptions{
|
||||||
QueryFunc: EngineQueryFunc(testEngine, storage),
|
QueryFunc: EngineQueryFunc(ng, storage),
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
|
@ -528,6 +531,9 @@ func TestStaleness(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(engineOpts)
|
engine := promql.NewEngine(engineOpts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
opts := &ManagerOptions{
|
opts := &ManagerOptions{
|
||||||
QueryFunc: EngineQueryFunc(engine, st),
|
QueryFunc: EngineQueryFunc(engine, st),
|
||||||
Appendable: st,
|
Appendable: st,
|
||||||
|
@ -720,6 +726,9 @@ func TestUpdate(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ruleManager := NewManager(&ManagerOptions{
|
ruleManager := NewManager(&ManagerOptions{
|
||||||
Appendable: st,
|
Appendable: st,
|
||||||
Queryable: st,
|
Queryable: st,
|
||||||
|
@ -858,6 +867,9 @@ func TestNotify(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(engineOpts)
|
engine := promql.NewEngine(engineOpts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
var lastNotified []*Alert
|
var lastNotified []*Alert
|
||||||
notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) {
|
notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) {
|
||||||
lastNotified = alerts
|
lastNotified = alerts
|
||||||
|
@ -933,6 +945,9 @@ func TestMetricsUpdate(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ruleManager := NewManager(&ManagerOptions{
|
ruleManager := NewManager(&ManagerOptions{
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
|
@ -1007,6 +1022,9 @@ func TestGroupStalenessOnRemoval(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ruleManager := NewManager(&ManagerOptions{
|
ruleManager := NewManager(&ManagerOptions{
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
|
@ -1084,6 +1102,9 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ruleManager := NewManager(&ManagerOptions{
|
ruleManager := NewManager(&ManagerOptions{
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
|
@ -1186,6 +1207,9 @@ func TestRuleHealthUpdates(t *testing.T) {
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}
|
}
|
||||||
engine := promql.NewEngine(engineOpts)
|
engine := promql.NewEngine(engineOpts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
opts := &ManagerOptions{
|
opts := &ManagerOptions{
|
||||||
QueryFunc: EngineQueryFunc(engine, st),
|
QueryFunc: EngineQueryFunc(engine, st),
|
||||||
Appendable: st,
|
Appendable: st,
|
||||||
|
@ -1282,9 +1306,10 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
testFunc := func(tst testInput) {
|
testFunc := func(tst testInput) {
|
||||||
opts := &ManagerOptions{
|
opts := &ManagerOptions{
|
||||||
QueryFunc: EngineQueryFunc(testEngine, storage),
|
QueryFunc: EngineQueryFunc(ng, storage),
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
|
@ -1368,8 +1393,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
|
||||||
}
|
}
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
opts := &ManagerOptions{
|
opts := &ManagerOptions{
|
||||||
QueryFunc: EngineQueryFunc(testEngine, storage),
|
QueryFunc: EngineQueryFunc(ng, storage),
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
Context: context.Background(),
|
Context: context.Background(),
|
||||||
|
|
|
@ -123,10 +123,11 @@ func TestRuleEval(t *testing.T) {
|
||||||
storage := setUpRuleEvalTest(t)
|
storage := setUpRuleEvalTest(t)
|
||||||
t.Cleanup(func() { storage.Close() })
|
t.Cleanup(func() { storage.Close() })
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
for _, scenario := range ruleEvalTestScenarios {
|
for _, scenario := range ruleEvalTestScenarios {
|
||||||
t.Run(scenario.name, func(t *testing.T) {
|
t.Run(scenario.name, func(t *testing.T) {
|
||||||
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
|
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
|
||||||
result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
result, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
testutil.RequireEqual(t, scenario.expected, result)
|
testutil.RequireEqual(t, scenario.expected, result)
|
||||||
})
|
})
|
||||||
|
@ -137,6 +138,7 @@ func BenchmarkRuleEval(b *testing.B) {
|
||||||
storage := setUpRuleEvalTest(b)
|
storage := setUpRuleEvalTest(b)
|
||||||
b.Cleanup(func() { storage.Close() })
|
b.Cleanup(func() { storage.Close() })
|
||||||
|
|
||||||
|
ng := testEngine(b)
|
||||||
for _, scenario := range ruleEvalTestScenarios {
|
for _, scenario := range ruleEvalTestScenarios {
|
||||||
b.Run(scenario.name, func(b *testing.B) {
|
b.Run(scenario.name, func(b *testing.B) {
|
||||||
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
|
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
|
||||||
|
@ -144,7 +146,7 @@ func BenchmarkRuleEval(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
_, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
|
_, err := rule.Eval(context.TODO(), ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
}
|
}
|
||||||
|
@ -166,6 +168,9 @@ func TestRuleEvalDuplicate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
engine := promql.NewEngine(opts)
|
engine := promql.NewEngine(opts)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
defer cancelCtx()
|
defer cancelCtx()
|
||||||
|
|
||||||
|
@ -212,10 +217,11 @@ func TestRecordingRuleLimit(t *testing.T) {
|
||||||
labels.FromStrings("test", "test"),
|
labels.FromStrings("test", "test"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
evalTime := time.Unix(0, 0)
|
evalTime := time.Unix(0, 0)
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
|
switch _, err := rule.Eval(context.TODO(), evalTime, EngineQueryFunc(ng, storage), nil, test.limit); {
|
||||||
case err != nil:
|
case err != nil:
|
||||||
require.EqualError(t, err, test.err)
|
require.EqualError(t, err, test.err)
|
||||||
case test.err != "":
|
case test.err != "":
|
||||||
|
|
|
@ -59,16 +59,25 @@ import (
|
||||||
"github.com/prometheus/prometheus/util/teststorage"
|
"github.com/prometheus/prometheus/util/teststorage"
|
||||||
)
|
)
|
||||||
|
|
||||||
var testEngine = promql.NewEngine(promql.EngineOpts{
|
func testEngine(t *testing.T) *promql.Engine {
|
||||||
Logger: nil,
|
t.Helper()
|
||||||
Reg: nil,
|
|
||||||
MaxSamples: 10000,
|
ng := promql.NewEngine(promql.EngineOpts{
|
||||||
Timeout: 100 * time.Second,
|
Logger: nil,
|
||||||
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
|
Reg: nil,
|
||||||
EnableAtModifier: true,
|
MaxSamples: 10000,
|
||||||
EnableNegativeOffset: true,
|
Timeout: 100 * time.Second,
|
||||||
EnablePerStepStats: true,
|
NoStepSubqueryIntervalFn: func(int64) int64 { return 60 * 1000 },
|
||||||
})
|
EnableAtModifier: true,
|
||||||
|
EnableNegativeOffset: true,
|
||||||
|
EnablePerStepStats: true,
|
||||||
|
})
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, ng.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
return ng
|
||||||
|
}
|
||||||
|
|
||||||
// testMetaStore satisfies the scrape.MetricMetadataStore interface.
|
// testMetaStore satisfies the scrape.MetricMetadataStore interface.
|
||||||
// It is used to inject specific metadata as part of a test case.
|
// It is used to inject specific metadata as part of a test case.
|
||||||
|
@ -283,6 +292,9 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
|
||||||
}
|
}
|
||||||
|
|
||||||
engine := promql.NewEngine(engineOpts)
|
engine := promql.NewEngine(engineOpts)
|
||||||
|
m.testing.Cleanup(func() {
|
||||||
|
require.NoError(m.testing, engine.Close())
|
||||||
|
})
|
||||||
opts := &rules.ManagerOptions{
|
opts := &rules.ManagerOptions{
|
||||||
QueryFunc: rules.EngineQueryFunc(engine, storage),
|
QueryFunc: rules.EngineQueryFunc(engine, storage),
|
||||||
Appendable: storage,
|
Appendable: storage,
|
||||||
|
@ -403,9 +415,10 @@ func TestEndpoints(t *testing.T) {
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
ng := testEngine(t)
|
||||||
|
|
||||||
t.Run("local", func(t *testing.T) {
|
t.Run("local", func(t *testing.T) {
|
||||||
algr := rulesRetrieverMock{}
|
algr := rulesRetrieverMock{testing: t}
|
||||||
algr.testing = t
|
|
||||||
|
|
||||||
algr.CreateAlertingRules()
|
algr.CreateAlertingRules()
|
||||||
algr.CreateRuleGroups()
|
algr.CreateRuleGroups()
|
||||||
|
@ -417,7 +430,7 @@ func TestEndpoints(t *testing.T) {
|
||||||
|
|
||||||
api := &API{
|
api := &API{
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
QueryEngine: testEngine,
|
QueryEngine: ng,
|
||||||
ExemplarQueryable: storage.ExemplarQueryable(),
|
ExemplarQueryable: storage.ExemplarQueryable(),
|
||||||
targetRetriever: testTargetRetriever.toFactory(),
|
targetRetriever: testTargetRetriever.toFactory(),
|
||||||
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
|
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
|
||||||
|
@ -468,8 +481,7 @@ func TestEndpoints(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
algr := rulesRetrieverMock{}
|
algr := rulesRetrieverMock{testing: t}
|
||||||
algr.testing = t
|
|
||||||
|
|
||||||
algr.CreateAlertingRules()
|
algr.CreateAlertingRules()
|
||||||
algr.CreateRuleGroups()
|
algr.CreateRuleGroups()
|
||||||
|
@ -481,7 +493,7 @@ func TestEndpoints(t *testing.T) {
|
||||||
|
|
||||||
api := &API{
|
api := &API{
|
||||||
Queryable: remote,
|
Queryable: remote,
|
||||||
QueryEngine: testEngine,
|
QueryEngine: ng,
|
||||||
ExemplarQueryable: storage.ExemplarQueryable(),
|
ExemplarQueryable: storage.ExemplarQueryable(),
|
||||||
targetRetriever: testTargetRetriever.toFactory(),
|
targetRetriever: testTargetRetriever.toFactory(),
|
||||||
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
|
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
|
||||||
|
@ -623,7 +635,7 @@ func TestQueryExemplars(t *testing.T) {
|
||||||
|
|
||||||
api := &API{
|
api := &API{
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
QueryEngine: testEngine,
|
QueryEngine: testEngine(t),
|
||||||
ExemplarQueryable: storage.ExemplarQueryable(),
|
ExemplarQueryable: storage.ExemplarQueryable(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,7 +843,7 @@ func TestStats(t *testing.T) {
|
||||||
|
|
||||||
api := &API{
|
api := &API{
|
||||||
Queryable: storage,
|
Queryable: storage,
|
||||||
QueryEngine: testEngine,
|
QueryEngine: testEngine(t),
|
||||||
now: func() time.Time {
|
now: func() time.Time {
|
||||||
return time.Unix(123, 0)
|
return time.Unix(123, 0)
|
||||||
},
|
},
|
||||||
|
|
|
@ -86,7 +86,7 @@ func TestApiStatusCodes(t *testing.T) {
|
||||||
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
|
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
|
||||||
} {
|
} {
|
||||||
t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) {
|
||||||
r := createPrometheusAPI(q)
|
r := createPrometheusAPI(t, q)
|
||||||
rec := httptest.NewRecorder()
|
rec := httptest.NewRecorder()
|
||||||
|
|
||||||
req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil)
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil)
|
||||||
|
@ -100,7 +100,9 @@ func TestApiStatusCodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
|
func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route.Router {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
engine := promql.NewEngine(promql.EngineOpts{
|
engine := promql.NewEngine(promql.EngineOpts{
|
||||||
Logger: log.NewNopLogger(),
|
Logger: log.NewNopLogger(),
|
||||||
Reg: nil,
|
Reg: nil,
|
||||||
|
@ -108,6 +110,9 @@ func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
|
||||||
MaxSamples: 100,
|
MaxSamples: 100,
|
||||||
Timeout: 5 * time.Second,
|
Timeout: 5 * time.Second,
|
||||||
})
|
})
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, engine.Close())
|
||||||
|
})
|
||||||
|
|
||||||
api := NewAPI(
|
api := NewAPI(
|
||||||
engine,
|
engine,
|
||||||
|
|
Loading…
Reference in a new issue