From c18730836689652d38999e55acaa332f04f09d6d Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Fri, 16 Sep 2016 00:58:06 +0200 Subject: [PATCH] storage: Contextify storage interfaces. This is based on https://github.com/prometheus/prometheus/pull/1997. This adds contexts to the relevant Storage methods and already passes PromQL's new per-query context into the storage's query methods. The immediate motivation supporting multi-tenancy in Frankenstein, but this could also be used by Prometheus's normal local storage to support cancellations and timeouts at some point. --- cmd/prometheus/main.go | 30 +++++++++++++++---------- promql/engine.go | 7 ++++-- promql/engine_test.go | 16 +++++++------- promql/test.go | 24 ++++++++++---------- rules/alerting.go | 6 ++--- rules/manager.go | 9 ++++---- rules/manager_test.go | 2 +- rules/recording.go | 4 ++-- rules/recording_test.go | 6 ++--- storage/local/interface.go | 13 ++++++----- storage/local/noop_storage.go | 13 ++++++----- storage/local/storage.go | 14 +++++++----- storage/local/storage_test.go | 12 +++++++--- template/template.go | 4 ++-- template/template_test.go | 2 +- web/api/v1/api.go | 16 +++++++------- web/api/v1/api_test.go | 2 +- web/federate.go | 2 +- web/web.go | 41 +++++++++++++++++------------------ 19 files changed, 121 insertions(+), 102 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7d7bdc77dc..8fbe858845 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -104,26 +104,27 @@ func Main() int { } var ( - notifier = notifier.New(&cfg.notifier) - targetManager = retrieval.NewTargetManager(sampleAppender) - queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) - queryCtx, cancelQueries = context.WithCancel(context.Background()) + notifier = notifier.New(&cfg.notifier) + targetManager = retrieval.NewTargetManager(sampleAppender) + queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine) + ctx, cancelCtx = context.WithCancel(context.Background()) ) ruleManager := rules.NewManager(&rules.ManagerOptions{ SampleAppender: sampleAppender, Notifier: notifier, QueryEngine: queryEngine, - QueryCtx: queryCtx, + Context: ctx, ExternalURL: cfg.web.ExternalURL, }) - flags := map[string]string{} - cfg.fs.VisitAll(func(f *flag.Flag) { - flags[f.Name] = f.Value.String() - }) + cfg.web.Context = ctx + cfg.web.Storage = localStorage + cfg.web.QueryEngine = queryEngine + cfg.web.TargetManager = targetManager + cfg.web.RuleManager = ruleManager - version := &web.PrometheusVersion{ + cfg.web.Version = &web.PrometheusVersion{ Version: version.Version, Revision: version.Revision, Branch: version.Branch, @@ -132,7 +133,12 @@ func Main() int { GoVersion: version.GoVersion, } - webHandler := web.New(localStorage, queryEngine, queryCtx, targetManager, ruleManager, version, flags, &cfg.web) + cfg.web.Flags = map[string]string{} + cfg.fs.VisitAll(func(f *flag.Flag) { + cfg.web.Flags[f.Name] = f.Value.String() + }) + + webHandler := web.New(&cfg.web) reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier) @@ -205,7 +211,7 @@ func Main() int { // Shutting down the query engine before the rule manager will cause pending queries // to be canceled and ensures a quick shutdown of the rule manager. - defer cancelQueries() + defer cancelCtx() go webHandler.Run() diff --git a/promql/engine.go b/promql/engine.go index 184d41b3de..64265060a1 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -352,7 +352,7 @@ func (ng *Engine) exec(ctx context.Context, q *query) (model.Value, error) { // execEvalStmt evaluates the expression of an evaluation statement for the given time range. func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (model.Value, error) { prepareTimer := query.stats.GetTimer(stats.QueryPreparationTime).Start() - err := ng.populateIterators(s) + err := ng.populateIterators(ctx, s) prepareTimer.Stop() if err != nil { return nil, err @@ -463,19 +463,21 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( return resMatrix, nil } -func (ng *Engine) populateIterators(s *EvalStmt) error { +func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) error { var queryErr error Inspect(s.Expr, func(node Node) bool { switch n := node.(type) { case *VectorSelector: if s.Start.Equal(s.End) { n.iterators, queryErr = ng.querier.QueryInstant( + ctx, s.Start.Add(-n.Offset), StalenessDelta, n.LabelMatchers..., ) } else { n.iterators, queryErr = ng.querier.QueryRange( + ctx, s.Start.Add(-n.Offset-StalenessDelta), s.End.Add(-n.Offset), n.LabelMatchers..., @@ -486,6 +488,7 @@ func (ng *Engine) populateIterators(s *EvalStmt) error { } case *MatrixSelector: n.iterators, queryErr = ng.querier.QueryRange( + ctx, s.Start.Add(-n.Offset-n.Range), s.End.Add(-n.Offset), n.LabelMatchers..., diff --git a/promql/engine_test.go b/promql/engine_test.go index 6aa63d2847..134f417f34 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -23,8 +23,8 @@ import ( func TestQueryConcurrency(t *testing.T) { engine := NewEngine(nil, nil) - ctx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() block := make(chan struct{}) processing := make(chan struct{}) @@ -77,8 +77,8 @@ func TestQueryTimeout(t *testing.T) { Timeout: 5 * time.Millisecond, MaxConcurrentQueries: 20, }) - ctx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() query := engine.newTestQuery(func(ctx context.Context) error { time.Sleep(50 * time.Millisecond) @@ -96,8 +96,8 @@ func TestQueryTimeout(t *testing.T) { func TestQueryCancel(t *testing.T) { engine := NewEngine(nil, nil) - ctx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() // Cancel a running query before it completes. block := make(chan struct{}) @@ -142,7 +142,7 @@ func TestQueryCancel(t *testing.T) { func TestEngineShutdown(t *testing.T) { engine := NewEngine(nil, nil) - ctx, cancelQueries := context.WithCancel(context.Background()) + ctx, cancelCtx := context.WithCancel(context.Background()) block := make(chan struct{}) processing := make(chan struct{}) @@ -167,7 +167,7 @@ func TestEngineShutdown(t *testing.T) { }() <-processing - cancelQueries() + cancelCtx() block <- struct{}{} <-processing diff --git a/promql/test.go b/promql/test.go index d7866239ba..8b10d450f0 100644 --- a/promql/test.go +++ b/promql/test.go @@ -50,11 +50,11 @@ type Test struct { cmds []testCommand - storage local.Storage - closeStorage func() - queryEngine *Engine - queryCtx context.Context - cancelQueries context.CancelFunc + storage local.Storage + closeStorage func() + queryEngine *Engine + context context.Context + cancelCtx context.CancelFunc } // NewTest returns an initialized empty Test. @@ -82,9 +82,9 @@ func (t *Test) QueryEngine() *Engine { return t.queryEngine } -// Context returns the test's query context. +// Context returns the test's context. func (t *Test) Context() context.Context { - return t.queryCtx + return t.context } // Storage returns the test's storage. @@ -471,7 +471,7 @@ func (t *Test) exec(tc testCommand) error { case *evalCmd: q := t.queryEngine.newQuery(cmd.expr, cmd.start, cmd.end, cmd.interval) - res := q.Exec(t.queryCtx) + res := q.Exec(t.context) if res.Err != nil { if cmd.fail { return nil @@ -498,8 +498,8 @@ func (t *Test) clear() { if t.closeStorage != nil { t.closeStorage() } - if t.cancelQueries != nil { - t.cancelQueries() + if t.cancelCtx != nil { + t.cancelCtx() } var closer testutil.Closer @@ -507,12 +507,12 @@ func (t *Test) clear() { t.closeStorage = closer.Close t.queryEngine = NewEngine(t.storage, nil) - t.queryCtx, t.cancelQueries = context.WithCancel(context.Background()) + t.context, t.cancelCtx = context.WithCancel(context.Background()) } // Close closes resources associated with the Test. func (t *Test) Close() { - t.cancelQueries() + t.cancelCtx() t.closeStorage() } diff --git a/rules/alerting.go b/rules/alerting.go index edd09f5f6b..ae1535c34a 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -148,12 +148,12 @@ const resolvedRetention = 15 * time.Minute // eval evaluates the rule expression and then creates pending alerts and fires // or removes previously pending alerts accordingly. -func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx context.Context, externalURLPath string) (model.Vector, error) { +func (r *AlertingRule) eval(ctx context.Context, ts model.Time, engine *promql.Engine, externalURLPath string) (model.Vector, error) { query, err := engine.NewInstantQuery(r.vector.String(), ts) if err != nil { return nil, err } - res, err := query.Exec(queryCtx).Vector() + res, err := query.Exec(ctx).Vector() if err != nil { return nil, err } @@ -185,12 +185,12 @@ func (r *AlertingRule) eval(ts model.Time, engine *promql.Engine, queryCtx conte expand := func(text model.LabelValue) model.LabelValue { tmpl := template.NewTemplateExpander( + ctx, defs+string(text), "__alert_"+r.Name(), tmplData, ts, engine, - queryCtx, externalURLPath, ) result, err := tmpl.Expand() diff --git a/rules/manager.go b/rules/manager.go index dac4f4b0d1..397055ceb2 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -21,13 +21,12 @@ import ( "sync" "time" - "golang.org/x/net/context" - html_template "html/template" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" @@ -107,7 +106,7 @@ const ( type Rule interface { Name() string // eval evaluates the rule, including any associated recording or alerting actions. - eval(model.Time, *promql.Engine, context.Context, string) (model.Vector, error) + eval(context.Context, model.Time, *promql.Engine, string) (model.Vector, error) // String returns a human-readable string representation of the rule. String() string // HTMLSnippet returns a human-readable string representation of the rule, @@ -258,7 +257,7 @@ func (g *Group) eval() { evalTotal.WithLabelValues(rtyp).Inc() - vector, err := rule.eval(now, g.opts.QueryEngine, g.opts.QueryCtx, g.opts.ExternalURL.Path) + vector, err := rule.eval(g.opts.Context, now, g.opts.QueryEngine, g.opts.ExternalURL.Path) if err != nil { // Canceled queries are intentional termination of queries. This normally // happens on shutdown and thus we skip logging of any errors here. @@ -343,7 +342,7 @@ type Manager struct { type ManagerOptions struct { ExternalURL *url.URL QueryEngine *promql.Engine - QueryCtx context.Context + Context context.Context Notifier *notifier.Notifier SampleAppender storage.SampleAppender } diff --git a/rules/manager_test.go b/rules/manager_test.go index 6f8a1c4fb7..8eb31175e4 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -105,7 +105,7 @@ func TestAlertingRule(t *testing.T) { for i, test := range tests { evalTime := model.Time(0).Add(test.time) - res, err := rule.eval(evalTime, suite.QueryEngine(), suite.Context(), "") + res, err := rule.eval(suite.Context(), evalTime, suite.QueryEngine(), "") if err != nil { t.Fatalf("Error during alerting rule evaluation: %s", err) } diff --git a/rules/recording.go b/rules/recording.go index ffe39c941f..14cd26a2cd 100644 --- a/rules/recording.go +++ b/rules/recording.go @@ -46,14 +46,14 @@ func (rule RecordingRule) Name() string { } // eval evaluates the rule and then overrides the metric names and labels accordingly. -func (rule RecordingRule) eval(timestamp model.Time, engine *promql.Engine, queryCtx context.Context, _ string) (model.Vector, error) { +func (rule RecordingRule) eval(ctx context.Context, timestamp model.Time, engine *promql.Engine, _ string) (model.Vector, error) { query, err := engine.NewInstantQuery(rule.vector.String(), timestamp) if err != nil { return nil, err } var ( - result = query.Exec(queryCtx) + result = query.Exec(ctx) vector model.Vector ) if result.Err != nil { diff --git a/rules/recording_test.go b/rules/recording_test.go index 6b58f4eaf2..3ad40f915c 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -28,8 +28,8 @@ func TestRuleEval(t *testing.T) { storage, closer := local.NewTestStorage(t, 2) defer closer.Close() engine := promql.NewEngine(storage, nil) - queryCtx, cancelQueries := context.WithCancel(context.Background()) - defer cancelQueries() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() now := model.Now() @@ -63,7 +63,7 @@ func TestRuleEval(t *testing.T) { for _, test := range suite { rule := NewRecordingRule(test.name, test.expr, test.labels) - result, err := rule.eval(now, engine, queryCtx, "") + result, err := rule.eval(ctx, now, engine, "") if err != nil { t.Fatalf("Error evaluating %s", test.name) } diff --git a/storage/local/interface.go b/storage/local/interface.go index 5bbefa309e..4b88a7f6be 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -17,6 +17,7 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/metric" @@ -40,7 +41,7 @@ type Storage interface { // Drop all time series associated with the given label matchers. Returns // the number series that were dropped. - DropMetricsForLabelMatchers(...*metric.LabelMatcher) (int, error) + DropMetricsForLabelMatchers(context.Context, ...*metric.LabelMatcher) (int, error) // Run the various maintenance loops in goroutines. Returns when the // storage is ready to use. Keeps everything running in the background // until Stop is called. @@ -59,10 +60,10 @@ type Querier interface { // QueryRange returns a list of series iterators for the selected // time range and label matchers. The iterators need to be closed // after usage. - QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) + QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) // QueryInstant returns a list of series iterators for the selected // instant and label matchers. The iterators need to be closed after usage. - QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) + QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) // MetricsForLabelMatchers returns the metrics from storage that satisfy // the given sets of label matchers. Each set of matchers must contain at // least one label matcher that does not match the empty string. Otherwise, @@ -72,14 +73,14 @@ type Querier interface { // storage to optimize the search. The storage MAY exclude metrics that // have no samples in the specified interval from the returned map. In // doubt, specify model.Earliest for from and model.Latest for through. - MetricsForLabelMatchers(from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) + MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) // LastSampleForLabelMatchers returns the last samples that have been // ingested for the time series matching the given set of label matchers. // The label matching behavior is the same as in MetricsForLabelMatchers. // All returned samples are between the specified cutoff time and now. - LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) + LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) // Get all of the label values that are associated with a given label name. - LabelValuesForLabelName(model.LabelName) (model.LabelValues, error) + LabelValuesForLabelName(context.Context, model.LabelName) (model.LabelValues, error) } // SeriesIterator enables efficient access of sample values in a series. Its diff --git a/storage/local/noop_storage.go b/storage/local/noop_storage.go index 4832f8f459..77a48bbb8c 100644 --- a/storage/local/noop_storage.go +++ b/storage/local/noop_storage.go @@ -17,6 +17,8 @@ import ( "time" "github.com/prometheus/common/model" + "golang.org/x/net/context" + "github.com/prometheus/prometheus/storage/metric" ) @@ -39,22 +41,23 @@ func (s *NoopStorage) WaitForIndexing() { } // LastSampleForLabelMatchers implements Storage. -func (s *NoopStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { +func (s *NoopStorage) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { return nil, nil } // QueryRange implements Storage. -func (s *NoopStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *NoopStorage) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } // QueryInstant implements Storage. -func (s *NoopStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *NoopStorage) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { return nil, nil } // MetricsForLabelMatchers implements Storage. func (s *NoopStorage) MetricsForLabelMatchers( + ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers, ) ([]metric.Metric, error) { @@ -62,12 +65,12 @@ func (s *NoopStorage) MetricsForLabelMatchers( } // LabelValuesForLabelName implements Storage. -func (s *NoopStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) { +func (s *NoopStorage) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) { return nil, nil } // DropMetricsForLabelMatchers implements Storage. -func (s *NoopStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) { +func (s *NoopStorage) DropMetricsForLabelMatchers(ctx context.Context, matchers ...*metric.LabelMatcher) (int, error) { return 0, nil } diff --git a/storage/local/storage.go b/storage/local/storage.go index ef4ff89ede..a00da8b8ae 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage/metric" ) @@ -413,7 +414,7 @@ func (s *MemorySeriesStorage) WaitForIndexing() { } // LastSampleForLabelMatchers implements Storage. -func (s *MemorySeriesStorage) LastSampleForLabelMatchers(cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { +func (s *MemorySeriesStorage) LastSampleForLabelMatchers(_ context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) { fps := map[model.Fingerprint]struct{}{} for _, matchers := range matcherSets { fpToMetric, err := s.metricsForLabelMatchers(cutoff, model.Latest, matchers...) @@ -483,7 +484,7 @@ func (bit *boundedIterator) Close() { } // QueryRange implements Storage. -func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *MemorySeriesStorage) QueryRange(_ context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { fpToMetric, err := s.metricsForLabelMatchers(from, through, matchers...) if err != nil { return nil, err @@ -497,7 +498,7 @@ func (s *MemorySeriesStorage) QueryRange(from, through model.Time, matchers ...* } // QueryInstant implements Storage. -func (s *MemorySeriesStorage) QueryInstant(ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { +func (s *MemorySeriesStorage) QueryInstant(_ context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]SeriesIterator, error) { from := ts.Add(-stalenessDelta) through := ts @@ -540,6 +541,7 @@ func (s *MemorySeriesStorage) fingerprintsForLabelPair( // MetricsForLabelMatchers implements Storage. func (s *MemorySeriesStorage) MetricsForLabelMatchers( + _ context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers, ) ([]metric.Metric, error) { @@ -603,7 +605,7 @@ func (s *MemorySeriesStorage) metricsForLabelMatchers( break } - lvs, err := s.LabelValuesForLabelName(m.Name) + lvs, err := s.LabelValuesForLabelName(context.TODO(), m.Name) if err != nil { return nil, err } @@ -693,12 +695,12 @@ func (s *MemorySeriesStorage) metricForRange( } // LabelValuesForLabelName implements Storage. -func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) (model.LabelValues, error) { +func (s *MemorySeriesStorage) LabelValuesForLabelName(_ context.Context, labelName model.LabelName) (model.LabelValues, error) { return s.persistence.labelValuesForLabelName(labelName) } // DropMetricsForLabelMatchers implements Storage. -func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(matchers ...*metric.LabelMatcher) (int, error) { +func (s *MemorySeriesStorage) DropMetricsForLabelMatchers(_ context.Context, matchers ...*metric.LabelMatcher) (int, error) { fpToMetric, err := s.metricsForLabelMatchers(model.Earliest, model.Latest, matchers...) if err != nil { return 0, err diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index d04c5ac92e..cd9e866f50 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "golang.org/x/net/context" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/util/testutil" @@ -194,6 +195,7 @@ func TestMatches(t *testing.T) { for _, mt := range matcherTests { metrics, err := storage.MetricsForLabelMatchers( + context.Background(), model.Earliest, model.Latest, mt.matchers, ) @@ -218,6 +220,7 @@ func TestMatches(t *testing.T) { } // Smoketest for from/through. metrics, err = storage.MetricsForLabelMatchers( + context.Background(), model.Earliest, -10000, mt.matchers, ) @@ -228,6 +231,7 @@ func TestMatches(t *testing.T) { t.Error("expected no matches with 'through' older than any sample") } metrics, err = storage.MetricsForLabelMatchers( + context.Background(), 10000, model.Latest, mt.matchers, ) @@ -243,6 +247,7 @@ func TestMatches(t *testing.T) { through model.Time = 75 ) metrics, err = storage.MetricsForLabelMatchers( + context.Background(), from, through, mt.matchers, ) @@ -451,6 +456,7 @@ func BenchmarkLabelMatching(b *testing.B) { benchLabelMatchingRes = []metric.Metric{} for _, mt := range matcherTests { benchLabelMatchingRes, err = s.MetricsForLabelMatchers( + context.Background(), model.Earliest, model.Latest, mt, ) @@ -493,7 +499,7 @@ func TestRetentionCutoff(t *testing.T) { if err != nil { t.Fatalf("error creating label matcher: %s", err) } - its, err := s.QueryRange(insertStart, now, lm) + its, err := s.QueryRange(context.Background(), insertStart, now, lm) if err != nil { t.Fatal(err) } @@ -581,7 +587,7 @@ func TestDropMetrics(t *testing.T) { fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived} - n, err := s.DropMetricsForLabelMatchers(lm1) + n, err := s.DropMetricsForLabelMatchers(context.Background(), lm1) if err != nil { t.Fatal(err) } @@ -614,7 +620,7 @@ func TestDropMetrics(t *testing.T) { t.Errorf("chunk file does not exist for fp=%v", fpList[2]) } - n, err = s.DropMetricsForLabelMatchers(lmAll) + n, err = s.DropMetricsForLabelMatchers(context.Background(), lmAll) if err != nil { t.Fatal(err) } diff --git a/template/template.go b/template/template.go index d07c446e3f..f468b9b6ed 100644 --- a/template/template.go +++ b/template/template.go @@ -111,14 +111,14 @@ type Expander struct { } // NewTemplateExpander returns a template expander ready to use. -func NewTemplateExpander(text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, queryCtx context.Context, pathPrefix string) *Expander { +func NewTemplateExpander(ctx context.Context, text string, name string, data interface{}, timestamp model.Time, queryEngine *promql.Engine, pathPrefix string) *Expander { return &Expander{ text: text, name: name, data: data, funcMap: text_template.FuncMap{ "query": func(q string) (queryResult, error) { - return query(queryCtx, q, timestamp, queryEngine) + return query(ctx, q, timestamp, queryEngine) }, "first": func(v queryResult) (*sample, error) { if len(v) > 0 { diff --git a/template/template_test.go b/template/template_test.go index 9d1c787a47..3f919ab1b6 100644 --- a/template/template_test.go +++ b/template/template_test.go @@ -221,7 +221,7 @@ func TestTemplateExpansion(t *testing.T) { for i, s := range scenarios { var result string var err error - expander := NewTemplateExpander(s.text, "test", s.input, time, engine, context.Background(), "") + expander := NewTemplateExpander(context.Background(), s.text, "test", s.input, time, engine, "") if s.html { result, err = expander.ExpandHTML(nil) } else { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index a79f9f9c7d..af9d86e7a3 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -85,19 +85,19 @@ type apiFunc func(r *http.Request) (interface{}, *apiError) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { + Context context.Context Storage local.Storage QueryEngine *promql.Engine - QueryCtx context.Context context func(r *http.Request) context.Context now func() model.Time } // NewAPI returns an initialized API type. -func NewAPI(qe *promql.Engine, qc context.Context, st local.Storage) *API { +func NewAPI(ctx context.Context, qe *promql.Engine, st local.Storage) *API { return &API{ + Context: ctx, QueryEngine: qe, - QueryCtx: qc, Storage: st, context: route.Context, now: model.Now, @@ -159,7 +159,7 @@ func (api *API) query(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec(api.QueryCtx) + res := qry.Exec(api.Context) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: @@ -206,7 +206,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, *apiError) { return nil, &apiError{errorBadData, err} } - res := qry.Exec(api.QueryCtx) + res := qry.Exec(api.Context) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: @@ -228,7 +228,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, *apiError) { if !model.LabelNameRE.MatchString(name) { return nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } - vals, err := api.Storage.LabelValuesForLabelName(model.LabelName(name)) + vals, err := api.Storage.LabelValuesForLabelName(api.Context, model.LabelName(name)) if err != nil { return nil, &apiError{errorExec, err} } @@ -274,7 +274,7 @@ func (api *API) series(r *http.Request) (interface{}, *apiError) { matcherSets = append(matcherSets, matchers) } - res, err := api.Storage.MetricsForLabelMatchers(start, end, matcherSets...) + res, err := api.Storage.MetricsForLabelMatchers(api.Context, start, end, matcherSets...) if err != nil { return nil, &apiError{errorExec, err} } @@ -298,7 +298,7 @@ func (api *API) dropSeries(r *http.Request) (interface{}, *apiError) { if err != nil { return nil, &apiError{errorBadData, err} } - n, err := api.Storage.DropMetricsForLabelMatchers(matchers...) + n, err := api.Storage.DropMetricsForLabelMatchers(context.TODO(), matchers...) if err != nil { return nil, &apiError{errorExec, err} } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index f0e66eab09..5d3bec8ff5 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -52,7 +52,7 @@ func TestEndpoints(t *testing.T) { api := &API{ Storage: suite.Storage(), QueryEngine: suite.QueryEngine(), - QueryCtx: suite.Context(), + Context: suite.Context(), now: func() model.Time { return now }, } diff --git a/web/federate.go b/web/federate.go index 9004db880b..9a7d151933 100644 --- a/web/federate.go +++ b/web/federate.go @@ -50,7 +50,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { ) w.Header().Set("Content-Type", string(format)) - vector, err := h.storage.LastSampleForLabelMatchers(minTimestamp, matcherSets...) + vector, err := h.storage.LastSampleForLabelMatchers(h.context, minTimestamp, matcherSets...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/web/web.go b/web/web.go index 0cce11cc48..e7eef450de 100644 --- a/web/web.go +++ b/web/web.go @@ -56,7 +56,7 @@ type Handler struct { targetManager *retrieval.TargetManager ruleManager *rules.Manager queryEngine *promql.Engine - queryCtx context.Context + context context.Context storage local.Storage apiV1 *api_v1.API @@ -99,6 +99,14 @@ type PrometheusVersion struct { // Options for the web Handler. type Options struct { + Context context.Context + Storage local.Storage + QueryEngine *promql.Engine + TargetManager *retrieval.TargetManager + RuleManager *rules.Manager + Version *PrometheusVersion + Flags map[string]string + ListenAddress string ExternalURL *url.URL RoutePrefix string @@ -111,16 +119,7 @@ type Options struct { } // New initializes a new web Handler. -func New( - st local.Storage, - qe *promql.Engine, - qc context.Context, - tm *retrieval.TargetManager, - rm *rules.Manager, - version *PrometheusVersion, - flags map[string]string, - o *Options, -) *Handler { +func New(o *Options) *Handler { router := route.New() h := &Handler{ @@ -129,17 +128,17 @@ func New( quitCh: make(chan struct{}), reloadCh: make(chan chan error), options: o, - versionInfo: version, + versionInfo: o.Version, birth: time.Now(), - flagsMap: flags, + flagsMap: o.Flags, - targetManager: tm, - ruleManager: rm, - queryEngine: qe, - queryCtx: qc, - storage: st, + context: o.Context, + targetManager: o.TargetManager, + ruleManager: o.RuleManager, + queryEngine: o.QueryEngine, + storage: o.Storage, - apiV1: api_v1.NewAPI(qe, qc, st), + apiV1: api_v1.NewAPI(o.Context, o.QueryEngine, o.Storage), now: model.Now, } @@ -297,7 +296,7 @@ func (h *Handler) consoles(w http.ResponseWriter, r *http.Request) { Path: strings.TrimLeft(name, "/"), } - tmpl := template.NewTemplateExpander(string(text), "__console_"+name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(h.context, string(text), "__console_"+name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) filenames, err := filepath.Glob(h.options.ConsoleLibrariesPath + "/*.lib") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -470,7 +469,7 @@ func (h *Handler) executeTemplate(w http.ResponseWriter, name string, data inter http.Error(w, err.Error(), http.StatusInternalServerError) } - tmpl := template.NewTemplateExpander(text, name, data, h.now(), h.queryEngine, h.queryCtx, h.options.ExternalURL.Path) + tmpl := template.NewTemplateExpander(h.context, text, name, data, h.now(), h.queryEngine, h.options.ExternalURL.Path) tmpl.Funcs(tmplFuncs(h.consolesPath(), h.options)) result, err := tmpl.ExpandHTML(nil)