Limit maximum number of concurrent queries.

A high number of concurrent queries can slow each other down
so that none of them is reasonably responsive. This commit limits
the number of queries being concurrently executed.
This commit is contained in:
Fabian Reinartz 2015-05-01 00:49:19 +02:00
parent d59d1cb2c1
commit 9ab1f6c690
3 changed files with 107 additions and 24 deletions

View file

@ -31,8 +31,9 @@ import (
) )
var ( var (
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.") stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.") defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.")
) )
// SampleStream is a stream of Values belonging to an attached COWMetric. // SampleStream is a stream of Values belonging to an attached COWMetric.
@ -215,10 +216,7 @@ func (q *query) Cancel() {
// Exec implements the Query interface. // Exec implements the Query interface.
func (q *query) Exec() *Result { func (q *query) Exec() *Result {
ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout) res, err := q.ng.exec(q)
q.cancel = cancel
res, err := q.ng.exec(ctx, q)
return &Result{Err: err, Value: res} return &Result{Err: err, Value: res}
} }
@ -249,6 +247,8 @@ type Engine struct {
// The base context for all queries and its cancellation function. // The base context for all queries and its cancellation function.
baseCtx context.Context baseCtx context.Context
cancelQueries func() cancelQueries func()
// The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate
} }
// NewEngine returns a new engine. // NewEngine returns a new engine.
@ -258,6 +258,7 @@ func NewEngine(storage local.Storage) *Engine {
storage: storage, storage: storage,
baseCtx: ctx, baseCtx: ctx,
cancelQueries: cancel, cancelQueries: cancel,
gate: newQueryGate(*maxConcurrentQueries),
} }
} }
@ -316,9 +317,21 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
// //
// At this point per query only one EvalStmt is evaluated. Alert and record // At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine. // statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) { func (ng *Engine) exec(q *query) (Value, error) {
const env = "query execution" const env = "query execution"
ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
q.cancel = cancel
queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
if err := ng.gate.Start(ctx); err != nil {
return nil, err
}
defer ng.gate.Done()
queueTimer.Stop()
// Cancel when execution is done or an error was raised. // Cancel when execution is done or an error was raised.
defer q.cancel() defer q.cancel()
@ -1125,3 +1138,35 @@ func interpolateSamples(first, second *metric.SamplePair, timestamp clientmodel.
Timestamp: timestamp, Timestamp: timestamp,
} }
} }
// A queryGate controls the maximum number of concurrently running and waiting queries.
//
// It is a counting semaphore built on a buffered channel: each admitted
// query occupies one buffer slot until it calls Done.
type queryGate struct {
	// ch holds one element per currently admitted query.
	ch chan struct{}
}

// newQueryGate returns a query gate that limits the number of queries
// being concurrently executed to the given capacity.
func newQueryGate(capacity int) *queryGate {
	g := &queryGate{
		ch: make(chan struct{}, capacity),
	}
	return g
}

// Start blocks until the gate has a free spot or the context is done.
func (g *queryGate) Start(ctx context.Context) error {
	select {
	case g.ch <- struct{}{}:
		// Slot claimed; the caller is responsible for releasing it via Done.
		return nil
	case <-ctx.Done():
		return contextDone(ctx, "query queue")
	}
}

// Done releases a single spot in the gate.
func (g *queryGate) Done() {
	select {
	case <-g.ch:
	default:
		// A release without a matching Start is a programming error.
		panic("engine.queryGate.Done: more operations done than started")
	}
}

View file

@ -6,14 +6,61 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/prometheus/prometheus/storage/local"
) )
var noop = testStmt(func(context.Context) error { var noop = testStmt(func(context.Context) error {
return nil return nil
}) })
// TestQueryConcurrency verifies that the engine executes at most
// *maxConcurrentQueries queries concurrently and queues any excess query
// until a running one terminates.
func TestQueryConcurrency(t *testing.T) {
	engine := NewEngine(nil)
	defer engine.Stop()

	block := make(chan struct{})
	processing := make(chan struct{})

	// Each query signals on processing once it starts executing, then
	// blocks until the test releases it via the block channel.
	f1 := testStmt(func(context.Context) error {
		processing <- struct{}{}
		<-block
		return nil
	})

	// Saturate the gate: every query up to the limit must start promptly.
	for i := 0; i < *maxConcurrentQueries; i++ {
		q := engine.newTestQuery(f1)
		go q.Exec()

		select {
		case <-processing:
			// Expected.
		case <-time.After(5 * time.Millisecond):
			t.Fatalf("Query within concurrency threshold not being executed")
		}
	}

	// One query beyond the limit must be queued, not executed.
	q := engine.newTestQuery(f1)
	go q.Exec()

	select {
	case <-processing:
		t.Fatalf("Query above concurrency threshold being executed")
	case <-time.After(5 * time.Millisecond):
		// Expected.
	}

	// Terminate a running query; the queued query must now be admitted.
	block <- struct{}{}

	select {
	case <-processing:
		// Expected.
	case <-time.After(5 * time.Millisecond):
		t.Fatalf("Query within concurrency threshold not being executed")
	}

	// Terminate remaining queries so their goroutines do not outlive the test.
	for i := 0; i < *maxConcurrentQueries; i++ {
		block <- struct{}{}
	}
}
func TestQueryTimeout(t *testing.T) { func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 5 * time.Millisecond *defaultQueryTimeout = 5 * time.Millisecond
defer func() { defer func() {
@ -21,10 +68,7 @@ func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 2 * time.Minute *defaultQueryTimeout = 2 * time.Minute
}() }()
storage, closer := local.NewTestStorage(t, 1) engine := NewEngine(nil)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop() defer engine.Stop()
f1 := testStmt(func(context.Context) error { f1 := testStmt(func(context.Context) error {
@ -46,10 +90,7 @@ func TestQueryTimeout(t *testing.T) {
} }
func TestQueryCancel(t *testing.T) { func TestQueryCancel(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1) engine := NewEngine(nil)
defer closer.Close()
engine := NewEngine(storage)
defer engine.Stop() defer engine.Stop()
// As for timeouts, cancellation is only checked at designated points. We ensure // As for timeouts, cancellation is only checked at designated points. We ensure
@ -91,10 +132,7 @@ func TestQueryCancel(t *testing.T) {
} }
func TestEngineShutdown(t *testing.T) { func TestEngineShutdown(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1) engine := NewEngine(nil)
defer closer.Close()
engine := NewEngine(storage)
handlerExecutions := 0 handlerExecutions := 0
// Shutdown engine on first handler execution. Should handler execution ever become // Shutdown engine on first handler execution. Should handler execution ever become

View file

@ -31,7 +31,7 @@ const (
GetValueAtTimeTime GetValueAtTimeTime
GetBoundaryValuesTime GetBoundaryValuesTime
GetRangeValuesTime GetRangeValuesTime
ViewQueueTime ExecQueueTime
ViewDiskPreparationTime ViewDiskPreparationTime
ViewDataExtractionTime ViewDataExtractionTime
ViewDiskExtractionTime ViewDiskExtractionTime
@ -64,8 +64,8 @@ func (s QueryTiming) String() string {
return "GetBoundaryValues() time" return "GetBoundaryValues() time"
case GetRangeValuesTime: case GetRangeValuesTime:
return "GetRangeValues() time" return "GetRangeValues() time"
case ViewQueueTime: case ExecQueueTime:
return "View queue wait time" return "Exec queue wait time"
case ViewDiskPreparationTime: case ViewDiskPreparationTime:
return "View building disk preparation time" return "View building disk preparation time"
case ViewDataExtractionTime: case ViewDataExtractionTime: