Merge pull request #975 from prometheus/fabxc/single-query

Remove multi-statement queries
This commit is contained in:
Fabian Reinartz 2015-08-10 15:13:47 +02:00
commit 5e9ece46d5
2 changed files with 82 additions and 75 deletions

View file

@ -196,8 +196,8 @@ func (e ErrQueryCanceled) Error() string { return fmt.Sprintf("query was cancele
type Query interface { type Query interface {
// Exec processes the query and // Exec processes the query and
Exec() *Result Exec() *Result
// Statements returns the parsed statements of the query. // Statement returns the parsed statement of the query.
Statements() Statements Statement() Statement
// Stats returns statistics about the lifetime of the query. // Stats returns statistics about the lifetime of the query.
Stats() *stats.TimerGroup Stats() *stats.TimerGroup
// Cancel signals that a running query execution should be aborted. // Cancel signals that a running query execution should be aborted.
@ -208,8 +208,8 @@ type Query interface {
type query struct { type query struct {
// The original query string. // The original query string.
q string q string
// Statements of the parsed query. // Statement of the parsed query.
stmts Statements stmt Statement
// Timer stats for the query execution. // Timer stats for the query execution.
stats *stats.TimerGroup stats *stats.TimerGroup
// Cancelation function for the query. // Cancelation function for the query.
@ -219,9 +219,9 @@ type query struct {
ng *Engine ng *Engine
} }
// Statements implements the Query interface. // Statement implements the Query interface.
func (q *query) Statements() Statements { func (q *query) Statement() Statement {
return q.stmts return q.stmt
} }
// Stats implements the Query interface. // Stats implements the Query interface.
@ -343,7 +343,7 @@ func (ng *Engine) newQuery(expr Expr, start, end clientmodel.Timestamp, interval
Interval: interval, Interval: interval,
} }
qry := &query{ qry := &query{
stmts: Statements{es}, stmt: es,
ng: ng, ng: ng,
stats: stats.NewTimerGroup(), stats: stats.NewTimerGroup(),
} }
@ -358,10 +358,10 @@ func (testStmt) String() string { return "test statement" }
func (testStmt) DotGraph() string { return "test statement" } func (testStmt) DotGraph() string { return "test statement" }
func (testStmt) stmt() {} func (testStmt) stmt() {}
func (ng *Engine) newTestQuery(stmts ...Statement) Query { func (ng *Engine) newTestQuery(f func(context.Context) error) Query {
qry := &query{ qry := &query{
q: "test statement", q: "test statement",
stmts: Statements(stmts), stmt: testStmt(f),
ng: ng, ng: ng,
stats: stats.NewTimerGroup(), stats: stats.NewTimerGroup(),
} }
@ -373,8 +373,6 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
// At this point per query only one EvalStmt is evaluated. Alert and record // At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine. // statements are not handled by the Engine.
func (ng *Engine) exec(q *query) (Value, error) { func (ng *Engine) exec(q *query) (Value, error) {
const env = "query execution"
ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout) ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout)
q.cancel = cancel q.cancel = cancel
@ -390,30 +388,24 @@ func (ng *Engine) exec(q *query) (Value, error) {
// Cancel when execution is done or an error was raised. // Cancel when execution is done or an error was raised.
defer q.cancel() defer q.cancel()
const env = "query execution"
evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start() evalTimer := q.stats.GetTimer(stats.TotalEvalTime).Start()
defer evalTimer.Stop() defer evalTimer.Stop()
for _, stmt := range q.stmts {
// The base context might already be canceled on the first iteration (e.g. during shutdown). // The base context might already be canceled on the first iteration (e.g. during shutdown).
if err := contextDone(ctx, env); err != nil { if err := contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
switch s := stmt.(type) { switch s := q.Statement().(type) {
case *EvalStmt: case *EvalStmt:
// Currently, only one execution statement per query is allowed.
return ng.execEvalStmt(ctx, q, s) return ng.execEvalStmt(ctx, q, s)
case testStmt: case testStmt:
if err := s(ctx); err != nil { return nil, s(ctx)
return nil, err
} }
default: panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
panic(fmt.Errorf("promql.Engine.exec: unhandled statement of type %T", stmt))
}
}
return nil, nil
} }
// execEvalStmt evaluates the expression of an evaluation statement for the given time range. // execEvalStmt evaluates the expression of an evaluation statement for the given time range.

View file

@ -1,7 +1,6 @@
package promql package promql
import ( import (
"sync"
"testing" "testing"
"time" "time"
@ -18,14 +17,15 @@ func TestQueryConcurreny(t *testing.T) {
block := make(chan struct{}) block := make(chan struct{})
processing := make(chan struct{}) processing := make(chan struct{})
f1 := testStmt(func(context.Context) error {
f := func(context.Context) error {
processing <- struct{}{} processing <- struct{}{}
<-block <-block
return nil return nil
}) }
for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
q := engine.newTestQuery(f1) q := engine.newTestQuery(f)
go q.Exec() go q.Exec()
select { select {
case <-processing: case <-processing:
@ -35,7 +35,7 @@ func TestQueryConcurreny(t *testing.T) {
} }
} }
q := engine.newTestQuery(f1) q := engine.newTestQuery(f)
go q.Exec() go q.Exec()
select { select {
@ -68,15 +68,11 @@ func TestQueryTimeout(t *testing.T) {
}) })
defer engine.Stop() defer engine.Stop()
f1 := testStmt(func(context.Context) error { query := engine.newTestQuery(func(ctx context.Context) error {
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
return nil return contextDone(ctx, "test statement execution")
}) })
// Timeouts are not exact but checked in designated places. For example between
// invoking test statements.
query := engine.newTestQuery(f1, f1)
res := query.Exec() res := query.Exec()
if res.Err == nil { if res.Err == nil {
t.Fatalf("expected timeout error but got none") t.Fatalf("expected timeout error but got none")
@ -90,37 +86,40 @@ func TestQueryCancel(t *testing.T) {
engine := NewEngine(nil, nil) engine := NewEngine(nil, nil)
defer engine.Stop() defer engine.Stop()
// As for timeouts, cancellation is only checked at designated points. We ensure // Cancel a running query before it completes.
// that we reach one of those points using the same method. block := make(chan struct{})
f1 := testStmt(func(context.Context) error { processing := make(chan struct{})
time.Sleep(2 * time.Millisecond)
return nil query1 := engine.newTestQuery(func(ctx context.Context) error {
processing <- struct{}{}
<-block
return contextDone(ctx, "test statement execution")
}) })
query1 := engine.newTestQuery(f1, f1)
query2 := engine.newTestQuery(f1, f1)
// Cancel query after starting it.
var wg sync.WaitGroup
var res *Result var res *Result
wg.Add(1)
go func() { go func() {
res = query1.Exec() res = query1.Exec()
wg.Done() processing <- struct{}{}
}() }()
time.Sleep(1 * time.Millisecond)
<-processing
query1.Cancel() query1.Cancel()
wg.Wait() block <- struct{}{}
<-processing
if res.Err == nil { if res.Err == nil {
t.Fatalf("expected cancellation error for query1 but got none") t.Fatalf("expected cancellation error for query1 but got none")
} }
if _, ok := res.Err.(ErrQueryCanceled); res.Err != nil && !ok { if ee := ErrQueryCanceled("test statement execution"); res.Err != ee {
t.Fatalf("expected cancellation error for query1 but got: %s", res.Err) t.Fatalf("expected error %q, got %q")
} }
// Canceling query before starting it must have no effect. // Canceling a query before starting it must have no effect.
query2 := engine.newTestQuery(func(ctx context.Context) error {
return contextDone(ctx, "test statement execution")
})
query2.Cancel() query2.Cancel()
res = query2.Exec() res = query2.Exec()
if res.Err != nil { if res.Err != nil {
@ -131,36 +130,52 @@ func TestQueryCancel(t *testing.T) {
func TestEngineShutdown(t *testing.T) { func TestEngineShutdown(t *testing.T) {
engine := NewEngine(nil, nil) engine := NewEngine(nil, nil)
handlerExecutions := 0 block := make(chan struct{})
processing := make(chan struct{})
// Shutdown engine on first handler execution. Should handler execution ever become // Shutdown engine on first handler execution. Should handler execution ever become
// concurrent this test has to be adjusted accordingly. // concurrent this test has to be adjusted accordingly.
f1 := testStmt(func(context.Context) error { f := func(ctx context.Context) error {
handlerExecutions++ processing <- struct{}{}
engine.Stop() <-block
time.Sleep(10 * time.Millisecond) return contextDone(ctx, "test statement execution")
return nil }
}) query1 := engine.newTestQuery(f)
query1 := engine.newTestQuery(f1, f1)
query2 := engine.newTestQuery(f1, f1)
// Stopping the engine must cancel the base context. While executing queries is // Stopping the engine must cancel the base context. While executing queries is
// still possible, their context is canceled from the beginning and execution should // still possible, their context is canceled from the beginning and execution should
// terminate immediately. // terminate immediately.
res := query1.Exec() var res *Result
go func() {
res = query1.Exec()
processing <- struct{}{}
}()
<-processing
engine.Stop()
block <- struct{}{}
<-processing
if res.Err == nil { if res.Err == nil {
t.Fatalf("expected error on shutdown during query but got none") t.Fatalf("expected error on shutdown during query but got none")
} }
if handlerExecutions != 1 { if ee := ErrQueryCanceled("test statement execution"); res.Err != ee {
t.Fatalf("expected only one handler to be executed before query cancellation but got %d executions", handlerExecutions) t.Fatalf("expected error %q, got %q", ee, res.Err)
} }
query2 := engine.newTestQuery(func(context.Context) error {
t.Fatalf("reached query execution unexpectedly")
return nil
})
// The second query is started after the engine shut down. It must
// be canceled immediately.
res2 := query2.Exec() res2 := query2.Exec()
if res2.Err == nil { if res2.Err == nil {
t.Fatalf("expected error on querying shutdown engine but got none") t.Fatalf("expected error on querying shutdown engine but got none")
} }
if handlerExecutions != 1 { if _, ok := res2.Err.(ErrQueryCanceled); !ok {
t.Fatalf("expected no handler execution for query after engine shutdown") t.Fatalf("expected cancelation error, got %q", res2.Err)
} }
} }