mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-21 03:16:00 -08:00
Merge pull request #485 from grafana/yuri/bring-prometheus-upstream
Synch with Prometheus up to 2023-04-18 (b028112
)
This commit is contained in:
commit
d162bb51b6
|
@ -434,7 +434,7 @@ func (tg *testGroup) maxEvalTime() time.Duration {
|
||||||
}
|
}
|
||||||
|
|
||||||
func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) {
|
func query(ctx context.Context, qs string, t time.Time, engine *promql.Engine, qu storage.Queryable) (promql.Vector, error) {
|
||||||
q, err := engine.NewInstantQuery(qu, nil, qs, t)
|
q, err := engine.NewInstantQuery(ctx, qu, nil, qs, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,8 +56,14 @@ func (ls labelSlice) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i] }
|
||||||
func (ls labelSlice) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
|
func (ls labelSlice) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
|
||||||
|
|
||||||
func decodeSize(data string, index int) (int, int) {
|
func decodeSize(data string, index int) (int, int) {
|
||||||
var size int
|
// Fast-path for common case of a single byte, value 0..127.
|
||||||
for shift := uint(0); ; shift += 7 {
|
b := data[index]
|
||||||
|
index++
|
||||||
|
if b < 0x80 {
|
||||||
|
return int(b), index
|
||||||
|
}
|
||||||
|
size := int(b & 0x7F)
|
||||||
|
for shift := uint(7); ; shift += 7 {
|
||||||
// Just panic if we go of the end of data, since all Labels strings are constructed internally and
|
// Just panic if we go of the end of data, since all Labels strings are constructed internally and
|
||||||
// malformed data indicates a bug, or memory corruption.
|
// malformed data indicates a bug, or memory corruption.
|
||||||
b := data[index]
|
b := data[index]
|
||||||
|
|
|
@ -240,16 +240,17 @@ func BenchmarkRangeQuery(b *testing.B) {
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps)
|
name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps)
|
||||||
b.Run(name, func(b *testing.B) {
|
b.Run(name, func(b *testing.B) {
|
||||||
|
ctx := context.Background()
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
qry, err := engine.NewRangeQuery(
|
qry, err := engine.NewRangeQuery(
|
||||||
stor, nil, c.expr,
|
ctx, stor, nil, c.expr,
|
||||||
time.Unix(int64((numIntervals-c.steps)*10), 0),
|
time.Unix(int64((numIntervals-c.steps)*10), 0),
|
||||||
time.Unix(int64(numIntervals*10), 0), time.Second*10)
|
time.Unix(int64(numIntervals*10), 0), time.Second*10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
res := qry.Exec(context.Background())
|
res := qry.Exec(ctx)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
b.Fatal(res.Err)
|
b.Fatal(res.Err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -400,7 +400,7 @@ func (ng *Engine) SetQueryLogger(l QueryLogger) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
||||||
func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) {
|
func (ng *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (Query, error) {
|
||||||
expr, err := parser.ParseExpr(qs)
|
expr, err := parser.ParseExpr(qs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -416,7 +416,7 @@ func (ng *Engine) NewInstantQuery(q storage.Queryable, opts *QueryOpts, qs strin
|
||||||
|
|
||||||
// NewRangeQuery returns an evaluation query for the given time range and with
|
// NewRangeQuery returns an evaluation query for the given time range and with
|
||||||
// the resolution set by the interval.
|
// the resolution set by the interval.
|
||||||
func (ng *Engine) NewRangeQuery(q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) {
|
func (ng *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) {
|
||||||
expr, err := parser.ParseExpr(qs)
|
expr, err := parser.ParseExpr(qs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -231,14 +231,14 @@ func TestQueryError(t *testing.T) {
|
||||||
ctx, cancelCtx := context.WithCancel(context.Background())
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
defer cancelCtx()
|
defer cancelCtx()
|
||||||
|
|
||||||
vectorQuery, err := engine.NewInstantQuery(queryable, nil, "foo", time.Unix(1, 0))
|
vectorQuery, err := engine.NewInstantQuery(ctx, queryable, nil, "foo", time.Unix(1, 0))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := vectorQuery.Exec(ctx)
|
res := vectorQuery.Exec(ctx)
|
||||||
require.Error(t, res.Err, "expected error on failed select but got none")
|
require.Error(t, res.Err, "expected error on failed select but got none")
|
||||||
require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
|
require.True(t, errors.Is(res.Err, errStorage), "expected error doesn't match")
|
||||||
|
|
||||||
matrixQuery, err := engine.NewInstantQuery(queryable, nil, "foo[1m]", time.Unix(1, 0))
|
matrixQuery, err := engine.NewInstantQuery(ctx, queryable, nil, "foo[1m]", time.Unix(1, 0))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res = matrixQuery.Exec(ctx)
|
res = matrixQuery.Exec(ctx)
|
||||||
|
@ -564,14 +564,15 @@ func TestSelectHintsSetCorrectly(t *testing.T) {
|
||||||
query Query
|
query Query
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
ctx := context.Background()
|
||||||
if tc.end == 0 {
|
if tc.end == 0 {
|
||||||
query, err = engine.NewInstantQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start))
|
query, err = engine.NewInstantQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start))
|
||||||
} else {
|
} else {
|
||||||
query, err = engine.NewRangeQuery(hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second)
|
query, err = engine.NewRangeQuery(ctx, hintsRecorder, nil, tc.query, timestamp.Time(tc.start), timestamp.Time(tc.end), time.Second)
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := query.Exec(context.Background())
|
res := query.Exec(ctx)
|
||||||
require.NoError(t, res.Err)
|
require.NoError(t, res.Err)
|
||||||
|
|
||||||
require.Equal(t, tc.expected, hintsRecorder.hints)
|
require.Equal(t, tc.expected, hintsRecorder.hints)
|
||||||
|
@ -727,9 +728,9 @@ load 10s
|
||||||
var err error
|
var err error
|
||||||
var qry Query
|
var qry Query
|
||||||
if c.Interval == 0 {
|
if c.Interval == 0 {
|
||||||
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.Query, c.Start)
|
qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start)
|
||||||
} else {
|
} else {
|
||||||
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1204,9 +1205,9 @@ load 10s
|
||||||
var err error
|
var err error
|
||||||
var qry Query
|
var qry Query
|
||||||
if c.Interval == 0 {
|
if c.Interval == 0 {
|
||||||
qry, err = engine.NewInstantQuery(test.Queryable(), opts, c.Query, c.Start)
|
qry, err = engine.NewInstantQuery(test.context, test.Queryable(), opts, c.Query, c.Start)
|
||||||
} else {
|
} else {
|
||||||
qry, err = engine.NewRangeQuery(test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
|
qry, err = engine.NewRangeQuery(test.context, test.Queryable(), opts, c.Query, c.Start, c.End, c.Interval)
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1387,9 +1388,9 @@ load 10s
|
||||||
var err error
|
var err error
|
||||||
var qry Query
|
var qry Query
|
||||||
if c.Interval == 0 {
|
if c.Interval == 0 {
|
||||||
qry, err = engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start)
|
qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start)
|
||||||
} else {
|
} else {
|
||||||
qry, err = engine.NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
qry, err = engine.NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1628,9 +1629,9 @@ load 1ms
|
||||||
var err error
|
var err error
|
||||||
var qry Query
|
var qry Query
|
||||||
if c.end == 0 {
|
if c.end == 0 {
|
||||||
qry, err = test.QueryEngine().NewInstantQuery(test.Queryable(), nil, c.query, start)
|
qry, err = test.QueryEngine().NewInstantQuery(test.context, test.Queryable(), nil, c.query, start)
|
||||||
} else {
|
} else {
|
||||||
qry, err = test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.query, start, end, interval)
|
qry, err = test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.query, start, end, interval)
|
||||||
}
|
}
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1961,7 +1962,7 @@ func TestSubquerySelector(t *testing.T) {
|
||||||
engine := test.QueryEngine()
|
engine := test.QueryEngine()
|
||||||
for _, c := range tst.cases {
|
for _, c := range tst.cases {
|
||||||
t.Run(c.Query, func(t *testing.T) {
|
t.Run(c.Query, func(t *testing.T) {
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, c.Query, c.Start)
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, c.Query, c.Start)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
@ -2909,6 +2910,7 @@ func TestPreprocessAndWrapWithStepInvariantExpr(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEngineOptsValidation(t *testing.T) {
|
func TestEngineOptsValidation(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
opts EngineOpts
|
opts EngineOpts
|
||||||
query string
|
query string
|
||||||
|
@ -2968,8 +2970,8 @@ func TestEngineOptsValidation(t *testing.T) {
|
||||||
|
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
eng := NewEngine(c.opts)
|
eng := NewEngine(c.opts)
|
||||||
_, err1 := eng.NewInstantQuery(nil, nil, c.query, time.Unix(10, 0))
|
_, err1 := eng.NewInstantQuery(ctx, nil, nil, c.query, time.Unix(10, 0))
|
||||||
_, err2 := eng.NewRangeQuery(nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
|
_, err2 := eng.NewRangeQuery(ctx, nil, nil, c.query, time.Unix(0, 0), time.Unix(10, 0), time.Second)
|
||||||
if c.fail {
|
if c.fail {
|
||||||
require.Equal(t, c.expError, err1)
|
require.Equal(t, c.expError, err1)
|
||||||
require.Equal(t, c.expError, err2)
|
require.Equal(t, c.expError, err2)
|
||||||
|
@ -3136,7 +3138,7 @@ func TestRangeQuery(t *testing.T) {
|
||||||
err = test.Run()
|
err = test.Run()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
qry, err := test.QueryEngine().NewRangeQuery(test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
qry, err := test.QueryEngine().NewRangeQuery(test.context, test.Queryable(), nil, c.Query, c.Start, c.End, c.Interval)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
@ -3167,7 +3169,7 @@ func TestNativeHistogramRate(t *testing.T) {
|
||||||
engine := test.QueryEngine()
|
engine := test.QueryEngine()
|
||||||
|
|
||||||
queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
|
queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
require.NoError(t, res.Err)
|
require.NoError(t, res.Err)
|
||||||
|
@ -3211,7 +3213,7 @@ func TestNativeFloatHistogramRate(t *testing.T) {
|
||||||
engine := test.QueryEngine()
|
engine := test.QueryEngine()
|
||||||
|
|
||||||
queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
|
queryString := fmt.Sprintf("rate(%s[1m])", seriesName)
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(int64(5*time.Minute/time.Millisecond)))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
require.NoError(t, res.Err)
|
require.NoError(t, res.Err)
|
||||||
|
@ -3275,7 +3277,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
queryString := fmt.Sprintf("histogram_count(%s)", seriesName)
|
queryString := fmt.Sprintf("histogram_count(%s)", seriesName)
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts))
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
@ -3293,7 +3295,7 @@ func TestNativeHistogram_HistogramCountAndSum(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
queryString = fmt.Sprintf("histogram_sum(%s)", seriesName)
|
queryString = fmt.Sprintf("histogram_sum(%s)", seriesName)
|
||||||
qry, err = engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts))
|
qry, err = engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res = qry.Exec(test.Context())
|
res = qry.Exec(test.Context())
|
||||||
|
@ -3529,7 +3531,7 @@ func TestNativeHistogram_HistogramQuantile(t *testing.T) {
|
||||||
for j, sc := range c.subCases {
|
for j, sc := range c.subCases {
|
||||||
t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%d %s", j, sc.quantile), func(t *testing.T) {
|
||||||
queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName)
|
queryString := fmt.Sprintf("histogram_quantile(%s, %s)", sc.quantile, seriesName)
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts))
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
@ -3960,7 +3962,7 @@ func TestNativeHistogram_HistogramFraction(t *testing.T) {
|
||||||
for j, sc := range c.subCases {
|
for j, sc := range c.subCases {
|
||||||
t.Run(fmt.Sprintf("%d %s %s", j, sc.lower, sc.upper), func(t *testing.T) {
|
t.Run(fmt.Sprintf("%d %s %s", j, sc.lower, sc.upper), func(t *testing.T) {
|
||||||
queryString := fmt.Sprintf("histogram_fraction(%s, %s, %s)", sc.lower, sc.upper, seriesName)
|
queryString := fmt.Sprintf("histogram_fraction(%s, %s, %s)", sc.lower, sc.upper, seriesName)
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts))
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
@ -4097,7 +4099,7 @@ func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) {
|
||||||
require.NoError(t, app.Commit())
|
require.NoError(t, app.Commit())
|
||||||
|
|
||||||
queryAndCheck := func(queryString string, exp Vector) {
|
queryAndCheck := func(queryString string, exp Vector) {
|
||||||
qry, err := engine.NewInstantQuery(test.Queryable(), nil, queryString, timestamp.Time(ts))
|
qry, err := engine.NewInstantQuery(test.context, test.Queryable(), nil, queryString, timestamp.Time(ts))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
@ -4206,7 +4208,7 @@ metric 0 1 2
|
||||||
opts := &QueryOpts{
|
opts := &QueryOpts{
|
||||||
LookbackDelta: c.queryLookback,
|
LookbackDelta: c.queryLookback,
|
||||||
}
|
}
|
||||||
qry, err := eng.NewInstantQuery(test.Queryable(), opts, query, c.ts)
|
qry, err := eng.NewInstantQuery(test.context, test.Queryable(), opts, query, c.ts)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
res := qry.Exec(test.Context())
|
res := qry.Exec(test.Context())
|
||||||
|
|
|
@ -56,10 +56,11 @@ func TestDeriv(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, a.Commit())
|
require.NoError(t, a.Commit())
|
||||||
|
|
||||||
query, err := engine.NewInstantQuery(storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939))
|
ctx := context.Background()
|
||||||
|
query, err := engine.NewInstantQuery(ctx, storage, nil, "deriv(foo[30m])", timestamp.Time(1493712846939))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
result := query.Exec(context.Background())
|
result := query.Exec(ctx)
|
||||||
require.NoError(t, result.Err)
|
require.NoError(t, result.Err)
|
||||||
|
|
||||||
vec, _ := result.Vector()
|
vec, _ := result.Vector()
|
||||||
|
|
|
@ -81,14 +81,15 @@ func TestConcurrentRangeQueries(t *testing.T) {
|
||||||
defer func() {
|
defer func() {
|
||||||
sem <- struct{}{}
|
sem <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
ctx := context.Background()
|
||||||
qry, err := engine.NewRangeQuery(
|
qry, err := engine.NewRangeQuery(
|
||||||
stor, nil, c.expr,
|
ctx, stor, nil, c.expr,
|
||||||
time.Unix(int64((numIntervals-c.steps)*10), 0),
|
time.Unix(int64((numIntervals-c.steps)*10), 0),
|
||||||
time.Unix(int64(numIntervals*10), 0), time.Second*10)
|
time.Unix(int64(numIntervals*10), 0), time.Second*10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
res := qry.Exec(context.Background())
|
res := qry.Exec(ctx)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
t.Logf("Query: %q, steps: %d, result: %s", c.expr, c.steps, res.Err)
|
t.Logf("Query: %q, steps: %d, result: %s", c.expr, c.steps, res.Err)
|
||||||
return res.Err
|
return res.Err
|
||||||
|
|
|
@ -538,7 +538,7 @@ func (t *Test) exec(tc testCommand) error {
|
||||||
}
|
}
|
||||||
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
|
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
|
||||||
for _, iq := range queries {
|
for _, iq := range queries {
|
||||||
q, err := t.QueryEngine().NewInstantQuery(t.storage, nil, iq.expr, iq.evalTime)
|
q, err := t.QueryEngine().NewInstantQuery(t.context, t.storage, nil, iq.expr, iq.evalTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -560,7 +560,7 @@ func (t *Test) exec(tc testCommand) error {
|
||||||
|
|
||||||
// Check query returns same result in range mode,
|
// Check query returns same result in range mode,
|
||||||
// by checking against the middle step.
|
// by checking against the middle step.
|
||||||
q, err = t.queryEngine.NewRangeQuery(t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
|
q, err = t.queryEngine.NewRangeQuery(t.context, t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,7 +189,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector,
|
||||||
// It converts scalar into vector results.
|
// It converts scalar into vector results.
|
||||||
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
|
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
|
||||||
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
|
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
|
||||||
q, err := engine.NewInstantQuery(q, nil, qs, t)
|
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -601,7 +601,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
|
||||||
func (h *Head) Init(minValidTime int64) error {
|
func (h *Head) Init(minValidTime int64) error {
|
||||||
h.minValidTime.Store(minValidTime)
|
h.minValidTime.Store(minValidTime)
|
||||||
defer func() {
|
defer func() {
|
||||||
h.postings.EnsureOrder()
|
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
|
||||||
}()
|
}()
|
||||||
defer h.gc() // After loading the wal remove the obsolete data from the head.
|
defer h.gc() // After loading the wal remove the obsolete data from the head.
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -224,7 +224,10 @@ func (p *MemPostings) All() Postings {
|
||||||
|
|
||||||
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
|
// EnsureOrder ensures that all postings lists are sorted. After it returns all further
|
||||||
// calls to add and addFor will insert new IDs in a sorted manner.
|
// calls to add and addFor will insert new IDs in a sorted manner.
|
||||||
func (p *MemPostings) EnsureOrder() {
|
// Parameter numberOfConcurrentProcesses is used to specify the maximal number of
|
||||||
|
// CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used.
|
||||||
|
// GOMAXPROCS was the default before introducing this parameter.
|
||||||
|
func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
|
||||||
p.mtx.Lock()
|
p.mtx.Lock()
|
||||||
defer p.mtx.Unlock()
|
defer p.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -232,13 +235,16 @@ func (p *MemPostings) EnsureOrder() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
n := runtime.GOMAXPROCS(0)
|
concurrency := numberOfConcurrentProcesses
|
||||||
|
if concurrency <= 0 {
|
||||||
|
concurrency = runtime.GOMAXPROCS(0)
|
||||||
|
}
|
||||||
workc := make(chan *[][]storage.SeriesRef)
|
workc := make(chan *[][]storage.SeriesRef)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(n)
|
wg.Add(concurrency)
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < concurrency; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for job := range workc {
|
for job := range workc {
|
||||||
for _, l := range *job {
|
for _, l := range *job {
|
||||||
|
|
|
@ -54,7 +54,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
|
||||||
p.m["a"][v] = l
|
p.m["a"][v] = l
|
||||||
}
|
}
|
||||||
|
|
||||||
p.EnsureOrder()
|
p.EnsureOrder(0)
|
||||||
|
|
||||||
for _, e := range p.m {
|
for _, e := range p.m {
|
||||||
for _, l := range e {
|
for _, l := range e {
|
||||||
|
@ -114,7 +114,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
p.EnsureOrder()
|
p.EnsureOrder(0)
|
||||||
p.ordered = false
|
p.ordered = false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
|
@ -177,8 +177,8 @@ type TSDBAdminStats interface {
|
||||||
// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked.
|
// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked.
|
||||||
type QueryEngine interface {
|
type QueryEngine interface {
|
||||||
SetQueryLogger(l promql.QueryLogger)
|
SetQueryLogger(l promql.QueryLogger)
|
||||||
NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
|
NewInstantQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
|
||||||
NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
|
NewRangeQuery(ctx context.Context, q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// API can register a set of endpoints in a router and handle
|
// API can register a set of endpoints in a router and handle
|
||||||
|
@ -425,7 +425,7 @@ func (api *API) query(r *http.Request) (result apiFuncResult) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
||||||
}
|
}
|
||||||
qry, err := api.QueryEngine.NewInstantQuery(api.Queryable, opts, r.FormValue("query"), ts)
|
qry, err := api.QueryEngine.NewInstantQuery(ctx, api.Queryable, opts, r.FormValue("query"), ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return invalidParamError(err, "query")
|
return invalidParamError(err, "query")
|
||||||
}
|
}
|
||||||
|
@ -528,7 +528,7 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
||||||
}
|
}
|
||||||
qry, err := api.QueryEngine.NewRangeQuery(api.Queryable, opts, r.FormValue("query"), start, end, step)
|
qry, err := api.QueryEngine.NewRangeQuery(ctx, api.Queryable, opts, r.FormValue("query"), start, end, step)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue