From fe301d7946498a416073cdbf06ca8611c93b8d6f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 15 Jun 2015 12:49:11 +0200 Subject: [PATCH] promql: remove global flags --- promql/analyzer.go | 4 ++-- promql/engine.go | 39 +++++++++++++++++++++++++++------------ promql/engine_test.go | 21 +++++++++------------ promql/test.go | 2 +- rules/rules_test.go | 2 +- template/template_test.go | 2 +- 6 files changed, 41 insertions(+), 29 deletions(-) diff --git a/promql/analyzer.go b/promql/analyzer.go index b1695b7a42..efadaf4ebd 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -137,7 +137,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - err = p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta) + err = p.PreloadRange(fp, start.Add(-rangeDuration), end, StalenessDelta) if err != nil { return nil, err } @@ -146,7 +146,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) { if err = contextDone(ctx, env); err != nil { return nil, err } - err = p.PreloadRange(fp, start, end, *stalenessDelta) + err = p.PreloadRange(fp, start, end, StalenessDelta) if err != nil { return nil, err } diff --git a/promql/engine.go b/promql/engine.go index 9d8036f1a4..2a99154029 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -14,7 +14,6 @@ package promql import ( - "flag" "fmt" "math" "runtime" @@ -30,12 +29,6 @@ import ( "github.com/prometheus/prometheus/util/stats" ) -var ( - stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.") - defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.") - maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.") -) - // SampleStream is a stream of Values belonging to an attached COWMetric. type SampleStream struct { Metric clientmodel.COWMetric `json:"metric"` @@ -249,19 +242,37 @@ type Engine struct { cancelQueries func() // The gate limiting the maximum number of concurrent and waiting queries. gate *queryGate + + options *EngineOptions } // NewEngine returns a new engine. -func NewEngine(storage local.Storage) *Engine { +func NewEngine(storage local.Storage, o *EngineOptions) *Engine { + if o == nil { + o = DefaultEngineOptions + } ctx, cancel := context.WithCancel(context.Background()) return &Engine{ storage: storage, baseCtx: ctx, cancelQueries: cancel, - gate: newQueryGate(*maxConcurrentQueries), + gate: newQueryGate(o.MaxConcurrentQueries), + options: o, } } +// EngineOptions contains configuration parameters for an Engine. +type EngineOptions struct { + MaxConcurrentQueries int + Timeout time.Duration +} + +// DefaultEngineOptions are the default engine options. +var DefaultEngineOptions = &EngineOptions{ + MaxConcurrentQueries: 20, + Timeout: 2 * time.Minute, +} + // Stop the engine and cancel all running queries. func (ng *Engine) Stop() { ng.cancelQueries() @@ -328,7 +339,7 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query { func (ng *Engine) exec(q *query) (Value, error) { const env = "query execution" - ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout) + ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout) q.cancel = cancel queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start() @@ -1107,6 +1118,10 @@ func shouldDropMetricName(op itemType) bool { } } +// StalenessDelta determines the time since the last sample after which a time +// series is considered stale. +var StalenessDelta = 5 * time.Minute + // chooseClosestSample chooses the closest sample of a list of samples // surrounding a given target time. If samples are found both before and after // the target time, the sample value is interpolated between these. Otherwise, @@ -1119,7 +1134,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp) // Samples before target time. if delta < 0 { // Ignore samples outside of staleness policy window. - if -delta > *stalenessDelta { + if -delta > StalenessDelta { continue } // Ignore samples that are farther away than what we've seen before. @@ -1133,7 +1148,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp) // Samples after target time. if delta >= 0 { // Ignore samples outside of staleness policy window. - if delta > *stalenessDelta { + if delta > StalenessDelta { continue } // Ignore samples that are farther away than samples we've seen before. diff --git a/promql/engine_test.go b/promql/engine_test.go index ab86afba13..6636d5c5aa 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -13,7 +13,7 @@ var noop = testStmt(func(context.Context) error { }) func TestQueryConcurreny(t *testing.T) { - engine := NewEngine(nil) + engine := NewEngine(nil, nil) defer engine.Stop() block := make(chan struct{}) @@ -24,7 +24,7 @@ func TestQueryConcurreny(t *testing.T) { return nil }) - for i := 0; i < *maxConcurrentQueries; i++ { + for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { q := engine.newTestQuery(f1) go q.Exec() select { @@ -56,19 +56,16 @@ func TestQueryConcurreny(t *testing.T) { } // Terminate remaining queries. - for i := 0; i < *maxConcurrentQueries; i++ { + for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ { block <- struct{}{} } } func TestQueryTimeout(t *testing.T) { - *defaultQueryTimeout = 5 * time.Millisecond - defer func() { - // Restore default query timeout - *defaultQueryTimeout = 2 * time.Minute - }() - - engine := NewEngine(nil) + engine := NewEngine(nil, &EngineOptions{ + Timeout: 5 * time.Millisecond, + MaxConcurrentQueries: 20, + }) defer engine.Stop() f1 := testStmt(func(context.Context) error { @@ -90,7 +87,7 @@ func TestQueryTimeout(t *testing.T) { } func TestQueryCancel(t *testing.T) { - engine := NewEngine(nil) + engine := NewEngine(nil, nil) defer engine.Stop() // As for timeouts, cancellation is only checked at designated points. We ensure @@ -132,7 +129,7 @@ func TestQueryCancel(t *testing.T) { } func TestEngineShutdown(t *testing.T) { - engine := NewEngine(nil) + engine := NewEngine(nil, nil) handlerExecutions := 0 // Shutdown engine on first handler execution. Should handler execution ever become diff --git a/promql/test.go b/promql/test.go index a7604952d1..3e5e4fcc21 100644 --- a/promql/test.go +++ b/promql/test.go @@ -472,7 +472,7 @@ func (t *Test) clear() { t.storage, closer = local.NewTestStorage(t, 1) t.closeStorage = closer.Close - t.queryEngine = NewEngine(t.storage) + t.queryEngine = NewEngine(t.storage, nil) } func (t *Test) Close() { diff --git a/rules/rules_test.go b/rules/rules_test.go index 6869b1651c..c2e5c2f633 100644 --- a/rules/rules_test.go +++ b/rules/rules_test.go @@ -170,7 +170,7 @@ func TestAlertingRule(t *testing.T) { storeMatrix(storage, testMatrix) - engine := promql.NewEngine(storage) + engine := promql.NewEngine(storage, nil) defer engine.Stop() expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) diff --git a/template/template_test.go b/template/template_test.go index 7c3c47311a..b7e1b54c05 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -185,7 +185,7 @@ func TestTemplateExpansion(t *testing.T) { }) storage.WaitForIndexing() - engine := promql.NewEngine(storage) + engine := promql.NewEngine(storage, nil) for i, s := range scenarios { var result string