From 8fda83ea128ffc99af3a26d4771f2d480e4bfa54 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Tue, 21 Mar 2017 00:46:31 +0100 Subject: [PATCH] Make rules only read local data --- cmd/prometheus/main.go | 2 +- storage/fanin/fanin.go | 23 ++++++-- storage/fanin/fanin_test.go | 114 ++++++++++++++++++++++++++++++++++-- 3 files changed, 128 insertions(+), 11 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 4c2fbaa73..68c9a4ce1 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -114,7 +114,7 @@ func Main() int { SampleAppender: sampleAppender, Notifier: notifier, QueryEngine: queryEngine, - Context: ctx, + Context: fanin.WithLocalOnly(ctx), ExternalURL: cfg.web.ExternalURL, }) diff --git a/storage/fanin/fanin.go b/storage/fanin/fanin.go index ec97fb727..73c849b15 100644 --- a/storage/fanin/fanin.go +++ b/storage/fanin/fanin.go @@ -26,6 +26,20 @@ import ( "github.com/prometheus/prometheus/storage/remote" ) +type contextKey string + +const ctxLocalOnly contextKey = "local-only" + +// WithLocalOnly decorates a context to indicate that a query should +// only be executed against local data. +func WithLocalOnly(ctx context.Context) context.Context { + return context.WithValue(ctx, ctxLocalOnly, struct{}{}) +} + +func localOnly(ctx context.Context) bool { + return ctx.Value(ctxLocalOnly) == struct{}{} +} + // Queryable is a local.Queryable that reads from local and remote storage. type Queryable struct { Local promql.Queryable @@ -52,25 +66,24 @@ type querier struct { } func (q querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) { - return q.query(func(q local.Querier) ([]local.SeriesIterator, error) { + return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) { return q.QueryRange(ctx, from, through, matchers...) }) } func (q querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) { - return q.query(func(q local.Querier) ([]local.SeriesIterator, error) { + return q.query(ctx, func(q local.Querier) ([]local.SeriesIterator, error) { return q.QueryInstant(ctx, ts, stalenessDelta, matchers...) }) } -func (q querier) query(qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) { +func (q querier) query(ctx context.Context, qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) { localIts, err := qFn(q.local) if err != nil { return nil, err } - if len(q.remotes) == 0 { - // Skip merge logic if there are no remote queriers. + if len(q.remotes) == 0 || localOnly(ctx) { return localIts, nil } diff --git a/storage/fanin/fanin_test.go b/storage/fanin/fanin_test.go index ddb83031c..028c7300f 100644 --- a/storage/fanin/fanin_test.go +++ b/storage/fanin/fanin_test.go @@ -111,9 +111,10 @@ func (q testQuerier) Close() error { func TestQueryRange(t *testing.T) { type query struct { - from model.Time - through model.Time - out model.Matrix + from model.Time + through model.Time + out model.Matrix + localOnly bool } tests := []struct { @@ -485,6 +486,105 @@ func TestQueryRange(t *testing.T) { }, }, }, + + { + name: "context value to indicate only local querying is set", + local: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + model.MetricNameLabel: "testmetric", + }, + Values: []model.SamplePair{ + { + Timestamp: 2, + Value: 2, + }, + { + Timestamp: 3, + Value: 3, + }, + }, + }, + }, + remote: []model.Matrix{ + model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + model.MetricNameLabel: "testmetric", + }, + Values: []model.SamplePair{ + { + Timestamp: 0, + Value: 0, + }, + { + Timestamp: 1, + Value: 1, + }, + { + Timestamp: 2, + Value: 2, + }, + }, + }, + }, + }, + queries: []query{ + { + from: 0, + through: 3, + localOnly: true, + out: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + model.MetricNameLabel: "testmetric", + }, + Values: []model.SamplePair{ + { + Timestamp: 2, + Value: 2, + }, + { + Timestamp: 3, + Value: 3, + }, + }, + }, + }, + }, + { + from: 1, + through: 1, + localOnly: true, + out: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + model.MetricNameLabel: "testmetric", + }, + Values: []model.SamplePair{model.ZeroSamplePair}, + }, + }, + }, + { + from: 2, + through: 2, + localOnly: true, + out: model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + model.MetricNameLabel: "testmetric", + }, + Values: []model.SamplePair{ + { + Timestamp: 2, + Value: 2, + }, + }, + }, + }, + }, + }, + }, } matcher, err := metric.NewLabelMatcher(metric.Equal, model.MetricNameLabel, "testmetric") @@ -501,12 +601,16 @@ func TestQueryRange(t *testing.T) { } for i, query := range test.queries { + ctx := context.Background() + if query.localOnly { + ctx = WithLocalOnly(ctx) + } var its []local.SeriesIterator var err error if query.from == query.through { - its, err = q.QueryInstant(context.Background(), query.from, 5*time.Minute, matcher) + its, err = q.QueryInstant(ctx, query.from, 5*time.Minute, matcher) } else { - its, err = q.QueryRange(context.Background(), query.from, query.through, matcher) + its, err = q.QueryRange(ctx, query.from, query.through, matcher) } if err != nil { t.Fatal(err)