Merge branch 'prometheus:main' into patch-impl2

This commit is contained in:
Vanshika 2024-09-04 18:34:15 +05:30 committed by GitHub
commit 9d7d7cb66d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 356 additions and 110 deletions

View file

@ -999,12 +999,18 @@ func main() {
listeners, err := webHandler.Listeners()
if err != nil {
level.Error(logger).Log("msg", "Unable to start web listeners", "err", err)
if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
os.Exit(1)
}
err = toolkit_web.Validate(*webConfig)
if err != nil {
level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
os.Exit(1)
}
@ -1026,6 +1032,9 @@ func main() {
case <-cancel:
reloadReady.Close()
}
if err := queryEngine.Close(); err != nil {
level.Warn(logger).Log("msg", "Closing query engine failed", "err", err)
}
return nil
},
func(err error) {

View file

@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/teststorage"
@ -274,7 +275,7 @@ func BenchmarkRangeQuery(b *testing.B) {
MaxSamples: 50000000,
Timeout: 100 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(b, opts)
const interval = 10000 // 10s interval.
// A day of data plus 10k steps.
@ -365,7 +366,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
ng := promql.NewEngine(opts)
ng := promqltest.NewTestEngineWithOpts(b, opts)
for i := 0; i < b.N; i++ {
qry, err := ng.NewRangeQuery(context.Background(), testStorage, nil, tc.query, start, end, step)
if err != nil {

View file

@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math"
"reflect"
"runtime"
@ -271,6 +272,8 @@ func contextErr(err error, env string) error {
//
// 2) Enforcement of the maximum number of concurrent queries.
type QueryTracker interface {
io.Closer
// GetMaxConcurrent returns maximum number of concurrent queries that are allowed by this tracker.
GetMaxConcurrent() int
@ -430,6 +433,14 @@ func NewEngine(opts EngineOpts) *Engine {
}
}
// Close closes ng.
func (ng *Engine) Close() error {
if ng.activeQueryTracker != nil {
return ng.activeQueryTracker.Close()
}
return nil
}
// SetQueryLogger sets the query logger.
func (ng *Engine) SetQueryLogger(l QueryLogger) {
ng.queryLoggerLock.Lock()

View file

@ -17,7 +17,6 @@ import (
"context"
"errors"
"fmt"
"os"
"sort"
"strconv"
"sync"
@ -55,14 +54,7 @@ func TestMain(m *testing.M) {
func TestQueryConcurrency(t *testing.T) {
maxConcurrency := 10
dir, err := os.MkdirTemp("", "test_concurrency")
require.NoError(t, err)
defer os.RemoveAll(dir)
queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil)
t.Cleanup(func() {
require.NoError(t, queryTracker.Close())
})
queryTracker := promql.NewActiveQueryTracker(t.TempDir(), maxConcurrency, nil)
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
@ -70,15 +62,17 @@ func TestQueryConcurrency(t *testing.T) {
Timeout: 100 * time.Second,
ActiveQueryTracker: queryTracker,
}
engine := promqltest.NewTestEngineWithOpts(t, opts)
engine := promql.NewEngine(opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
t.Cleanup(cancelCtx)
block := make(chan struct{})
processing := make(chan struct{})
done := make(chan int)
defer close(done)
t.Cleanup(func() {
close(done)
})
f := func(context.Context) error {
select {
@ -163,7 +157,7 @@ func TestQueryTimeout(t *testing.T) {
MaxSamples: 10,
Timeout: 5 * time.Millisecond,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -188,7 +182,7 @@ func TestQueryCancel(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -262,7 +256,7 @@ func TestQueryError(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
errStorage := promql.ErrStorage{errors.New("storage error")}
queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return &errQuerier{err: errStorage}, nil
@ -596,7 +590,7 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
},
} {
t.Run(tc.query, func(t *testing.T) {
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
hintsRecorder := &noopHintRecordingQueryable{}
var (
@ -627,7 +621,7 @@ func TestEngineShutdown(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
block := make(chan struct{})
@ -763,7 +757,7 @@ load 10s
t.Run(fmt.Sprintf("%d query=%s", i, c.Query), func(t *testing.T) {
var err error
var qry promql.Query
engine := newTestEngine()
engine := newTestEngine(t)
if c.Interval == 0 {
qry, err = engine.NewInstantQuery(context.Background(), storage, nil, c.Query, c.Start)
} else {
@ -1305,7 +1299,7 @@ load 10s
for _, c := range cases {
t.Run(c.Query, func(t *testing.T) {
opts := promql.NewPrometheusQueryOpts(true, 0)
engine := promqltest.NewTestEngine(true, 0, promqltest.DefaultMaxSamplesPerQuery)
engine := promqltest.NewTestEngine(t, true, 0, promqltest.DefaultMaxSamplesPerQuery)
runQuery := func(expErr error) *stats.Statistics {
var err error
@ -1332,7 +1326,7 @@ load 10s
if c.SkipMaxCheck {
return
}
engine = promqltest.NewTestEngine(true, 0, stats.Samples.PeakSamples-1)
engine = promqltest.NewTestEngine(t, true, 0, stats.Samples.PeakSamples-1)
runQuery(promql.ErrTooManySamples(env))
})
}
@ -1485,7 +1479,7 @@ load 10s
for _, c := range cases {
t.Run(c.Query, func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
testFunc := func(expError error) {
var err error
var qry promql.Query
@ -1506,18 +1500,18 @@ load 10s
}
// Within limit.
engine = promqltest.NewTestEngine(false, 0, c.MaxSamples)
engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples)
testFunc(nil)
// Exceeding limit.
engine = promqltest.NewTestEngine(false, 0, c.MaxSamples-1)
engine = promqltest.NewTestEngine(t, false, 0, c.MaxSamples-1)
testFunc(promql.ErrTooManySamples(env))
})
}
}
func TestAtModifier(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := promqltest.LoadedStorage(t, `
load 10s
metric{job="1"} 0+1x1000
@ -2000,7 +1994,7 @@ func TestSubquerySelector(t *testing.T) {
},
} {
t.Run("", func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := promqltest.LoadedStorage(t, tst.loadString)
t.Cleanup(func() { storage.Close() })
@ -2049,7 +2043,7 @@ func TestQueryLogger_basic(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
queryExec := func() {
ctx, cancelCtx := context.WithCancel(context.Background())
@ -2100,7 +2094,7 @@ func TestQueryLogger_fields(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
@ -2129,7 +2123,7 @@ func TestQueryLogger_error(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
f1 := NewFakeQueryLogger()
engine.SetQueryLogger(f1)
@ -3012,7 +3006,7 @@ func TestEngineOptsValidation(t *testing.T) {
}
for _, c := range cases {
eng := promql.NewEngine(c.opts)
eng := promqltest.NewTestEngineWithOpts(t, c.opts)
_, err1 := eng.NewInstantQuery(context.Background(), nil, nil, c.query, time.Unix(10, 0))
_, err2 := eng.NewRangeQuery(context.Background(), nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
if c.fail {
@ -3026,7 +3020,7 @@ func TestEngineOptsValidation(t *testing.T) {
}
func TestInstantQueryWithRangeVectorSelector(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
baseT := timestamp.Time(0)
storage := promqltest.LoadedStorage(t, `
@ -3280,7 +3274,7 @@ func TestNativeHistogram_SubOperator(t *testing.T) {
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t %d", floatHisto, idx0), func(t *testing.T) {
engine := newTestEngine()
engine := newTestEngine(t)
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })
@ -3399,7 +3393,7 @@ metric 0 1 2
for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
engine := promqltest.NewTestEngine(false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
engine := promqltest.NewTestEngine(t, false, c.engineLookback, promqltest.DefaultMaxSamplesPerQuery)
storage := promqltest.LoadedStorage(t, load)
t.Cleanup(func() { storage.Close() })
@ -3436,7 +3430,7 @@ histogram {{sum:4 count:4 buckets:[2 2]}} {{sum:6 count:6 buckets:[3 3]}} {{sum:
`
storage := promqltest.LoadedStorage(t, load)
t.Cleanup(func() { storage.Close() })
engine := promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)
verify := func(t *testing.T, qry promql.Query, expected []histogram.FloatHistogram) {
res := qry.Exec(context.Background())

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/util/teststorage"
)
@ -39,7 +40,7 @@ func TestDeriv(t *testing.T) {
MaxSamples: 10000,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
a := storage.Appender(context.Background())

View file

@ -28,12 +28,12 @@ import (
"github.com/prometheus/prometheus/util/teststorage"
)
func newTestEngine() *promql.Engine {
return promqltest.NewTestEngine(false, 0, promqltest.DefaultMaxSamplesPerQuery)
func newTestEngine(t *testing.T) *promql.Engine {
return promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery)
}
func TestEvaluations(t *testing.T) {
promqltest.RunBuiltinTests(t, newTestEngine())
promqltest.RunBuiltinTests(t, newTestEngine(t))
}
// Run a lot of queries at the same time, to check for race conditions.
@ -48,7 +48,7 @@ func TestConcurrentRangeQueries(t *testing.T) {
}
// Enable experimental functions testing
parser.EnableExperimentalFunctions = true
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
const interval = 10000 // 10s interval.
// A day of data plus 10k steps.

View file

@ -79,8 +79,9 @@ func LoadedStorage(t testutil.T, input string) *teststorage.TestStorage {
return test.storage
}
func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
return promql.NewEngine(promql.EngineOpts{
// NewTestEngine creates a promql.Engine with enablePerStepStats, lookbackDelta and maxSamples, and returns it.
func NewTestEngine(tb testing.TB, enablePerStepStats bool, lookbackDelta time.Duration, maxSamples int) *promql.Engine {
return NewTestEngineWithOpts(tb, promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: maxSamples,
@ -94,6 +95,16 @@ func NewTestEngine(enablePerStepStats bool, lookbackDelta time.Duration, maxSamp
})
}
// NewTestEngineWithOpts creates a promql.Engine with opts and returns it.
func NewTestEngineWithOpts(tb testing.TB, opts promql.EngineOpts) *promql.Engine {
tb.Helper()
ng := promql.NewEngine(opts)
tb.Cleanup(func() {
require.NoError(tb, ng.Close())
})
return ng
}
// RunBuiltinTests runs an acceptance test suite against the provided engine.
func RunBuiltinTests(t TBRun, engine promql.QueryEngine) {
t.Cleanup(func() { parser.EnableExperimentalFunctions = false })
@ -1436,7 +1447,11 @@ func (ll *LazyLoader) Storage() storage.Storage {
// Close closes resources associated with the LazyLoader.
func (ll *LazyLoader) Close() error {
ll.cancelCtx()
return ll.storage.Close()
err := ll.queryEngine.Close()
if sErr := ll.storage.Close(); sErr != nil {
return errors.Join(sErr, err)
}
return err
}
func makeInt64Pointer(val int64) *int64 {

View file

@ -595,7 +595,7 @@ eval range from 0 to 5m step 5m testmetric
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
err := runTest(t, testCase.input, NewTestEngine(false, 0, DefaultMaxSamplesPerQuery))
err := runTest(t, testCase.input, NewTestEngine(t, false, 0, DefaultMaxSamplesPerQuery))
if testCase.expectedError == "" {
require.NoError(t, err)

View file

@ -36,7 +36,9 @@ import (
"github.com/prometheus/prometheus/util/testutil"
)
var testEngine = promql.NewEngine(promql.EngineOpts{
func testEngine(tb testing.TB) *promql.Engine {
tb.Helper()
return promqltest.NewTestEngineWithOpts(tb, promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
@ -45,7 +47,8 @@ var testEngine = promql.NewEngine(promql.EngineOpts{
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: true,
})
})
}
func TestAlertingRuleState(t *testing.T) {
tests := []struct {
@ -225,12 +228,14 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
},
}
ng := testEngine(t)
baseTime := time.Unix(0, 0)
for i, result := range results {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -247,7 +252,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@ -309,13 +314,15 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
},
}
ng := testEngine(t)
evalTime := time.Unix(0, 0)
result[0].T = timestamp.FromTime(evalTime)
result[1].T = timestamp.FromTime(evalTime)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -329,7 +336,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
}
res, err = ruleWithExternalLabels.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -406,9 +413,11 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
result[0].T = timestamp.FromTime(evalTime)
result[1].T = timestamp.FromTime(evalTime)
ng := testEngine(t)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -422,7 +431,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
}
res, err = ruleWithExternalURL.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -475,9 +484,11 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
evalTime := time.Unix(0, 0)
result[0].T = timestamp.FromTime(evalTime)
ng := testEngine(t)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval(
context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0,
)
require.NoError(t, err)
for _, smpl := range res {
@ -520,6 +531,8 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
)
evalTime := time.Unix(0, 0)
ng := testEngine(t)
startQueryCh := make(chan struct{})
getDoneCh := make(chan struct{})
slowQueryFunc := func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
@ -533,7 +546,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
require.Fail(t, "unexpected blocking when template expanding.")
}
}
return EngineQueryFunc(testEngine, storage)(ctx, q, ts)
return EngineQueryFunc(ng, storage)(ctx, q, ts)
}
go func() {
<-startQueryCh
@ -578,7 +591,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -642,9 +655,9 @@ func TestAlertingRuleLimit(t *testing.T) {
)
evalTime := time.Unix(0, 0)
ng := testEngine(t)
for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, test.limit); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@ -866,12 +879,13 @@ func TestKeepFiringFor(t *testing.T) {
},
}
ng := testEngine(t)
baseTime := time.Unix(0, 0)
for i, result := range results {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -888,7 +902,7 @@ func TestKeepFiringFor(t *testing.T) {
testutil.RequireEqual(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}
@ -923,9 +937,10 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
F: 1,
}
ng := testEngine(t)
baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime)
res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Len(t, res, 2)
@ -940,7 +955,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
}
evalTime := baseTime.Add(time.Minute)
res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
require.Empty(t, res)
}

View file

@ -158,12 +158,13 @@ func TestAlertingRule(t *testing.T) {
},
}
ng := testEngine(t)
for i, test := range tests {
t.Logf("case %d", i)
evalTime := baseTime.Add(test.time)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@ -299,6 +300,7 @@ func TestForStateAddSamples(t *testing.T) {
},
}
ng := testEngine(t)
var forState float64
for i, test := range tests {
t.Logf("case %d", i)
@ -311,7 +313,7 @@ func TestForStateAddSamples(t *testing.T) {
forState = float64(value.StaleNaN)
}
res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
res, err := rule.Eval(context.TODO(), queryOffset, evalTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS' samples.
@ -366,8 +368,9 @@ func TestForStateRestore(t *testing.T) {
expr, err := parser.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)
require.NoError(t, err)
ng := testEngine(t)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(testEngine, storage),
QueryFunc: EngineQueryFunc(ng, storage),
Appendable: storage,
Queryable: storage,
Context: context.Background(),
@ -538,7 +541,7 @@ func TestStaleness(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(t, engineOpts)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, st),
Appendable: st,
@ -772,7 +775,7 @@ func TestUpdate(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: st,
Queryable: st,
@ -910,7 +913,7 @@ func TestNotify(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(t, engineOpts)
var lastNotified []*Alert
notifyFunc := func(ctx context.Context, expr string, alerts ...*Alert) {
lastNotified = alerts
@ -985,7 +988,7 @@ func TestMetricsUpdate(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
@ -1059,7 +1062,7 @@ func TestGroupStalenessOnRemoval(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
@ -1136,7 +1139,7 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
@ -1285,7 +1288,7 @@ func TestRuleHealthUpdates(t *testing.T) {
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(t, engineOpts)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, st),
Appendable: st,
@ -1382,9 +1385,10 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
},
}
ng := testEngine(t)
testFunc := func(tst testInput) {
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(testEngine, storage),
QueryFunc: EngineQueryFunc(ng, storage),
Appendable: storage,
Queryable: storage,
Context: context.Background(),
@ -1468,8 +1472,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
}
require.NoError(t, app.Commit())
ng := testEngine(t)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(testEngine, storage),
QueryFunc: EngineQueryFunc(ng, storage),
Appendable: storage,
Queryable: storage,
Context: context.Background(),

View file

@ -123,10 +123,11 @@ func TestRuleEval(t *testing.T) {
storage := setUpRuleEvalTest(t)
t.Cleanup(func() { storage.Close() })
ng := testEngine(t)
for _, scenario := range ruleEvalTestScenarios {
t.Run(scenario.name, func(t *testing.T) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0)
require.NoError(t, err)
testutil.RequireEqual(t, scenario.expected, result)
})
@ -137,6 +138,7 @@ func BenchmarkRuleEval(b *testing.B) {
storage := setUpRuleEvalTest(b)
b.Cleanup(func() { storage.Close() })
ng := testEngine(b)
for _, scenario := range ruleEvalTestScenarios {
b.Run(scenario.name, func(b *testing.B) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
@ -144,7 +146,7 @@ func BenchmarkRuleEval(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
_, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(ng, storage), nil, 0)
if err != nil {
require.NoError(b, err)
}
@ -165,7 +167,7 @@ func TestRuleEvalDuplicate(t *testing.T) {
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
engine := promqltest.NewTestEngineWithOpts(t, opts)
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()
@ -212,10 +214,11 @@ func TestRecordingRuleLimit(t *testing.T) {
labels.FromStrings("test", "test"),
)
ng := testEngine(t)
evalTime := time.Unix(0, 0)
for _, test := range tests {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(ng, storage), nil, test.limit); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":

View file

@ -974,6 +974,7 @@ func (it *floatHistogramIterator) Reset(b []byte) {
if it.atFloatHistogramCalled {
it.atFloatHistogramCalled = false
it.pBuckets, it.nBuckets = nil, nil
it.pSpans, it.nSpans = nil, nil
} else {
it.pBuckets, it.nBuckets = it.pBuckets[:0], it.nBuckets[:0]
}
@ -1069,7 +1070,7 @@ func (it *floatHistogramIterator) Next() ValueType {
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
// so we don't need a separate single delta logic for the 2nd sample.
// Recycle bucket slices that have not been returned yet. Otherwise, copy them.
// Recycle bucket and span slices that have not been returned yet. Otherwise, copy them.
// We can always recycle the slices for leading and trailing bits as they are
// never returned to the caller.
if it.atFloatHistogramCalled {
@ -1088,6 +1089,20 @@ func (it *floatHistogramIterator) Next() ValueType {
} else {
it.nBuckets = nil
}
if len(it.pSpans) > 0 {
newSpans := make([]histogram.Span, len(it.pSpans))
copy(newSpans, it.pSpans)
it.pSpans = newSpans
} else {
it.pSpans = nil
}
if len(it.nSpans) > 0 {
newSpans := make([]histogram.Span, len(it.nSpans))
copy(newSpans, it.nSpans)
it.nSpans = newSpans
} else {
it.nSpans = nil
}
}
tDod, err := readVarbitInt(&it.br)

View file

@ -1306,3 +1306,54 @@ func TestFloatHistogramAppendOnlyErrors(t *testing.T) {
require.EqualError(t, err, "float histogram counter reset")
})
}
func TestFloatHistogramUniqueSpansAfterNext(t *testing.T) {
// Create two histograms with the same schema and spans.
h1 := &histogram.FloatHistogram{
Schema: 1,
ZeroThreshold: 1e-100,
Count: 10,
ZeroCount: 2,
Sum: 15.0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []float64{1, 2, 3, 4},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
},
NegativeBuckets: []float64{2},
}
h2 := h1.Copy()
// Create a chunk and append both histograms.
c := NewFloatHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendFloatHistogram(nil, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
it := c.Iterator(nil)
require.Equal(t, ValFloatHistogram, it.Next())
_, rh1 := it.AtFloatHistogram(nil)
// Advance to the second histogram and retrieve it.
require.Equal(t, ValFloatHistogram, it.Next())
_, rh2 := it.AtFloatHistogram(nil)
require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
// Check that the spans for h1 and h2 are unique slices.
require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms")
require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms")
}

View file

@ -1073,6 +1073,7 @@ func (it *histogramIterator) Reset(b []byte) {
if it.atHistogramCalled {
it.atHistogramCalled = false
it.pBuckets, it.nBuckets = nil, nil
it.pSpans, it.nSpans = nil, nil
} else {
it.pBuckets = it.pBuckets[:0]
it.nBuckets = it.nBuckets[:0]
@ -1185,8 +1186,25 @@ func (it *histogramIterator) Next() ValueType {
// The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code,
// so we don't need a separate single delta logic for the 2nd sample.
// Recycle bucket slices that have not been returned yet. Otherwise,
// Recycle bucket and span slices that have not been returned yet. Otherwise, copy them.
// copy them.
if it.atFloatHistogramCalled || it.atHistogramCalled {
if len(it.pSpans) > 0 {
newSpans := make([]histogram.Span, len(it.pSpans))
copy(newSpans, it.pSpans)
it.pSpans = newSpans
} else {
it.pSpans = nil
}
if len(it.nSpans) > 0 {
newSpans := make([]histogram.Span, len(it.nSpans))
copy(newSpans, it.nSpans)
it.nSpans = newSpans
} else {
it.nSpans = nil
}
}
if it.atHistogramCalled {
it.atHistogramCalled = false
if len(it.pBuckets) > 0 {
@ -1204,6 +1222,7 @@ func (it *histogramIterator) Next() ValueType {
it.nBuckets = nil
}
}
// FloatBuckets are set from scratch, so simply create empty ones.
if it.atFloatHistogramCalled {
it.atFloatHistogramCalled = false

View file

@ -1487,6 +1487,108 @@ func TestHistogramAppendOnlyErrors(t *testing.T) {
})
}
func TestHistogramUniqueSpansAfterNext(t *testing.T) {
// Create two histograms with the same schema and spans.
h1 := &histogram.Histogram{
Schema: 1,
ZeroThreshold: 1e-100,
Count: 10,
ZeroCount: 2,
Sum: 15.0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 2, 3, 4},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{2},
}
h2 := h1.Copy()
// Create a chunk and append both histograms.
c := NewHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
it := c.Iterator(nil)
require.Equal(t, ValHistogram, it.Next())
_, rh1 := it.AtHistogram(nil)
// Advance to the second histogram and retrieve it.
require.Equal(t, ValHistogram, it.Next())
_, rh2 := it.AtHistogram(nil)
require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
// Check that the spans for h1 and h2 are unique slices.
require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms")
require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms")
}
func TestHistogramUniqueSpansAfterNextWithAtFloatHistogram(t *testing.T) {
// Create two histograms with the same schema and spans.
h1 := &histogram.Histogram{
Schema: 1,
ZeroThreshold: 1e-100,
Count: 10,
ZeroCount: 2,
Sum: 15.0,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 2, 3, 4},
NegativeSpans: []histogram.Span{
{Offset: 1, Length: 1},
},
NegativeBuckets: []int64{2},
}
h2 := h1.Copy()
// Create a chunk and append both histograms.
c := NewHistogramChunk()
app, err := c.Appender()
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 0, h1, false)
require.NoError(t, err)
_, _, _, err = app.AppendHistogram(nil, 1, h2, false)
require.NoError(t, err)
// Create an iterator and advance to the first histogram.
it := c.Iterator(nil)
require.Equal(t, ValHistogram, it.Next())
_, rh1 := it.AtFloatHistogram(nil)
// Advance to the second histogram and retrieve it.
require.Equal(t, ValHistogram, it.Next())
_, rh2 := it.AtFloatHistogram(nil)
require.Equal(t, rh1.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh1.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
require.Equal(t, rh2.PositiveSpans, h1.PositiveSpans, "Returned positive spans are as expected")
require.Equal(t, rh2.NegativeSpans, h1.NegativeSpans, "Returned negative spans are as expected")
// Check that the spans for h1 and h2 are unique slices.
require.NotSame(t, &rh1.PositiveSpans[0], &rh2.PositiveSpans[0], "PositiveSpans should be unique between histograms")
require.NotSame(t, &rh1.NegativeSpans[0], &rh2.NegativeSpans[0], "NegativeSpans should be unique between histograms")
}
func BenchmarkAppendable(b *testing.B) {
// Create a histogram with a bunch of spans and buckets.
const (

View file

@ -59,7 +59,9 @@ import (
"github.com/prometheus/prometheus/util/teststorage"
)
var testEngine = promql.NewEngine(promql.EngineOpts{
func testEngine(t *testing.T) *promql.Engine {
t.Helper()
return promqltest.NewTestEngineWithOpts(t, promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
@ -68,7 +70,8 @@ var testEngine = promql.NewEngine(promql.EngineOpts{
EnableAtModifier: true,
EnableNegativeOffset: true,
EnablePerStepStats: true,
})
})
}
// testMetaStore satisfies the scrape.MetricMetadataStore interface.
// It is used to inject specific metadata as part of a test case.
@ -306,8 +309,7 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
MaxSamples: 10,
Timeout: 100 * time.Second,
}
engine := promql.NewEngine(engineOpts)
engine := promqltest.NewTestEngineWithOpts(m.testing, engineOpts)
opts := &rules.ManagerOptions{
QueryFunc: rules.EngineQueryFunc(engine, storage),
Appendable: storage,
@ -431,9 +433,10 @@ func TestEndpoints(t *testing.T) {
now := time.Now()
ng := testEngine(t)
t.Run("local", func(t *testing.T) {
algr := rulesRetrieverMock{}
algr.testing = t
algr := rulesRetrieverMock{testing: t}
algr.CreateAlertingRules()
algr.CreateRuleGroups()
@ -445,7 +448,7 @@ func TestEndpoints(t *testing.T) {
api := &API{
Queryable: storage,
QueryEngine: testEngine,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
@ -496,8 +499,7 @@ func TestEndpoints(t *testing.T) {
})
require.NoError(t, err)
algr := rulesRetrieverMock{}
algr.testing = t
algr := rulesRetrieverMock{testing: t}
algr.CreateAlertingRules()
algr.CreateRuleGroups()
@ -509,7 +511,7 @@ func TestEndpoints(t *testing.T) {
api := &API{
Queryable: remote,
QueryEngine: testEngine,
QueryEngine: ng,
ExemplarQueryable: storage.ExemplarQueryable(),
targetRetriever: testTargetRetriever.toFactory(),
alertmanagerRetriever: testAlertmanagerRetriever{}.toFactory(),
@ -651,7 +653,7 @@ func TestQueryExemplars(t *testing.T) {
api := &API{
Queryable: storage,
QueryEngine: testEngine,
QueryEngine: testEngine(t),
ExemplarQueryable: storage.ExemplarQueryable(),
}
@ -870,7 +872,7 @@ func TestStats(t *testing.T) {
api := &API{
Queryable: storage,
QueryEngine: testEngine,
QueryEngine: testEngine(t),
now: func() time.Time {
return time.Unix(123, 0)
},

View file

@ -32,6 +32,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/storage"
@ -86,7 +87,7 @@ func TestApiStatusCodes(t *testing.T) {
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
} {
t.Run(fmt.Sprintf("%s/%s", name, k), func(t *testing.T) {
r := createPrometheusAPI(q)
r := createPrometheusAPI(t, q)
rec := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/api/v1/query?query=up", nil)
@ -100,8 +101,10 @@ func TestApiStatusCodes(t *testing.T) {
}
}
func createPrometheusAPI(q storage.SampleAndChunkQueryable) *route.Router {
engine := promql.NewEngine(promql.EngineOpts{
func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route.Router {
t.Helper()
engine := promqltest.NewTestEngineWithOpts(t, promql.EngineOpts{
Logger: log.NewNopLogger(),
Reg: nil,
ActiveQueryTracker: nil,