mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #2000 from prometheus/contextify-storage
Contextify storage and PromQL interfaces.
This commit is contained in:
commit
6dda28dbd4
|
@ -26,6 +26,8 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/version"
|
"github.com/prometheus/common/version"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/notifier"
|
"github.com/prometheus/prometheus/notifier"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
@ -105,21 +107,24 @@ func Main() int {
|
||||||
notifier = notifier.New(&cfg.notifier)
|
notifier = notifier.New(&cfg.notifier)
|
||||||
targetManager = retrieval.NewTargetManager(sampleAppender)
|
targetManager = retrieval.NewTargetManager(sampleAppender)
|
||||||
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
|
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
|
||||||
|
ctx, cancelCtx = context.WithCancel(context.Background())
|
||||||
)
|
)
|
||||||
|
|
||||||
ruleManager := rules.NewManager(&rules.ManagerOptions{
|
ruleManager := rules.NewManager(&rules.ManagerOptions{
|
||||||
SampleAppender: sampleAppender,
|
SampleAppender: sampleAppender,
|
||||||
Notifier: notifier,
|
Notifier: notifier,
|
||||||
QueryEngine: queryEngine,
|
QueryEngine: queryEngine,
|
||||||
|
Context: ctx,
|
||||||
ExternalURL: cfg.web.ExternalURL,
|
ExternalURL: cfg.web.ExternalURL,
|
||||||
})
|
})
|
||||||
|
|
||||||
flags := map[string]string{}
|
cfg.web.Context = ctx
|
||||||
cfg.fs.VisitAll(func(f *flag.Flag) {
|
cfg.web.Storage = localStorage
|
||||||
flags[f.Name] = f.Value.String()
|
cfg.web.QueryEngine = queryEngine
|
||||||
})
|
cfg.web.TargetManager = targetManager
|
||||||
|
cfg.web.RuleManager = ruleManager
|
||||||
|
|
||||||
version := &web.PrometheusVersion{
|
cfg.web.Version = &web.PrometheusVersion{
|
||||||
Version: version.Version,
|
Version: version.Version,
|
||||||
Revision: version.Revision,
|
Revision: version.Revision,
|
||||||
Branch: version.Branch,
|
Branch: version.Branch,
|
||||||
|
@ -128,7 +133,12 @@ func Main() int {
|
||||||
GoVersion: version.GoVersion,
|
GoVersion: version.GoVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
webHandler := web.New(localStorage, queryEngine, targetManager, ruleManager, version, flags, &cfg.web)
|
cfg.web.Flags = map[string]string{}
|
||||||
|
cfg.fs.VisitAll(func(f *flag.Flag) {
|
||||||
|
cfg.web.Flags[f.Name] = f.Value.String()
|
||||||
|
})
|
||||||
|
|
||||||
|
webHandler := web.New(&cfg.web)
|
||||||
|
|
||||||
reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
|
reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
|
||||||
|
|
||||||
|
@ -201,7 +211,7 @@ func Main() int {
|
||||||
|
|
||||||
// Shutting down the query engine before the rule manager will cause pending queries
|
// Shutting down the query engine before the rule manager will cause pending queries
|
||||||
// to be canceled and ensures a quick shutdown of the rule manager.
|
// to be canceled and ensures a quick shutdown of the rule manager.
|
||||||
defer queryEngine.Stop()
|
defer cancelCtx()
|
||||||
|
|
||||||
go webHandler.Run()
|
go webHandler.Run()
|
||||||
|
|
||||||
|
|
|
@ -150,7 +150,7 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele
|
||||||
// it is associated with.
|
// it is associated with.
|
||||||
type Query interface {
|
type Query interface {
|
||||||
// Exec processes the query and
|
// Exec processes the query and
|
||||||
Exec() *Result
|
Exec(ctx context.Context) *Result
|
||||||
// Statement returns the parsed statement of the query.
|
// Statement returns the parsed statement of the query.
|
||||||
Statement() Statement
|
Statement() Statement
|
||||||
// Stats returns statistics about the lifetime of the query.
|
// Stats returns statistics about the lifetime of the query.
|
||||||
|
@ -192,8 +192,8 @@ func (q *query) Cancel() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec implements the Query interface.
|
// Exec implements the Query interface.
|
||||||
func (q *query) Exec() *Result {
|
func (q *query) Exec(ctx context.Context) *Result {
|
||||||
res, err := q.ng.exec(q)
|
res, err := q.ng.exec(ctx, q)
|
||||||
return &Result{Err: err, Value: res}
|
return &Result{Err: err, Value: res}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,13 +220,8 @@ func contextDone(ctx context.Context, env string) error {
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
// The querier on which the engine operates.
|
// The querier on which the engine operates.
|
||||||
querier local.Querier
|
querier local.Querier
|
||||||
|
|
||||||
// The base context for all queries and its cancellation function.
|
|
||||||
baseCtx context.Context
|
|
||||||
cancelQueries func()
|
|
||||||
// The gate limiting the maximum number of concurrent and waiting queries.
|
// The gate limiting the maximum number of concurrent and waiting queries.
|
||||||
gate *queryGate
|
gate *queryGate
|
||||||
|
|
||||||
options *EngineOptions
|
options *EngineOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,11 +230,8 @@ func NewEngine(querier local.Querier, o *EngineOptions) *Engine {
|
||||||
if o == nil {
|
if o == nil {
|
||||||
o = DefaultEngineOptions
|
o = DefaultEngineOptions
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
return &Engine{
|
return &Engine{
|
||||||
querier: querier,
|
querier: querier,
|
||||||
baseCtx: ctx,
|
|
||||||
cancelQueries: cancel,
|
|
||||||
gate: newQueryGate(o.MaxConcurrentQueries),
|
gate: newQueryGate(o.MaxConcurrentQueries),
|
||||||
options: o,
|
options: o,
|
||||||
}
|
}
|
||||||
|
@ -257,11 +249,6 @@ var DefaultEngineOptions = &EngineOptions{
|
||||||
Timeout: 2 * time.Minute,
|
Timeout: 2 * time.Minute,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the engine and cancel all running queries.
|
|
||||||
func (ng *Engine) Stop() {
|
|
||||||
ng.cancelQueries()
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
||||||
func (ng *Engine) NewInstantQuery(qs string, ts model.Time) (Query, error) {
|
func (ng *Engine) NewInstantQuery(qs string, ts model.Time) (Query, error) {
|
||||||
expr, err := ParseExpr(qs)
|
expr, err := ParseExpr(qs)
|
||||||
|
@ -326,8 +313,8 @@ func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
|
||||||
//
|
//
|
||||||
// At this point per query only one EvalStmt is evaluated. Alert and record
|
// At this point per query only one EvalStmt is evaluated. Alert and record
|
||||||
// statements are not handled by the Engine.
|
// statements are not handled by the Engine.
|
||||||
func (ng *Engine) exec(q *query) (model.Value, error) {
|
func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) {
|
||||||
ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout)
|
ctx, cancel := context.WithTimeout(ctx, ng.options.Timeout)
|
||||||
q.cancel = cancel
|
q.cancel = cancel
|
||||||
|
|
||||||
queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
|
queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
|
||||||
|
@ -365,7 +352,7 @@ func (ng *Engine) exec(q *query) (model.Value, error) {
|
||||||
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
|
// execEvalStmt evaluates the expression of an evaluation statement for the given time range.
|
||||||
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) {
|
func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) {
|
||||||
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
|
prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start()
|
||||||
err := ng.populateIterators(s)
|
err := ng.populateIterators(ctx, s)
|
||||||
prepareTimer.Stop()
|
prepareTimer.Stop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -476,19 +463,21 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
||||||
return resMatrix, nil
|
return resMatrix, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ng *Engine) populateIterators(s *EvalStmt) error {
|
func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error {
|
||||||
var queryErr error
|
var queryErr error
|
||||||
Inspect(s.Expr, func(node Node) bool {
|
Inspect(s.Expr, func(node Node) bool {
|
||||||
switch n := node.(type) {
|
switch n := node.(type) {
|
||||||
case *VectorSelector:
|
case *VectorSelector:
|
||||||
if s.Start.Equal(s.End) {
|
if s.Start.Equal(s.End) {
|
||||||
n.iterators, queryErr = ng.querier.QueryInstant(
|
n.iterators, queryErr = ng.querier.QueryInstant(
|
||||||
|
ctx,
|
||||||
s.Start.Add(-n.Offset),
|
s.Start.Add(-n.Offset),
|
||||||
StalenessDelta,
|
StalenessDelta,
|
||||||
n.LabelMatchers...,
|
n.LabelMatchers...,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
n.iterators, queryErr = ng.querier.QueryRange(
|
n.iterators, queryErr = ng.querier.QueryRange(
|
||||||
|
ctx,
|
||||||
s.Start.Add(-n.Offset-StalenessDelta),
|
s.Start.Add(-n.Offset-StalenessDelta),
|
||||||
s.End.Add(-n.Offset),
|
s.End.Add(-n.Offset),
|
||||||
n.LabelMatchers...,
|
n.LabelMatchers...,
|
||||||
|
@ -499,6 +488,7 @@ func (ng *Engine) populateIterators(s *EvalStmt) error {
|
||||||
}
|
}
|
||||||
case *MatrixSelector:
|
case *MatrixSelector:
|
||||||
n.iterators, queryErr = ng.querier.QueryRange(
|
n.iterators, queryErr = ng.querier.QueryRange(
|
||||||
|
ctx,
|
||||||
s.Start.Add(-n.Offset-n.Range),
|
s.Start.Add(-n.Offset-n.Range),
|
||||||
s.End.Add(-n.Offset),
|
s.End.Add(-n.Offset),
|
||||||
n.LabelMatchers...,
|
n.LabelMatchers...,
|
||||||
|
|
|
@ -23,7 +23,8 @@ import (
|
||||||
|
|
||||||
func TestQueryConcurrency(t *testing.T) {
|
func TestQueryConcurrency(t *testing.T) {
|
||||||
engine := NewEngine(nil, nil)
|
engine := NewEngine(nil, nil)
|
||||||
defer engine.Stop()
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
defer cancelCtx()
|
||||||
|
|
||||||
block := make(chan struct{})
|
block := make(chan struct{})
|
||||||
processing := make(chan struct{})
|
processing := make(chan struct{})
|
||||||
|
@ -36,7 +37,7 @@ func TestQueryConcurrency(t *testing.T) {
|
||||||
|
|
||||||
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
|
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
|
||||||
q := engine.newTestQuery(f)
|
q := engine.newTestQuery(f)
|
||||||
go q.Exec()
|
go q.Exec(ctx)
|
||||||
select {
|
select {
|
||||||
case <-processing:
|
case <-processing:
|
||||||
// Expected.
|
// Expected.
|
||||||
|
@ -46,7 +47,7 @@ func TestQueryConcurrency(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
q := engine.newTestQuery(f)
|
q := engine.newTestQuery(f)
|
||||||
go q.Exec()
|
go q.Exec(ctx)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-processing:
|
case <-processing:
|
||||||
|
@ -76,14 +77,15 @@ func TestQueryTimeout(t *testing.T) {
|
||||||
Timeout: 5 * time.Millisecond,
|
Timeout: 5 * time.Millisecond,
|
||||||
MaxConcurrentQueries: 20,
|
MaxConcurrentQueries: 20,
|
||||||
})
|
})
|
||||||
defer engine.Stop()
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
defer cancelCtx()
|
||||||
|
|
||||||
query := engine.newTestQuery(func(ctx context.Context) error {
|
query := engine.newTestQuery(func(ctx context.Context) error {
|
||||||
time.Sleep(50 * time.Millisecond)
|
time.Sleep(50 * time.Millisecond)
|
||||||
return contextDone(ctx, "test statement execution")
|
return contextDone(ctx, "test statement execution")
|
||||||
})
|
})
|
||||||
|
|
||||||
res := query.Exec()
|
res := query.Exec(ctx)
|
||||||
if res.Err == nil {
|
if res.Err == nil {
|
||||||
t.Fatalf("expected timeout error but got none")
|
t.Fatalf("expected timeout error but got none")
|
||||||
}
|
}
|
||||||
|
@ -94,7 +96,8 @@ func TestQueryTimeout(t *testing.T) {
|
||||||
|
|
||||||
func TestQueryCancel(t *testing.T) {
|
func TestQueryCancel(t *testing.T) {
|
||||||
engine := NewEngine(nil, nil)
|
engine := NewEngine(nil, nil)
|
||||||
defer engine.Stop()
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
defer cancelCtx()
|
||||||
|
|
||||||
// Cancel a running query before it completes.
|
// Cancel a running query before it completes.
|
||||||
block := make(chan struct{})
|
block := make(chan struct{})
|
||||||
|
@ -109,7 +112,7 @@ func TestQueryCancel(t *testing.T) {
|
||||||
var res *Result
|
var res *Result
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
res = query1.Exec()
|
res = query1.Exec(ctx)
|
||||||
processing <- struct{}{}
|
processing <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -131,14 +134,15 @@ func TestQueryCancel(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
query2.Cancel()
|
query2.Cancel()
|
||||||
res = query2.Exec()
|
res = query2.Exec(ctx)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
t.Fatalf("unexpeceted error on executing query2: %s", res.Err)
|
t.Fatalf("unexpected error on executing query2: %s", res.Err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEngineShutdown(t *testing.T) {
|
func TestEngineShutdown(t *testing.T) {
|
||||||
engine := NewEngine(nil, nil)
|
engine := NewEngine(nil, nil)
|
||||||
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
|
||||||
block := make(chan struct{})
|
block := make(chan struct{})
|
||||||
processing := make(chan struct{})
|
processing := make(chan struct{})
|
||||||
|
@ -158,12 +162,12 @@ func TestEngineShutdown(t *testing.T) {
|
||||||
|
|
||||||
var res *Result
|
var res *Result
|
||||||
go func() {
|
go func() {
|
||||||
res = query1.Exec()
|
res = query1.Exec(ctx)
|
||||||
processing <- struct{}{}
|
processing <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
<-processing
|
<-processing
|
||||||
engine.Stop()
|
cancelCtx()
|
||||||
block <- struct{}{}
|
block <- struct{}{}
|
||||||
<-processing
|
<-processing
|
||||||
|
|
||||||
|
@ -181,9 +185,9 @@ func TestEngineShutdown(t *testing.T) {
|
||||||
|
|
||||||
// The second query is started after the engine shut down. It must
|
// The second query is started after the engine shut down. It must
|
||||||
// be canceled immediately.
|
// be canceled immediately.
|
||||||
res2 := query2.Exec()
|
res2 := query2.Exec(ctx)
|
||||||
if res2.Err == nil {
|
if res2.Err == nil {
|
||||||
t.Fatalf("expected error on querying shutdown engine but got none")
|
t.Fatalf("expected error on querying with canceled context but got none")
|
||||||
}
|
}
|
||||||
if _, ok := res2.Err.(ErrQueryCanceled); !ok {
|
if _, ok := res2.Err.(ErrQueryCanceled); !ok {
|
||||||
t.Fatalf("expected cancelation error, got %q", res2.Err)
|
t.Fatalf("expected cancelation error, got %q", res2.Err)
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
|
@ -52,6 +53,8 @@ type Test struct {
|
||||||
storage local.Storage
|
storage local.Storage
|
||||||
closeStorage func()
|
closeStorage func()
|
||||||
queryEngine *Engine
|
queryEngine *Engine
|
||||||
|
context context.Context
|
||||||
|
cancelCtx context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTest returns an initialized empty Test.
|
// NewTest returns an initialized empty Test.
|
||||||
|
@ -79,6 +82,11 @@ func (t *Test) QueryEngine() *Engine {
|
||||||
return t.queryEngine
|
return t.queryEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Context returns the test's context.
|
||||||
|
func (t *Test) Context() context.Context {
|
||||||
|
return t.context
|
||||||
|
}
|
||||||
|
|
||||||
// Storage returns the test's storage.
|
// Storage returns the test's storage.
|
||||||
func (t *Test) Storage() local.Storage {
|
func (t *Test) Storage() local.Storage {
|
||||||
return t.storage
|
return t.storage
|
||||||
|
@ -463,7 +471,7 @@ func (t *Test) exec(tc testCommand) error {
|
||||||
|
|
||||||
case *evalCmd:
|
case *evalCmd:
|
||||||
q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval)
|
q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval)
|
||||||
res := q.Exec()
|
res := q.Exec(t.context)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
if cmd.fail {
|
if cmd.fail {
|
||||||
return nil
|
return nil
|
||||||
|
@ -490,8 +498,8 @@ func (t *Test) clear() {
|
||||||
if t.closeStorage != nil {
|
if t.closeStorage != nil {
|
||||||
t.closeStorage()
|
t.closeStorage()
|
||||||
}
|
}
|
||||||
if t.queryEngine != nil {
|
if t.cancelCtx != nil {
|
||||||
t.queryEngine.Stop()
|
t.cancelCtx()
|
||||||
}
|
}
|
||||||
|
|
||||||
var closer testutil.Closer
|
var closer testutil.Closer
|
||||||
|
@ -499,11 +507,12 @@ func (t *Test) clear() {
|
||||||
|
|
||||||
t.closeStorage = closer.Close
|
t.closeStorage = closer.Close
|
||||||
t.queryEngine = NewEngine(t.storage, nil)
|
t.queryEngine = NewEngine(t.storage, nil)
|
||||||
|
t.context, t.cancelCtx = context.WithCancel(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes resources associated with the Test.
|
// Close closes resources associated with the Test.
|
||||||
func (t *Test) Close() {
|
func (t *Test) Close() {
|
||||||
t.queryEngine.Stop()
|
t.cancelCtx()
|
||||||
t.closeStorage()
|
t.closeStorage()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
html_template "html/template"
|
html_template "html/template"
|
||||||
|
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
|
@ -146,12 +148,12 @@ const resolvedRetention = 15 * time.Minute
|
||||||
|
|
||||||
// eval evaluates the rule expression and then creates pending alerts and fires
|
// eval evaluates the rule expression and then creates pending alerts and fires
|
||||||
// or removes previously pending alerts accordingly.
|
// or removes previously pending alerts accordingly.
|
||||||
func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) {
|
func (r *AlertingRule) eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) {
|
||||||
query, err := engine.NewInstantQuery(r.vector.String(), ts)
|
query, err := engine.NewInstantQuery(r.vector.String(), ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res, err := query.Exec().Vector()
|
res, err := query.Exec(ctx).Vector()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -183,6 +185,7 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, externalURLPat
|
||||||
|
|
||||||
expand := func(text model.LabelValue) model.LabelValue {
|
expand := func(text model.LabelValue) model.LabelValue {
|
||||||
tmpl := template.NewTemplateExpander(
|
tmpl := template.NewTemplateExpander(
|
||||||
|
ctx,
|
||||||
defs+string(text),
|
defs+string(text),
|
||||||
"__alert_"+r.Name(),
|
"__alert_"+r.Name(),
|
||||||
tmplData,
|
tmplData,
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/notifier"
|
"github.com/prometheus/prometheus/notifier"
|
||||||
|
@ -105,7 +106,7 @@ const (
|
||||||
type Rule interface {
|
type Rule interface {
|
||||||
Name() string
|
Name() string
|
||||||
// eval evaluates the rule, including any associated recording or alerting actions.
|
// eval evaluates the rule, including any associated recording or alerting actions.
|
||||||
eval(model.Time, *promql.Engine, string) (model.Vector, error)
|
eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error)
|
||||||
// String returns a human-readable string representation of the rule.
|
// String returns a human-readable string representation of the rule.
|
||||||
String() string
|
String() string
|
||||||
// HTMLSnippet returns a human-readable string representation of the rule,
|
// HTMLSnippet returns a human-readable string representation of the rule,
|
||||||
|
@ -256,7 +257,7 @@ func (g *Group) eval() {
|
||||||
|
|
||||||
evalTotal.WithLabelValues(rtyp).Inc()
|
evalTotal.WithLabelValues(rtyp).Inc()
|
||||||
|
|
||||||
vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
|
vector, err := rule.eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Canceled queries are intentional termination of queries. This normally
|
// Canceled queries are intentional termination of queries. This normally
|
||||||
// happens on shutdown and thus we skip logging of any errors here.
|
// happens on shutdown and thus we skip logging of any errors here.
|
||||||
|
@ -341,6 +342,7 @@ type Manager struct {
|
||||||
type ManagerOptions struct {
|
type ManagerOptions struct {
|
||||||
ExternalURL *url.URL
|
ExternalURL *url.URL
|
||||||
QueryEngine *promql.Engine
|
QueryEngine *promql.Engine
|
||||||
|
Context context.Context
|
||||||
Notifier *notifier.Notifier
|
Notifier *notifier.Notifier
|
||||||
SampleAppender storage.SampleAppender
|
SampleAppender storage.SampleAppender
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) {
|
||||||
for i, test := range tests {
|
for i, test := range tests {
|
||||||
evalTime := model.Time(0).Add(test.time)
|
evalTime := model.Time(0).Add(test.time)
|
||||||
|
|
||||||
res, err := rule.eval(evalTime, suite.QueryEngine(), "")
|
res, err := rule.eval(suite.Context(), evalTime, suite.QueryEngine(), "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error during alerting rule evaluation: %s", err)
|
t.Fatalf("Error during alerting rule evaluation: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"html/template"
|
"html/template"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/util/strutil"
|
"github.com/prometheus/prometheus/util/strutil"
|
||||||
|
@ -45,14 +46,14 @@ func (rule RecordingRule) Name() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// eval evaluates the rule and then overrides the metric names and labels accordingly.
|
// eval evaluates the rule and then overrides the metric names and labels accordingly.
|
||||||
func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) {
|
func (rule RecordingRule) eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) {
|
||||||
query, err := engine.NewInstantQuery(rule.vector.String(), timestamp)
|
query, err := engine.NewInstantQuery(rule.vector.String(), timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
result = query.Exec()
|
result = query.Exec(ctx)
|
||||||
vector model.Vector
|
vector model.Vector
|
||||||
)
|
)
|
||||||
if result.Err != nil {
|
if result.Err != nil {
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
|
@ -27,6 +28,9 @@ func TestRuleEval(t *testing.T) {
|
||||||
storage, closer := local.NewTestStorage(t, 2)
|
storage, closer := local.NewTestStorage(t, 2)
|
||||||
defer closer.Close()
|
defer closer.Close()
|
||||||
engine := promql.NewEngine(storage, nil)
|
engine := promql.NewEngine(storage, nil)
|
||||||
|
ctx, cancelCtx := context.WithCancel(context.Background())
|
||||||
|
defer cancelCtx()
|
||||||
|
|
||||||
now := model.Now()
|
now := model.Now()
|
||||||
|
|
||||||
suite := []struct {
|
suite := []struct {
|
||||||
|
@ -59,7 +63,7 @@ func TestRuleEval(t *testing.T) {
|
||||||
|
|
||||||
for _, test := range suite {
|
for _, test := range suite {
|
||||||
rule := NewRecordingRule(test.name, test.expr, test.labels)
|
rule := NewRecordingRule(test.name, test.expr, test.labels)
|
||||||
result, err := rule.eval(now, engine, "")
|
result, err := rule.eval(ctx, now, engine, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error evaluating %s", test.name)
|
t.Fatalf("Error evaluating %s", test.name)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/storage"
|
"github.com/prometheus/prometheus/storage"
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
|
@ -40,7 +41,7 @@ type Storage interface {
|
||||||
|
|
||||||
// Drop all time series associated with the given label matchers. Returns
|
// Drop all time series associated with the given label matchers. Returns
|
||||||
// the number series that were dropped.
|
// the number series that were dropped.
|
||||||
DropMetricsForLabelMatchers(...*metric.LabelMatcher) (int, error)
|
DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error)
|
||||||
// Run the various maintenance loops in goroutines. Returns when the
|
// Run the various maintenance loops in goroutines. Returns when the
|
||||||
// storage is ready to use. Keeps everything running in the background
|
// storage is ready to use. Keeps everything running in the background
|
||||||
// until Stop is called.
|
// until Stop is called.
|
||||||
|
@ -59,10 +60,10 @@ type Querier interface {
|
||||||
// QueryRange returns a list of series iterators for the selected
|
// QueryRange returns a list of series iterators for the selected
|
||||||
// time range and label matchers. The iterators need to be closed
|
// time range and label matchers. The iterators need to be closed
|
||||||
// after usage.
|
// after usage.
|
||||||
QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
|
QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
|
||||||
// QueryInstant returns a list of series iterators for the selected
|
// QueryInstant returns a list of series iterators for the selected
|
||||||
// instant and label matchers. The iterators need to be closed after usage.
|
// instant and label matchers. The iterators need to be closed after usage.
|
||||||
QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
|
QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error)
|
||||||
// MetricsForLabelMatchers returns the metrics from storage that satisfy
|
// MetricsForLabelMatchers returns the metrics from storage that satisfy
|
||||||
// the given sets of label matchers. Each set of matchers must contain at
|
// the given sets of label matchers. Each set of matchers must contain at
|
||||||
// least one label matcher that does not match the empty string. Otherwise,
|
// least one label matcher that does not match the empty string. Otherwise,
|
||||||
|
@ -72,14 +73,14 @@ type Querier interface {
|
||||||
// storage to optimize the search. The storage MAY exclude metrics that
|
// storage to optimize the search. The storage MAY exclude metrics that
|
||||||
// have no samples in the specified interval from the returned map. In
|
// have no samples in the specified interval from the returned map. In
|
||||||
// doubt, specify model.Earliest for from and model.Latest for through.
|
// doubt, specify model.Earliest for from and model.Latest for through.
|
||||||
MetricsForLabelMatchers(from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error)
|
MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error)
|
||||||
// LastSampleForLabelMatchers returns the last samples that have been
|
// LastSampleForLabelMatchers returns the last samples that have been
|
||||||
// ingested for the time series matching the given set of label matchers.
|
// ingested for the time series matching the given set of label matchers.
|
||||||
// The label matching behavior is the same as in MetricsForLabelMatchers.
|
// The label matching behavior is the same as in MetricsForLabelMatchers.
|
||||||
// All returned samples are between the specified cutoff time and now.
|
// All returned samples are between the specified cutoff time and now.
|
||||||
LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error)
|
LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error)
|
||||||
// Get all of the label values that are associated with a given label name.
|
// Get all of the label values that are associated with a given label name.
|
||||||
LabelValuesForLabelName(model.LabelName) (model.LabelValues, error)
|
LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesIterator enables efficient access of sample values in a series. Its
|
// SeriesIterator enables efficient access of sample values in a series. Its
|
||||||
|
|
|
@ -17,6 +17,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -39,22 +41,23 @@ func (s *NoopStorage) WaitForIndexing() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastSampleForLabelMatchers implements Storage.
|
// LastSampleForLabelMatchers implements Storage.
|
||||||
func (s *NoopStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
|
func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryRange implements Storage.
|
// QueryRange implements Storage.
|
||||||
func (s *NoopStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryInstant implements Storage.
|
// QueryInstant implements Storage.
|
||||||
func (s *NoopStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetricsForLabelMatchers implements Storage.
|
// MetricsForLabelMatchers implements Storage.
|
||||||
func (s *NoopStorage) MetricsForLabelMatchers(
|
func (s *NoopStorage) MetricsForLabelMatchers(
|
||||||
|
ctx context.Context,
|
||||||
from, through model.Time,
|
from, through model.Time,
|
||||||
matcherSets ...metric.LabelMatchers,
|
matcherSets ...metric.LabelMatchers,
|
||||||
) ([]metric.Metric, error) {
|
) ([]metric.Metric, error) {
|
||||||
|
@ -62,12 +65,12 @@ func (s *NoopStorage) MetricsForLabelMatchers(
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValuesForLabelName implements Storage.
|
// LabelValuesForLabelName implements Storage.
|
||||||
func (s *NoopStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) {
|
func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DropMetricsForLabelMatchers implements Storage.
|
// DropMetricsForLabelMatchers implements Storage.
|
||||||
func (s *NoopStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) {
|
func (s *NoopStorage) DropMetricsForLabelMatchers(ctx context.Context, matchers ...*metric.LabelMatcher) (int, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
)
|
)
|
||||||
|
@ -413,7 +414,7 @@ func (s *MemorySeriesStorage) WaitForIndexing() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastSampleForLabelMatchers implements Storage.
|
// LastSampleForLabelMatchers implements Storage.
|
||||||
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
|
func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
|
||||||
fps := map[model.Fingerprint]struct{}{}
|
fps := map[model.Fingerprint]struct{}{}
|
||||||
for _, matchers := range matcherSets {
|
for _, matchers := range matcherSets {
|
||||||
fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...)
|
fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...)
|
||||||
|
@ -483,7 +484,7 @@ func (bit *boundedIterator) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryRange implements Storage.
|
// QueryRange implements Storage.
|
||||||
func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
||||||
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
|
fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -497,7 +498,7 @@ func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...*
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryInstant implements Storage.
|
// QueryInstant implements Storage.
|
||||||
func (s *MemorySeriesStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) {
|
||||||
from := ts.Add(-stalenessDelta)
|
from := ts.Add(-stalenessDelta)
|
||||||
through := ts
|
through := ts
|
||||||
|
|
||||||
|
@ -540,6 +541,7 @@ func (s *MemorySeriesStorage) fingerprintsForLabelPair(
|
||||||
|
|
||||||
// MetricsForLabelMatchers implements Storage.
|
// MetricsForLabelMatchers implements Storage.
|
||||||
func (s *MemorySeriesStorage) MetricsForLabelMatchers(
|
func (s *MemorySeriesStorage) MetricsForLabelMatchers(
|
||||||
|
_ context.Context,
|
||||||
from, through model.Time,
|
from, through model.Time,
|
||||||
matcherSets ...metric.LabelMatchers,
|
matcherSets ...metric.LabelMatchers,
|
||||||
) ([]metric.Metric, error) {
|
) ([]metric.Metric, error) {
|
||||||
|
@ -603,7 +605,7 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers(
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
lvs, err := s.LabelValuesForLabelName(m.Name)
|
lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -693,12 +695,12 @@ func (s *MemorySeriesStorage) metricForRange(
|
||||||
}
|
}
|
||||||
|
|
||||||
// LabelValuesForLabelName implements Storage.
|
// LabelValuesForLabelName implements Storage.
|
||||||
func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) {
|
func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelName model.LabelName) (model.LabelValues, error) {
|
||||||
return s.persistence.labelValuesForLabelName(labelName)
|
return s.persistence.labelValuesForLabelName(labelName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DropMetricsForLabelMatchers implements Storage.
|
// DropMetricsForLabelMatchers implements Storage.
|
||||||
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) {
|
func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) {
|
||||||
fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
|
fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
"github.com/prometheus/prometheus/util/testutil"
|
"github.com/prometheus/prometheus/util/testutil"
|
||||||
|
@ -194,6 +195,7 @@ func TestMatches(t *testing.T) {
|
||||||
|
|
||||||
for _, mt := range matcherTests {
|
for _, mt := range matcherTests {
|
||||||
metrics, err := storage.MetricsForLabelMatchers(
|
metrics, err := storage.MetricsForLabelMatchers(
|
||||||
|
context.Background(),
|
||||||
model.Earliest, model.Latest,
|
model.Earliest, model.Latest,
|
||||||
mt.matchers,
|
mt.matchers,
|
||||||
)
|
)
|
||||||
|
@ -218,6 +220,7 @@ func TestMatches(t *testing.T) {
|
||||||
}
|
}
|
||||||
// Smoketest for from/through.
|
// Smoketest for from/through.
|
||||||
metrics, err = storage.MetricsForLabelMatchers(
|
metrics, err = storage.MetricsForLabelMatchers(
|
||||||
|
context.Background(),
|
||||||
model.Earliest, -10000,
|
model.Earliest, -10000,
|
||||||
mt.matchers,
|
mt.matchers,
|
||||||
)
|
)
|
||||||
|
@ -228,6 +231,7 @@ func TestMatches(t *testing.T) {
|
||||||
t.Error("expected no matches with 'through' older than any sample")
|
t.Error("expected no matches with 'through' older than any sample")
|
||||||
}
|
}
|
||||||
metrics, err = storage.MetricsForLabelMatchers(
|
metrics, err = storage.MetricsForLabelMatchers(
|
||||||
|
context.Background(),
|
||||||
10000, model.Latest,
|
10000, model.Latest,
|
||||||
mt.matchers,
|
mt.matchers,
|
||||||
)
|
)
|
||||||
|
@ -243,6 +247,7 @@ func TestMatches(t *testing.T) {
|
||||||
through model.Time = 75
|
through model.Time = 75
|
||||||
)
|
)
|
||||||
metrics, err = storage.MetricsForLabelMatchers(
|
metrics, err = storage.MetricsForLabelMatchers(
|
||||||
|
context.Background(),
|
||||||
from, through,
|
from, through,
|
||||||
mt.matchers,
|
mt.matchers,
|
||||||
)
|
)
|
||||||
|
@ -451,6 +456,7 @@ func BenchmarkLabelMatching(b *testing.B) {
|
||||||
benchLabelMatchingRes = []metric.Metric{}
|
benchLabelMatchingRes = []metric.Metric{}
|
||||||
for _, mt := range matcherTests {
|
for _, mt := range matcherTests {
|
||||||
benchLabelMatchingRes, err = s.MetricsForLabelMatchers(
|
benchLabelMatchingRes, err = s.MetricsForLabelMatchers(
|
||||||
|
context.Background(),
|
||||||
model.Earliest, model.Latest,
|
model.Earliest, model.Latest,
|
||||||
mt,
|
mt,
|
||||||
)
|
)
|
||||||
|
@ -493,7 +499,7 @@ func TestRetentionCutoff(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating label matcher: %s", err)
|
t.Fatalf("error creating label matcher: %s", err)
|
||||||
}
|
}
|
||||||
its, err := s.QueryRange(insertStart, now, lm)
|
its, err := s.QueryRange(context.Background(), insertStart, now, lm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -581,7 +587,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
|
|
||||||
fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived}
|
fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived}
|
||||||
|
|
||||||
n, err := s.DropMetricsForLabelMatchers(lm1)
|
n, err := s.DropMetricsForLabelMatchers(context.Background(), lm1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -614,7 +620,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
t.Errorf("chunk file does not exist for fp=%v", fpList[2])
|
t.Errorf("chunk file does not exist for fp=%v", fpList[2])
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err = s.DropMetricsForLabelMatchers(lmAll)
|
n, err = s.DropMetricsForLabelMatchers(context.Background(), lmAll)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
text_template "text/template"
|
text_template "text/template"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/util/strutil"
|
"github.com/prometheus/prometheus/util/strutil"
|
||||||
|
@ -55,12 +56,12 @@ func (q queryResultByLabelSorter) Swap(i, j int) {
|
||||||
q.results[i], q.results[j] = q.results[j], q.results[i]
|
q.results[i], q.results[j] = q.results[j], q.results[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
func query(q string, timestamp model.Time, queryEngine *promql.Engine) (queryResult, error) {
|
func query(ctx context.Context, q string, timestamp model.Time, queryEngine *promql.Engine) (queryResult, error) {
|
||||||
query, err := queryEngine.NewInstantQuery(q, timestamp)
|
query, err := queryEngine.NewInstantQuery(q, timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
res := query.Exec()
|
res := query.Exec(ctx)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
return nil, res.Err
|
return nil, res.Err
|
||||||
}
|
}
|
||||||
|
@ -110,14 +111,14 @@ type Expander struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTemplateExpander returns a template expander ready to use.
|
// NewTemplateExpander returns a template expander ready to use.
|
||||||
func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, pathPrefix string) *Expander {
|
func NewTemplateExpander(ctx context.Context, text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, pathPrefix string) *Expander {
|
||||||
return &Expander{
|
return &Expander{
|
||||||
text: text,
|
text: text,
|
||||||
name: name,
|
name: name,
|
||||||
data: data,
|
data: data,
|
||||||
funcMap: text_template.FuncMap{
|
funcMap: text_template.FuncMap{
|
||||||
"query": func(q string) (queryResult, error) {
|
"query": func(q string) (queryResult, error) {
|
||||||
return query(q, timestamp, queryEngine)
|
return query(ctx, q, timestamp, queryEngine)
|
||||||
},
|
},
|
||||||
"first": func(v queryResult) (*sample, error) {
|
"first": func(v queryResult) (*sample, error) {
|
||||||
if len(v) > 0 {
|
if len(v) > 0 {
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
|
@ -220,7 +221,7 @@ func TestTemplateExpansion(t *testing.T) {
|
||||||
for i, s := range scenarios {
|
for i, s := range scenarios {
|
||||||
var result string
|
var result string
|
||||||
var err error
|
var err error
|
||||||
expander := NewTemplateExpander(s.text, "test", s.input, time, engine, "")
|
expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, time, engine, "")
|
||||||
if s.html {
|
if s.html {
|
||||||
result, err = expander.ExpandHTML(nil)
|
result, err = expander.ExpandHTML(nil)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -85,6 +85,7 @@ type apiFunc func(r *http.Request) (interface{}, *apiError)
|
||||||
// API can register a set of endpoints in a router and handle
|
// API can register a set of endpoints in a router and handle
|
||||||
// them using the provided storage and query engine.
|
// them using the provided storage and query engine.
|
||||||
type API struct {
|
type API struct {
|
||||||
|
Context context.Context
|
||||||
Storage local.Storage
|
Storage local.Storage
|
||||||
QueryEngine *promql.Engine
|
QueryEngine *promql.Engine
|
||||||
|
|
||||||
|
@ -93,8 +94,9 @@ type API struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPI returns an initialized API type.
|
// NewAPI returns an initialized API type.
|
||||||
func NewAPI(qe *promql.Engine, st local.Storage) *API {
|
func NewAPI(ctx context.Context, qe *promql.Engine, st local.Storage) *API {
|
||||||
return &API{
|
return &API{
|
||||||
|
Context: ctx,
|
||||||
QueryEngine: qe,
|
QueryEngine: qe,
|
||||||
Storage: st,
|
Storage: st,
|
||||||
context: route.Context,
|
context: route.Context,
|
||||||
|
@ -157,7 +159,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) {
|
||||||
return nil, &apiError{errorBadData, err}
|
return nil, &apiError{errorBadData, err}
|
||||||
}
|
}
|
||||||
|
|
||||||
res := qry.Exec()
|
res := qry.Exec(api.Context)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
switch res.Err.(type) {
|
switch res.Err.(type) {
|
||||||
case promql.ErrQueryCanceled:
|
case promql.ErrQueryCanceled:
|
||||||
|
@ -204,7 +206,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) {
|
||||||
return nil, &apiError{errorBadData, err}
|
return nil, &apiError{errorBadData, err}
|
||||||
}
|
}
|
||||||
|
|
||||||
res := qry.Exec()
|
res := qry.Exec(api.Context)
|
||||||
if res.Err != nil {
|
if res.Err != nil {
|
||||||
switch res.Err.(type) {
|
switch res.Err.(type) {
|
||||||
case promql.ErrQueryCanceled:
|
case promql.ErrQueryCanceled:
|
||||||
|
@ -226,7 +228,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) {
|
||||||
if !model.LabelNameRE.MatchString(name) {
|
if !model.LabelNameRE.MatchString(name) {
|
||||||
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
|
return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)}
|
||||||
}
|
}
|
||||||
vals, err := api.Storage.LabelValuesForLabelName(model.LabelName(name))
|
vals, err := api.Storage.LabelValuesForLabelName(api.Context, model.LabelName(name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &apiError{errorExec, err}
|
return nil, &apiError{errorExec, err}
|
||||||
}
|
}
|
||||||
|
@ -272,7 +274,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) {
|
||||||
matcherSets = append(matcherSets, matchers)
|
matcherSets = append(matcherSets, matchers)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := api.Storage.MetricsForLabelMatchers(start, end, matcherSets...)
|
res, err := api.Storage.MetricsForLabelMatchers(api.Context, start, end, matcherSets...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &apiError{errorExec, err}
|
return nil, &apiError{errorExec, err}
|
||||||
}
|
}
|
||||||
|
@ -296,7 +298,7 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &apiError{errorBadData, err}
|
return nil, &apiError{errorBadData, err}
|
||||||
}
|
}
|
||||||
n, err := api.Storage.DropMetricsForLabelMatchers(matchers...)
|
n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &apiError{errorExec, err}
|
return nil, &apiError{errorExec, err}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,7 @@ func TestEndpoints(t *testing.T) {
|
||||||
api := &API{
|
api := &API{
|
||||||
Storage: suite.Storage(),
|
Storage: suite.Storage(),
|
||||||
QueryEngine: suite.QueryEngine(),
|
QueryEngine: suite.QueryEngine(),
|
||||||
|
Context: suite.Context(),
|
||||||
now: func() model.Time { return now },
|
now: func() model.Time { return now },
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,7 +50,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
||||||
)
|
)
|
||||||
w.Header().Set("Content-Type", string(format))
|
w.Header().Set("Content-Type", string(format))
|
||||||
|
|
||||||
vector, err := h.storage.LastSampleForLabelMatchers(minTimestamp, matcherSets...)
|
vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|
39
web/web.go
39
web/web.go
|
@ -36,6 +36,7 @@ import (
|
||||||
"github.com/prometheus/common/log"
|
"github.com/prometheus/common/log"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
"github.com/prometheus/common/route"
|
"github.com/prometheus/common/route"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
@ -55,6 +56,7 @@ type Handler struct {
|
||||||
targetManager *retrieval.TargetManager
|
targetManager *retrieval.TargetManager
|
||||||
ruleManager *rules.Manager
|
ruleManager *rules.Manager
|
||||||
queryEngine *promql.Engine
|
queryEngine *promql.Engine
|
||||||
|
context context.Context
|
||||||
storage local.Storage
|
storage local.Storage
|
||||||
|
|
||||||
apiV1 *api_v1.API
|
apiV1 *api_v1.API
|
||||||
|
@ -97,6 +99,14 @@ type PrometheusVersion struct {
|
||||||
|
|
||||||
// Options for the web Handler.
|
// Options for the web Handler.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
|
Context context.Context
|
||||||
|
Storage local.Storage
|
||||||
|
QueryEngine *promql.Engine
|
||||||
|
TargetManager *retrieval.TargetManager
|
||||||
|
RuleManager *rules.Manager
|
||||||
|
Version *PrometheusVersion
|
||||||
|
Flags map[string]string
|
||||||
|
|
||||||
ListenAddress string
|
ListenAddress string
|
||||||
ExternalURL *url.URL
|
ExternalURL *url.URL
|
||||||
RoutePrefix string
|
RoutePrefix string
|
||||||
|
@ -109,15 +119,7 @@ type Options struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New initializes a new web Handler.
|
// New initializes a new web Handler.
|
||||||
func New(
|
func New(o *Options) *Handler {
|
||||||
st local.Storage,
|
|
||||||
qe *promql.Engine,
|
|
||||||
tm *retrieval.TargetManager,
|
|
||||||
rm *rules.Manager,
|
|
||||||
version *PrometheusVersion,
|
|
||||||
flags map[string]string,
|
|
||||||
o *Options,
|
|
||||||
) *Handler {
|
|
||||||
router := route.New()
|
router := route.New()
|
||||||
|
|
||||||
h := &Handler{
|
h := &Handler{
|
||||||
|
@ -126,16 +128,17 @@ func New(
|
||||||
quitCh: make(chan struct{}),
|
quitCh: make(chan struct{}),
|
||||||
reloadCh: make(chan chan error),
|
reloadCh: make(chan chan error),
|
||||||
options: o,
|
options: o,
|
||||||
versionInfo: version,
|
versionInfo: o.Version,
|
||||||
birth: time.Now(),
|
birth: time.Now(),
|
||||||
flagsMap: flags,
|
flagsMap: o.Flags,
|
||||||
|
|
||||||
targetManager: tm,
|
context: o.Context,
|
||||||
ruleManager: rm,
|
targetManager: o.TargetManager,
|
||||||
queryEngine: qe,
|
ruleManager: o.RuleManager,
|
||||||
storage: st,
|
queryEngine: o.QueryEngine,
|
||||||
|
storage: o.Storage,
|
||||||
|
|
||||||
apiV1: api_v1.NewAPI(qe, st),
|
apiV1: api_v1.NewAPI(o.Context, o.QueryEngine, o.Storage),
|
||||||
now: model.Now,
|
now: model.Now,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,7 +296,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) {
|
||||||
Path: strings.TrimLeft(name, "/"),
|
Path: strings.TrimLeft(name, "/"),
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path)
|
tmpl := template.NewTemplateExpander(h.context, string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path)
|
||||||
filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib")
|
filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
@ -466,7 +469,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path)
|
tmpl := template.NewTemplateExpander(h.context, text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path)
|
||||||
tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))
|
tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options))
|
||||||
|
|
||||||
result, err := tmpl.ExpandHTML(nil)
|
result, err := tmpl.ExpandHTML(nil)
|
||||||
|
|
Loading…
Reference in a new issue