mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #661 from prometheus/fabxc/pql/rule-stmts
Stop routing rule statements through the engine.
This commit is contained in:
commit
adba3b4daa
13
main.go
13
main.go
|
@ -167,16 +167,9 @@ func NewPrometheus() *prometheus {
|
||||||
PrometheusURL: web.MustBuildServerURL(*pathPrefix),
|
PrometheusURL: web.MustBuildServerURL(*pathPrefix),
|
||||||
PathPrefix: *pathPrefix,
|
PathPrefix: *pathPrefix,
|
||||||
})
|
})
|
||||||
for _, rf := range conf.Global.GetRuleFile() {
|
if err := ruleManager.LoadRuleFiles(conf.Global.GetRuleFile()...); err != nil {
|
||||||
query, err := queryEngine.NewQueryFromFile(rf)
|
glog.Errorf("Error loading rule files: %s", err)
|
||||||
if err != nil {
|
os.Exit(1)
|
||||||
glog.Errorf("Error loading rule file %q: %s", rf, err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
if res := query.Exec(); res.Err != nil {
|
|
||||||
glog.Errorf("Error initializing rules: %s", res.Err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
flags := map[string]string{}
|
flags := map[string]string{}
|
||||||
|
|
172
promql/engine.go
172
promql/engine.go
|
@ -16,11 +16,9 @@ package promql
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"math"
|
"math"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
@ -165,14 +163,10 @@ type (
|
||||||
ErrQueryTimeout string
|
ErrQueryTimeout string
|
||||||
// ErrQueryCanceled is returned if a query was canceled during processing.
|
// ErrQueryCanceled is returned if a query was canceled during processing.
|
||||||
ErrQueryCanceled string
|
ErrQueryCanceled string
|
||||||
// ErrNoHandlers is returned if no handlers were registered for the
|
|
||||||
// execution of a statement.
|
|
||||||
ErrNoHandlers string
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e ErrQueryTimeout) Error() string { return fmt.Sprintf("query timed out in %s", e) }
|
func (e ErrQueryTimeout) Error() string { return fmt.Sprintf("query timed out in %s", e) }
|
||||||
func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was canceled in %s", e) }
|
func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was canceled in %s", e) }
|
||||||
func (e ErrNoHandlers) Error() string { return fmt.Sprintf("no handlers registered to process %s", e) }
|
|
||||||
|
|
||||||
// A Query is derived from an a raw query string and can be run against an engine
|
// A Query is derived from an a raw query string and can be run against an engine
|
||||||
// it is associated with.
|
// it is associated with.
|
||||||
|
@ -193,9 +187,6 @@ type query struct {
|
||||||
q string
|
q string
|
||||||
// Statements of the parsed query.
|
// Statements of the parsed query.
|
||||||
stmts Statements
|
stmts Statements
|
||||||
// On finished execution two bools indicating success of the execution
|
|
||||||
// are sent on the channel.
|
|
||||||
done chan bool
|
|
||||||
// Timer stats for the query execution.
|
// Timer stats for the query execution.
|
||||||
stats *stats.TimerGroup
|
stats *stats.TimerGroup
|
||||||
// Cancelation function for the query.
|
// Cancelation function for the query.
|
||||||
|
@ -231,15 +222,6 @@ func (q *query) Exec() *Result {
|
||||||
return &Result{Err: err, Value: res}
|
return &Result{Err: err, Value: res}
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
|
||||||
// AlertHandlers can be registered with an engine and are called on
|
|
||||||
// each executed alert statement.
|
|
||||||
AlertHandler func(context.Context, *AlertStmt) error
|
|
||||||
// RecordHandlers can be registered with an engine and are called on
|
|
||||||
// each executed record statement.
|
|
||||||
RecordHandler func(context.Context, *RecordStmt) error
|
|
||||||
)
|
|
||||||
|
|
||||||
// contextDone returns an error if the context was canceled or timed out.
|
// contextDone returns an error if the context was canceled or timed out.
|
||||||
func contextDone(ctx context.Context, env string) error {
|
func contextDone(ctx context.Context, env string) error {
|
||||||
select {
|
select {
|
||||||
|
@ -258,32 +240,24 @@ func contextDone(ctx context.Context, env string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Engine handles the liftetime of queries from beginning to end. It is connected
|
// Engine handles the liftetime of queries from beginning to end.
|
||||||
// to a storage.
|
// It is connected to a storage.
|
||||||
type Engine struct {
|
type Engine struct {
|
||||||
sync.RWMutex
|
|
||||||
|
|
||||||
// The storage on which the engine operates.
|
// The storage on which the engine operates.
|
||||||
storage local.Storage
|
storage local.Storage
|
||||||
|
|
||||||
// The base context for all queries and its cancellation function.
|
// The base context for all queries and its cancellation function.
|
||||||
baseCtx context.Context
|
baseCtx context.Context
|
||||||
cancelQueries func()
|
cancelQueries func()
|
||||||
|
|
||||||
// Handlers for the statements.
|
|
||||||
alertHandlers map[string]AlertHandler
|
|
||||||
recordHandlers map[string]RecordHandler
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEngine returns a new engine.
|
// NewEngine returns a new engine.
|
||||||
func NewEngine(storage local.Storage) *Engine {
|
func NewEngine(storage local.Storage) *Engine {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &Engine{
|
return &Engine{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
baseCtx: ctx,
|
baseCtx: ctx,
|
||||||
cancelQueries: cancel,
|
cancelQueries: cancel,
|
||||||
alertHandlers: map[string]AlertHandler{},
|
|
||||||
recordHandlers: map[string]RecordHandler{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,31 +266,6 @@ func (ng *Engine) Stop() {
|
||||||
ng.cancelQueries()
|
ng.cancelQueries()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewQuery returns a new query of the given query string.
|
|
||||||
func (ng *Engine) NewQuery(qs string) (Query, error) {
|
|
||||||
stmts, err := ParseStmts(qs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
query := &query{
|
|
||||||
q: qs,
|
|
||||||
stmts: stmts,
|
|
||||||
ng: ng,
|
|
||||||
done: make(chan bool, 2),
|
|
||||||
stats: stats.NewTimerGroup(),
|
|
||||||
}
|
|
||||||
return query, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewQueryFromFile reads a file and returns a query of statements it contains.
|
|
||||||
func (ng *Engine) NewQueryFromFile(filename string) (Query, error) {
|
|
||||||
content, err := ioutil.ReadFile(filename)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return ng.NewQuery(string(content))
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
// NewInstantQuery returns an evaluation query for the given expression at the given time.
|
||||||
func (ng *Engine) NewInstantQuery(es string, ts clientmodel.Timestamp) (Query, error) {
|
func (ng *Engine) NewInstantQuery(es string, ts clientmodel.Timestamp) (Query, error) {
|
||||||
return ng.NewRangeQuery(es, ts, ts, 0)
|
return ng.NewRangeQuery(es, ts, ts, 0)
|
||||||
|
@ -336,77 +285,64 @@ func (ng *Engine) NewRangeQuery(qs string, start, end clientmodel.Timestamp, int
|
||||||
Interval: interval,
|
Interval: interval,
|
||||||
}
|
}
|
||||||
|
|
||||||
query := &query{
|
qry := &query{
|
||||||
q: qs,
|
q: qs,
|
||||||
stmts: Statements{es},
|
stmts: Statements{es},
|
||||||
ng: ng,
|
ng: ng,
|
||||||
done: make(chan bool, 2),
|
|
||||||
stats: stats.NewTimerGroup(),
|
stats: stats.NewTimerGroup(),
|
||||||
}
|
}
|
||||||
return query, nil
|
return qry, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// exec executes all statements in the query. For evaluation statements only
|
// testStmt is an internal helper statement that allows execution
|
||||||
// one statement per query is allowed, after which the execution returns.
|
// of an arbitrary function during handling. It is used to test the Engine.
|
||||||
|
type testStmt func(context.Context) error
|
||||||
|
|
||||||
|
func (testStmt) String() string { return "test statement" }
|
||||||
|
func (testStmt) DotGraph() string { return "test statement" }
|
||||||
|
func (testStmt) stmt() {}
|
||||||
|
|
||||||
|
func (ng *Engine) newTestQuery(stmts ...Statement) Query {
|
||||||
|
qry := &query{
|
||||||
|
q: "test statement",
|
||||||
|
stmts: Statements(stmts),
|
||||||
|
ng: ng,
|
||||||
|
stats: stats.NewTimerGroup(),
|
||||||
|
}
|
||||||
|
return qry
|
||||||
|
}
|
||||||
|
|
||||||
|
// exec executes the query.
|
||||||
|
//
|
||||||
|
// At this point per query only one EvalStmt is evaluated. Alert and record
|
||||||
|
// statements are not handled by the Engine.
|
||||||
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
|
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
|
||||||
const env = "query execution"
|
const env = "query execution"
|
||||||
|
|
||||||
// Cancel when execution is done or an error was raised.
|
// Cancel when execution is done or an error was raised.
|
||||||
defer q.cancel()
|
defer q.cancel()
|
||||||
|
|
||||||
// The base context might already be canceled (e.g. during shutdown).
|
|
||||||
if err := contextDone(ctx, env); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start()
|
evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start()
|
||||||
defer evalTimer.Stop()
|
defer evalTimer.Stop()
|
||||||
|
|
||||||
ng.RLock()
|
|
||||||
alertHandlers := []AlertHandler{}
|
|
||||||
for _, h := range ng.alertHandlers {
|
|
||||||
alertHandlers = append(alertHandlers, h)
|
|
||||||
}
|
|
||||||
recordHandlers := []RecordHandler{}
|
|
||||||
for _, h := range ng.recordHandlers {
|
|
||||||
recordHandlers = append(recordHandlers, h)
|
|
||||||
}
|
|
||||||
ng.RUnlock()
|
|
||||||
|
|
||||||
for _, stmt := range q.stmts {
|
for _, stmt := range q.stmts {
|
||||||
|
// The base context might already be canceled on the first iteration (e.g. during shutdown).
|
||||||
|
if err := contextDone(ctx, env); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
switch s := stmt.(type) {
|
switch s := stmt.(type) {
|
||||||
case *AlertStmt:
|
|
||||||
if len(alertHandlers) == 0 {
|
|
||||||
return nil, ErrNoHandlers("alert statement")
|
|
||||||
}
|
|
||||||
for _, h := range alertHandlers {
|
|
||||||
if err := contextDone(ctx, env); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err := h(ctx, s)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case *RecordStmt:
|
|
||||||
if len(recordHandlers) == 0 {
|
|
||||||
return nil, ErrNoHandlers("record statement")
|
|
||||||
}
|
|
||||||
for _, h := range recordHandlers {
|
|
||||||
if err := contextDone(ctx, env); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err := h(ctx, s)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case *EvalStmt:
|
case *EvalStmt:
|
||||||
// Currently, only one execution statement per query is allowed.
|
// Currently, only one execution statement per query is allowed.
|
||||||
return ng.execEvalStmt(ctx, q, s)
|
return ng.execEvalStmt(ctx, q, s)
|
||||||
|
|
||||||
|
case testStmt:
|
||||||
|
if err := s(ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
panic(fmt.Errorf("statement of unknown type %T", stmt))
|
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", stmt))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -1050,34 +986,6 @@ func (ev *evaluator) aggregation(op itemType, grouping clientmodel.LabelNames, k
|
||||||
return resultVector
|
return resultVector
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterAlertHandler registers a new alert handler of the given name.
|
|
||||||
func (ng *Engine) RegisterAlertHandler(name string, h AlertHandler) {
|
|
||||||
ng.Lock()
|
|
||||||
ng.alertHandlers[name] = h
|
|
||||||
ng.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterRecordHandler registers a new record handler of the given name.
|
|
||||||
func (ng *Engine) RegisterRecordHandler(name string, h RecordHandler) {
|
|
||||||
ng.Lock()
|
|
||||||
ng.recordHandlers[name] = h
|
|
||||||
ng.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnregisterAlertHandler removes the alert handler with the given name.
|
|
||||||
func (ng *Engine) UnregisterAlertHandler(name string) {
|
|
||||||
ng.Lock()
|
|
||||||
delete(ng.alertHandlers, name)
|
|
||||||
ng.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnregisterRecordHandler removes the record handler with the given name.
|
|
||||||
func (ng *Engine) UnregisterRecordHandler(name string) {
|
|
||||||
ng.Lock()
|
|
||||||
delete(ng.recordHandlers, name)
|
|
||||||
ng.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// btos returns 1 if b is true, 0 otherwise.
|
// btos returns 1 if b is true, 0 otherwise.
|
||||||
func btos(b bool) clientmodel.SampleValue {
|
func btos(b bool) clientmodel.SampleValue {
|
||||||
if b {
|
if b {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package promql
|
package promql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -11,6 +10,10 @@ import (
|
||||||
"github.com/prometheus/prometheus/storage/local"
|
"github.com/prometheus/prometheus/storage/local"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var noop = testStmt(func(context.Context) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
func TestQueryTimeout(t *testing.T) {
|
func TestQueryTimeout(t *testing.T) {
|
||||||
*defaultQueryTimeout = 5 * time.Millisecond
|
*defaultQueryTimeout = 5 * time.Millisecond
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -24,23 +27,14 @@ func TestQueryTimeout(t *testing.T) {
|
||||||
engine := NewEngine(storage)
|
engine := NewEngine(storage)
|
||||||
defer engine.Stop()
|
defer engine.Stop()
|
||||||
|
|
||||||
query, err := engine.NewQuery("foo = bar")
|
f1 := testStmt(func(context.Context) error {
|
||||||
if err != nil {
|
time.Sleep(10 * time.Millisecond)
|
||||||
t.Fatalf("error parsing query: %s", err)
|
return nil
|
||||||
}
|
})
|
||||||
|
|
||||||
// Timeouts are not exact but checked in designated places. For example between
|
// Timeouts are not exact but checked in designated places. For example between
|
||||||
// invoking handlers. Thus, we reigster two handlers that take some time to ensure we check
|
// invoking test statements.
|
||||||
// after exceeding the timeout.
|
query := engine.newTestQuery(f1, f1)
|
||||||
// Should the implementation of this area change, the test might have to be adjusted.
|
|
||||||
engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
res := query.Exec()
|
res := query.Exec()
|
||||||
if res.Err == nil {
|
if res.Err == nil {
|
||||||
|
@ -58,26 +52,16 @@ func TestQueryCancel(t *testing.T) {
|
||||||
engine := NewEngine(storage)
|
engine := NewEngine(storage)
|
||||||
defer engine.Stop()
|
defer engine.Stop()
|
||||||
|
|
||||||
query1, err := engine.NewQuery("foo = bar")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error parsing query: %s", err)
|
|
||||||
}
|
|
||||||
query2, err := engine.NewQuery("foo = baz")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error parsing query: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// As for timeouts, cancellation is only checked at designated points. We ensure
|
// As for timeouts, cancellation is only checked at designated points. We ensure
|
||||||
// that we reach one of those points using the same method.
|
// that we reach one of those points using the same method.
|
||||||
engine.RegisterRecordHandler("test1", func(context.Context, *RecordStmt) error {
|
f1 := testStmt(func(context.Context) error {
|
||||||
<-time.After(2 * time.Millisecond)
|
time.Sleep(2 * time.Millisecond)
|
||||||
return nil
|
|
||||||
})
|
|
||||||
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
|
|
||||||
<-time.After(2 * time.Millisecond)
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
query1 := engine.newTestQuery(f1, f1)
|
||||||
|
query2 := engine.newTestQuery(f1, f1)
|
||||||
|
|
||||||
// Cancel query after starting it.
|
// Cancel query after starting it.
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var res *Result
|
var res *Result
|
||||||
|
@ -87,7 +71,7 @@ func TestQueryCancel(t *testing.T) {
|
||||||
res = query1.Exec()
|
res = query1.Exec()
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
<-time.After(1 * time.Millisecond)
|
time.Sleep(1 * time.Millisecond)
|
||||||
query1.Cancel()
|
query1.Cancel()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
@ -112,34 +96,20 @@ func TestEngineShutdown(t *testing.T) {
|
||||||
|
|
||||||
engine := NewEngine(storage)
|
engine := NewEngine(storage)
|
||||||
|
|
||||||
query1, err := engine.NewQuery("foo = bar")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error parsing query: %s", err)
|
|
||||||
}
|
|
||||||
query2, err := engine.NewQuery("foo = baz")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error parsing query: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
handlerExecutions := 0
|
handlerExecutions := 0
|
||||||
|
|
||||||
// Shutdown engine on first handler execution. Should handler execution ever become
|
// Shutdown engine on first handler execution. Should handler execution ever become
|
||||||
// concurrent this test has to be adjusted accordingly.
|
// concurrent this test has to be adjusted accordingly.
|
||||||
engine.RegisterRecordHandler("test", func(context.Context, *RecordStmt) error {
|
f1 := testStmt(func(context.Context) error {
|
||||||
handlerExecutions++
|
|
||||||
engine.Stop()
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
engine.RegisterRecordHandler("test2", func(context.Context, *RecordStmt) error {
|
|
||||||
handlerExecutions++
|
handlerExecutions++
|
||||||
engine.Stop()
|
engine.Stop()
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
query1 := engine.newTestQuery(f1, f1)
|
||||||
|
query2 := engine.newTestQuery(f1, f1)
|
||||||
|
|
||||||
// Stopping the engine should cancel the base context. While setting up queries is
|
// Stopping the engine must cancel the base context. While executing queries is
|
||||||
// still possible their context is canceled from the beginning and execution should
|
// still possible, their context is canceled from the beginning and execution should
|
||||||
// terminate immediately.
|
// terminate immediately.
|
||||||
|
|
||||||
res := query1.Exec()
|
res := query1.Exec()
|
||||||
|
@ -147,7 +117,7 @@ func TestEngineShutdown(t *testing.T) {
|
||||||
t.Fatalf("expected error on shutdown during query but got none")
|
t.Fatalf("expected error on shutdown during query but got none")
|
||||||
}
|
}
|
||||||
if handlerExecutions != 1 {
|
if handlerExecutions != 1 {
|
||||||
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executons", handlerExecutions)
|
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executions", handlerExecutions)
|
||||||
}
|
}
|
||||||
|
|
||||||
res2 := query2.Exec()
|
res2 := query2.Exec()
|
||||||
|
@ -159,114 +129,3 @@ func TestEngineShutdown(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAlertHandler(t *testing.T) {
|
|
||||||
storage, closer := local.NewTestStorage(t, 1)
|
|
||||||
defer closer.Close()
|
|
||||||
|
|
||||||
engine := NewEngine(storage)
|
|
||||||
defer engine.Stop()
|
|
||||||
|
|
||||||
qs := `ALERT Foo IF bar FOR 5m WITH {a="b"} SUMMARY "sum" DESCRIPTION "desc"`
|
|
||||||
|
|
||||||
doQuery := func(expectFailure bool) *AlertStmt {
|
|
||||||
query, err := engine.NewQuery(qs)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error parsing query: %s", err)
|
|
||||||
}
|
|
||||||
res := query.Exec()
|
|
||||||
if expectFailure && res.Err == nil {
|
|
||||||
t.Fatalf("expected error but got none.")
|
|
||||||
}
|
|
||||||
if res.Err != nil && !expectFailure {
|
|
||||||
t.Fatalf("error on executing alert query: %s", res.Err)
|
|
||||||
}
|
|
||||||
// That this alert statement is correct is tested elsewhere.
|
|
||||||
return query.Statements()[0].(*AlertStmt)
|
|
||||||
}
|
|
||||||
|
|
||||||
// We expect an error if nothing is registered to handle the query.
|
|
||||||
alertStmt := doQuery(true)
|
|
||||||
|
|
||||||
receivedCalls := 0
|
|
||||||
|
|
||||||
// Ensure that we receive the correct statement.
|
|
||||||
engine.RegisterAlertHandler("test", func(ctx context.Context, as *AlertStmt) error {
|
|
||||||
if !reflect.DeepEqual(alertStmt, as) {
|
|
||||||
t.Errorf("received alert statement did not match input: %q", qs)
|
|
||||||
t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(alertStmt), Tree(as))
|
|
||||||
}
|
|
||||||
receivedCalls++
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
doQuery(false)
|
|
||||||
if receivedCalls != i+1 {
|
|
||||||
t.Fatalf("alert handler was not called on query execution")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
engine.UnregisterAlertHandler("test")
|
|
||||||
|
|
||||||
// We must receive no further calls after unregistering.
|
|
||||||
doQuery(true)
|
|
||||||
if receivedCalls != 10 {
|
|
||||||
t.Fatalf("received calls after unregistering alert handler")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRecordHandler(t *testing.T) {
|
|
||||||
storage, closer := local.NewTestStorage(t, 1)
|
|
||||||
defer closer.Close()
|
|
||||||
|
|
||||||
engine := NewEngine(storage)
|
|
||||||
defer engine.Stop()
|
|
||||||
|
|
||||||
qs := `foo = bar`
|
|
||||||
|
|
||||||
doQuery := func(expectFailure bool) *RecordStmt {
|
|
||||||
query, err := engine.NewQuery(qs)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("error parsing query: %s", err)
|
|
||||||
}
|
|
||||||
res := query.Exec()
|
|
||||||
if expectFailure && res.Err == nil {
|
|
||||||
t.Fatalf("expected error but got none.")
|
|
||||||
}
|
|
||||||
if res.Err != nil && !expectFailure {
|
|
||||||
t.Fatalf("error on executing record query: %s", res.Err)
|
|
||||||
}
|
|
||||||
return query.Statements()[0].(*RecordStmt)
|
|
||||||
}
|
|
||||||
|
|
||||||
// We expect an error if nothing is registered to handle the query.
|
|
||||||
recordStmt := doQuery(true)
|
|
||||||
|
|
||||||
receivedCalls := 0
|
|
||||||
|
|
||||||
// Ensure that we receive the correct statement.
|
|
||||||
engine.RegisterRecordHandler("test", func(ctx context.Context, rs *RecordStmt) error {
|
|
||||||
if !reflect.DeepEqual(recordStmt, rs) {
|
|
||||||
t.Errorf("received record statement did not match input: %q", qs)
|
|
||||||
t.Fatalf("no match\n\nexpected:\n%s\ngot: \n%s\n", Tree(recordStmt), Tree(rs))
|
|
||||||
}
|
|
||||||
receivedCalls++
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
doQuery(false)
|
|
||||||
if receivedCalls != i+1 {
|
|
||||||
t.Fatalf("record handler was not called on query execution")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
engine.UnregisterRecordHandler("test")
|
|
||||||
|
|
||||||
// We must receive no further calls after unregistering.
|
|
||||||
doQuery(true)
|
|
||||||
if receivedCalls != 10 {
|
|
||||||
t.Fatalf("received calls after unregistering record handler")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -15,12 +15,12 @@ package rules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"golang.org/x/net/context"
|
|
||||||
|
|
||||||
clientmodel "github.com/prometheus/client_golang/model"
|
clientmodel "github.com/prometheus/client_golang/model"
|
||||||
|
|
||||||
|
@ -113,9 +113,6 @@ func NewManager(o *ManagerOptions) *Manager {
|
||||||
notificationHandler: o.NotificationHandler,
|
notificationHandler: o.NotificationHandler,
|
||||||
prometheusURL: o.PrometheusURL,
|
prometheusURL: o.PrometheusURL,
|
||||||
}
|
}
|
||||||
manager.queryEngine.RegisterAlertHandler("rule_manager", manager.AddAlertingRule)
|
|
||||||
manager.queryEngine.RegisterRecordHandler("rule_manager", manager.AddRecordingRule)
|
|
||||||
|
|
||||||
return manager
|
return manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,24 +255,37 @@ func (m *Manager) runIteration() {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) AddAlertingRule(ctx context.Context, r *promql.AlertStmt) error {
|
// LoadRuleFiles loads alerting and recording rules from the given files.
|
||||||
rule := NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Summary, r.Description)
|
func (m *Manager) LoadRuleFiles(filenames ...string) error {
|
||||||
|
|
||||||
m.Lock()
|
m.Lock()
|
||||||
m.rules = append(m.rules, rule)
|
defer m.Unlock()
|
||||||
m.Unlock()
|
|
||||||
return nil
|
for _, fn := range filenames {
|
||||||
}
|
content, err := ioutil.ReadFile(fn)
|
||||||
|
if err != nil {
|
||||||
func (m *Manager) AddRecordingRule(ctx context.Context, r *promql.RecordStmt) error {
|
return err
|
||||||
rule := &RecordingRule{r.Name, r.Expr, r.Labels}
|
}
|
||||||
|
stmts, err := promql.ParseStmts(string(content))
|
||||||
m.Lock()
|
if err != nil {
|
||||||
m.rules = append(m.rules, rule)
|
return fmt.Errorf("error parsing %s: %s", fn, err)
|
||||||
m.Unlock()
|
}
|
||||||
|
for _, stmt := range stmts {
|
||||||
|
switch r := stmt.(type) {
|
||||||
|
case *promql.AlertStmt:
|
||||||
|
rule := NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Summary, r.Description)
|
||||||
|
m.rules = append(m.rules, rule)
|
||||||
|
case *promql.RecordStmt:
|
||||||
|
rule := &RecordingRule{r.Name, r.Expr, r.Labels}
|
||||||
|
m.rules = append(m.rules, rule)
|
||||||
|
default:
|
||||||
|
panic("retrieval.Manager.LoadRuleFiles: unknown statement type")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rules returns the list of the manager's rules.
|
||||||
func (m *Manager) Rules() []Rule {
|
func (m *Manager) Rules() []Rule {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
|
@ -285,6 +295,7 @@ func (m *Manager) Rules() []Rule {
|
||||||
return rules
|
return rules
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AlertingRules returns the list of the manager's alerting rules.
|
||||||
func (m *Manager) AlertingRules() []*AlertingRule {
|
func (m *Manager) AlertingRules() []*AlertingRule {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
defer m.Unlock()
|
defer m.Unlock()
|
||||||
|
|
Loading…
Reference in a new issue