promql: remove global flags

This commit is contained in:
Fabian Reinartz 2015-06-15 12:49:11 +02:00
parent de66e32a4d
commit fe301d7946
6 changed files with 41 additions and 29 deletions

View file

@ -137,7 +137,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
err = p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta) err = p.PreloadRange(fp, start.Add(-rangeDuration), end, StalenessDelta)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -146,7 +146,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
return nil, err return nil, err
} }
err = p.PreloadRange(fp, start, end, *stalenessDelta) err = p.PreloadRange(fp, start, end, StalenessDelta)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -14,7 +14,6 @@
package promql package promql
import ( import (
"flag"
"fmt" "fmt"
"math" "math"
"runtime" "runtime"
@ -30,12 +29,6 @@ import (
"github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/stats"
) )
var (
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.")
)
// SampleStream is a stream of Values belonging to an attached COWMetric. // SampleStream is a stream of Values belonging to an attached COWMetric.
type SampleStream struct { type SampleStream struct {
Metric clientmodel.COWMetric `json:"metric"` Metric clientmodel.COWMetric `json:"metric"`
@ -249,19 +242,37 @@ type Engine struct {
cancelQueries func() cancelQueries func()
// The gate limiting the maximum number of concurrent and waiting queries. // The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate gate *queryGate
options *EngineOptions
} }
// NewEngine returns a new engine. // NewEngine returns a new engine.
func NewEngine(storage local.Storage) *Engine { func NewEngine(storage local.Storage, o *EngineOptions) *Engine {
if o == nil {
o = DefaultEngineOptions
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &Engine{ return &Engine{
storage: storage, storage: storage,
baseCtx: ctx, baseCtx: ctx,
cancelQueries: cancel, cancelQueries: cancel,
gate: newQueryGate(*maxConcurrentQueries), gate: newQueryGate(o.MaxConcurrentQueries),
options: o,
} }
} }
// EngineOptions contains configuration parameters for an Engine.
type EngineOptions struct {
MaxConcurrentQueries int
Timeout time.Duration
}
// DefaultEngineOptions are the default engine options.
var DefaultEngineOptions = &EngineOptions{
MaxConcurrentQueries: 20,
Timeout: 2 * time.Minute,
}
// Stop the engine and cancel all running queries. // Stop the engine and cancel all running queries.
func (ng *Engine) Stop() { func (ng *Engine) Stop() {
ng.cancelQueries() ng.cancelQueries()
@ -328,7 +339,7 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
func (ng *Engine) exec(q *query) (Value, error) { func (ng *Engine) exec(q *query) (Value, error) {
const env = "query execution" const env = "query execution"
ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout) ctx, cancel := context.WithTimeout(q.ng.baseCtx, ng.options.Timeout)
q.cancel = cancel q.cancel = cancel
queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start() queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()
@ -1107,6 +1118,10 @@ func shouldDropMetricName(op itemType) bool {
} }
} }
// StalenessDelta determines the time since the last sample after which a time
// series is considered stale.
var StalenessDelta = 5 * time.Minute
// chooseClosestSample chooses the closest sample of a list of samples // chooseClosestSample chooses the closest sample of a list of samples
// surrounding a given target time. If samples are found both before and after // surrounding a given target time. If samples are found both before and after
// the target time, the sample value is interpolated between these. Otherwise, // the target time, the sample value is interpolated between these. Otherwise,
@ -1119,7 +1134,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp)
// Samples before target time. // Samples before target time.
if delta < 0 { if delta < 0 {
// Ignore samples outside of staleness policy window. // Ignore samples outside of staleness policy window.
if -delta > *stalenessDelta { if -delta > StalenessDelta {
continue continue
} }
// Ignore samples that are farther away than what we've seen before. // Ignore samples that are farther away than what we've seen before.
@ -1133,7 +1148,7 @@ func chooseClosestSample(samples metric.Values, timestamp clientmodel.Timestamp)
// Samples after target time. // Samples after target time.
if delta >= 0 { if delta >= 0 {
// Ignore samples outside of staleness policy window. // Ignore samples outside of staleness policy window.
if delta > *stalenessDelta { if delta > StalenessDelta {
continue continue
} }
// Ignore samples that are farther away than samples we've seen before. // Ignore samples that are farther away than samples we've seen before.

View file

@ -13,7 +13,7 @@ var noop = testStmt(func(context.Context) error {
}) })
func TestQueryConcurreny(t *testing.T) { func TestQueryConcurreny(t *testing.T) {
engine := NewEngine(nil) engine := NewEngine(nil, nil)
defer engine.Stop() defer engine.Stop()
block := make(chan struct{}) block := make(chan struct{})
@ -24,7 +24,7 @@ func TestQueryConcurreny(t *testing.T) {
return nil return nil
}) })
for i := 0; i < *maxConcurrentQueries; i++ { for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
q := engine.newTestQuery(f1) q := engine.newTestQuery(f1)
go q.Exec() go q.Exec()
select { select {
@ -56,19 +56,16 @@ func TestQueryConcurreny(t *testing.T) {
} }
// Terminate remaining queries. // Terminate remaining queries.
for i := 0; i < *maxConcurrentQueries; i++ { for i := 0; i < DefaultEngineOptions.MaxConcurrentQueries; i++ {
block <- struct{}{} block <- struct{}{}
} }
} }
func TestQueryTimeout(t *testing.T) { func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 5 * time.Millisecond engine := NewEngine(nil, &EngineOptions{
defer func() { Timeout: 5 * time.Millisecond,
// Restore default query timeout MaxConcurrentQueries: 20,
*defaultQueryTimeout = 2 * time.Minute })
}()
engine := NewEngine(nil)
defer engine.Stop() defer engine.Stop()
f1 := testStmt(func(context.Context) error { f1 := testStmt(func(context.Context) error {
@ -90,7 +87,7 @@ func TestQueryTimeout(t *testing.T) {
} }
func TestQueryCancel(t *testing.T) { func TestQueryCancel(t *testing.T) {
engine := NewEngine(nil) engine := NewEngine(nil, nil)
defer engine.Stop() defer engine.Stop()
// As for timeouts, cancellation is only checked at designated points. We ensure // As for timeouts, cancellation is only checked at designated points. We ensure
@ -132,7 +129,7 @@ func TestQueryCancel(t *testing.T) {
} }
func TestEngineShutdown(t *testing.T) { func TestEngineShutdown(t *testing.T) {
engine := NewEngine(nil) engine := NewEngine(nil, nil)
handlerExecutions := 0 handlerExecutions := 0
// Shutdown engine on first handler execution. Should handler execution ever become // Shutdown engine on first handler execution. Should handler execution ever become

View file

@ -472,7 +472,7 @@ func (t *Test) clear() {
t.storage, closer = local.NewTestStorage(t, 1) t.storage, closer = local.NewTestStorage(t, 1)
t.closeStorage = closer.Close t.closeStorage = closer.Close
t.queryEngine = NewEngine(t.storage) t.queryEngine = NewEngine(t.storage, nil)
} }
func (t *Test) Close() { func (t *Test) Close() {

View file

@ -170,7 +170,7 @@ func TestAlertingRule(t *testing.T) {
storeMatrix(storage, testMatrix) storeMatrix(storage, testMatrix)
engine := promql.NewEngine(storage) engine := promql.NewEngine(storage, nil)
defer engine.Stop() defer engine.Stop()
expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`) expr, err := promql.ParseExpr(`http_requests{group="canary", job="app-server"} < 100`)

View file

@ -185,7 +185,7 @@ func TestTemplateExpansion(t *testing.T) {
}) })
storage.WaitForIndexing() storage.WaitForIndexing()
engine := promql.NewEngine(storage) engine := promql.NewEngine(storage, nil)
for i, s := range scenarios { for i, s := range scenarios {
var result string var result string