From 6daee89e5f8b3c81dbeb451fa86635051375160f Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 12 Sep 2023 12:37:38 +0200 Subject: [PATCH] Add context argument to Querier.Select (#12660) Signed-off-by: Arve Knudsen --- cmd/prometheus/main.go | 8 +- cmd/promtool/backfill_test.go | 4 +- cmd/promtool/main.go | 4 +- cmd/promtool/rules_test.go | 8 +- cmd/promtool/tsdb.go | 6 +- promql/engine.go | 8 +- promql/engine_test.go | 10 +-- promql/test_test.go | 4 +- rules/alerting.go | 4 +- rules/alerting_test.go | 2 +- rules/manager.go | 4 +- rules/manager_test.go | 16 ++-- scrape/scrape_test.go | 20 ++--- storage/fanout.go | 12 +-- storage/fanout_test.go | 24 +++--- storage/generic.go | 18 +++-- storage/interface.go | 18 ++--- storage/merge.go | 9 ++- storage/merge_test.go | 13 +-- storage/noop.go | 6 +- storage/remote/read.go | 15 ++-- storage/remote/read_handler.go | 8 +- storage/remote/read_test.go | 4 +- storage/remote/storage.go | 8 +- storage/secondary.go | 5 +- tsdb/agent/db.go | 4 +- tsdb/agent/db_test.go | 4 +- tsdb/block_test.go | 2 +- tsdb/db.go | 12 +-- tsdb/db_test.go | 140 ++++++++++++++++----------------- tsdb/example_test.go | 4 +- tsdb/head_test.go | 32 ++++---- tsdb/querier.go | 5 +- tsdb/querier_bench_test.go | 2 +- tsdb/querier_test.go | 26 +++--- web/api/v1/api.go | 12 +-- web/api/v1/api_test.go | 4 +- web/api/v1/errors_test.go | 6 +- web/federate.go | 6 +- web/federate_test.go | 2 +- 40 files changed, 254 insertions(+), 245 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index cab65626a..ab5790021 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1378,17 +1378,17 @@ func (s *readyStorage) StartTime() (int64, error) { } // Querier implements the Storage interface. -func (s *readyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (s *readyStorage) Querier(mint, maxt int64) (storage.Querier, error) { if x := s.get(); x != nil { - return x.Querier(ctx, mint, maxt) + return x.Querier(mint, maxt) } return nil, tsdb.ErrNotReady } // ChunkQuerier implements the Storage interface. -func (s *readyStorage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (s *readyStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { if x := s.get(); x != nil { - return x.ChunkQuerier(ctx, mint, maxt) + return x.ChunkQuerier(mint, maxt) } return nil, tsdb.ErrNotReady } diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index e6f7cad31..b77dc7826 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -45,7 +45,7 @@ func sortSamples(samples []backfillSample) { } func queryAllSeries(t testing.TB, q storage.Querier, expectedMinTime, expectedMaxTime int64) []backfillSample { // nolint:revive - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) samples := []backfillSample{} for ss.Next() { series := ss.At() @@ -67,7 +67,7 @@ func testBlocks(t *testing.T, db *tsdb.DB, expectedMinTime, expectedMaxTime, exp require.Equal(t, block.MinTime()/expectedBlockDuration, (block.MaxTime()-1)/expectedBlockDuration, "block %d contains data outside of one aligned block duration", i) } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) defer func() { require.NoError(t, q.Close()) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index da4b8dc79..1d330a02d 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -86,6 +86,8 @@ func main() { httpConfigFilePath string ) + ctx := context.Background() + app := kingpin.New(filepath.Base(os.Args[0]), "Tooling for the Prometheus monitoring system.").UsageWriter(os.Stdout) app.Version(version.Print("promtool")) app.HelpFlag.Short('h') @@ -376,7 +378,7 @@ func main() { os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable))) case tsdbDumpCmd.FullCommand(): - os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch))) + os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch))) // TODO(aSquare14): Work on adding support for custom block size. case openMetricsImportCmd.FullCommand(): os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration)) diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index 213b7d2a0..bfea7c937 100644 --- a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -124,10 +124,10 @@ func TestBackfillRuleIntegration(t *testing.T) { blocks := db.Blocks() require.Equal(t, (i+1)*tt.expectedBlockCount, len(blocks)) - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) - selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + selectedSeries := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) var seriesCount, samplesCount int for selectedSeries.Next() { seriesCount++ @@ -248,11 +248,11 @@ func TestBackfillLabels(t *testing.T) { db, err := tsdb.Open(tmpDir, nil, nil, opts, nil) require.NoError(t, err) - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) t.Run("correct-labels", func(t *testing.T) { - selectedSeries := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + selectedSeries := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) for selectedSeries.Next() { series := selectedSeries.At() expectedLabels := labels.FromStrings("__name__", "rulename", "name1", "value-from-rule") diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 820cd4687..2ade90d10 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -619,7 +619,7 @@ func analyzeCompaction(block tsdb.BlockReader, indexr tsdb.IndexReader) (err err return nil } -func dumpSamples(path string, mint, maxt int64, match string) (err error) { +func dumpSamples(ctx context.Context, path string, mint, maxt int64, match string) (err error) { db, err := tsdb.OpenDBReadOnly(path, nil) if err != nil { return err @@ -627,7 +627,7 @@ func dumpSamples(path string, mint, maxt int64, match string) (err error) { defer func() { err = tsdb_errors.NewMulti(err, db.Close()).Err() }() - q, err := db.Querier(context.TODO(), mint, maxt) + q, err := db.Querier(mint, maxt) if err != nil { return err } @@ -637,7 +637,7 @@ func dumpSamples(path string, mint, maxt int64, match string) (err error) { if err != nil { return err } - ss := q.Select(false, nil, matchers...) + ss := q.Select(ctx, false, nil, matchers...) for ss.Next() { series := ss.At() diff --git a/promql/engine.go b/promql/engine.go index 816f20721..13fcd6762 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -669,14 +669,14 @@ func durationMilliseconds(d time.Duration) int64 { func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.EvalStmt) (parser.Value, storage.Warnings, error) { prepareSpanTimer, ctxPrepare := query.stats.GetSpanTimer(ctx, stats.QueryPreparationTime, ng.metrics.queryPrepareTime) mint, maxt := ng.findMinMaxTime(s) - querier, err := query.queryable.Querier(ctxPrepare, mint, maxt) + querier, err := query.queryable.Querier(mint, maxt) if err != nil { prepareSpanTimer.Finish() return nil, nil, err } defer querier.Close() - ng.populateSeries(querier, s) + ng.populateSeries(ctxPrepare, querier, s) prepareSpanTimer.Finish() // Modify the offset of vector and matrix selectors for the @ modifier @@ -890,7 +890,7 @@ func (ng *Engine) getLastSubqueryInterval(path []parser.Node) time.Duration { return interval } -func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) { +func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s *parser.EvalStmt) { // Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range. // The evaluation of the VectorSelector inside then evaluates the given range and unsets // the variable. @@ -913,7 +913,7 @@ func (ng *Engine) populateSeries(querier storage.Querier, s *parser.EvalStmt) { } evalRange = 0 hints.By, hints.Grouping = extractGroupsFromPath(path) - n.UnexpandedSeriesSet = querier.Select(false, hints, n.LabelMatchers...) + n.UnexpandedSeriesSet = querier.Select(ctx, false, hints, n.LabelMatchers...) case *parser.MatrixSelector: evalRange = n.Range diff --git a/promql/engine_test.go b/promql/engine_test.go index 82e44bcbc..3c6d30e47 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -194,7 +194,7 @@ type errQuerier struct { err error } -func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { +func (q *errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return errSeriesSet{err: q.err} } @@ -226,7 +226,7 @@ func TestQueryError(t *testing.T) { } engine := NewEngine(opts) errStorage := ErrStorage{errors.New("storage error")} - queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + queryable := storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { return &errQuerier{err: errStorage}, nil }) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -251,7 +251,7 @@ type noopHintRecordingQueryable struct { hints []*storage.SelectHints } -func (h *noopHintRecordingQueryable) Querier(context.Context, int64, int64) (storage.Querier, error) { +func (h *noopHintRecordingQueryable) Querier(int64, int64) (storage.Querier, error) { return &hintRecordingQuerier{Querier: &errQuerier{}, h: h}, nil } @@ -261,9 +261,9 @@ type hintRecordingQuerier struct { h *noopHintRecordingQueryable } -func (h *hintRecordingQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (h *hintRecordingQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { h.h.hints = append(h.h.hints, hints) - return h.Querier.Select(sortSeries, hints, matchers...) + return h.Querier.Select(ctx, sortSeries, hints, matchers...) } func TestSelectHintsSetCorrectly(t *testing.T) { diff --git a/promql/test_test.go b/promql/test_test.go index cc1df62d0..ee2a0e264 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -123,7 +123,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { // Check the series. queryable := suite.Queryable() - querier, err := queryable.Querier(suite.Context(), math.MinInt64, math.MaxInt64) + querier, err := queryable.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) for _, s := range tc.series { var matchers []*labels.Matcher @@ -134,7 +134,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { }) // Get the series for the matcher. - ss := querier.Select(false, nil, matchers...) + ss := querier.Select(suite.Context(), false, nil, matchers...) require.True(t, ss.Next()) storageSeries := ss.At() require.False(t, ss.Next(), "Expecting only 1 series") diff --git a/rules/alerting.go b/rules/alerting.go index aed4e8f53..228809486 100644 --- a/rules/alerting.go +++ b/rules/alerting.go @@ -261,7 +261,7 @@ func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) pro } // QueryforStateSeries returns the series for ALERTS_FOR_STATE. -func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (storage.Series, error) { +func (r *AlertingRule) QueryforStateSeries(ctx context.Context, alert *Alert, q storage.Querier) (storage.Series, error) { smpl := r.forStateSample(alert, time.Now(), 0) var matchers []*labels.Matcher smpl.Metric.Range(func(l labels.Label) { @@ -271,7 +271,7 @@ func (r *AlertingRule) QueryforStateSeries(alert *Alert, q storage.Querier) (sto } matchers = append(matchers, mt) }) - sset := q.Select(false, nil, matchers...) + sset := q.Select(ctx, false, nil, matchers...) var s storage.Series for sset.Next() { diff --git a/rules/alerting_test.go b/rules/alerting_test.go index f980c2a98..f8edcc767 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -659,7 +659,7 @@ func TestQueryForStateSeries(t *testing.T) { ValidUntil: time.Time{}, } - series, err := rule.QueryforStateSeries(alert, querier) + series, err := rule.QueryforStateSeries(context.Background(), alert, querier) require.Equal(t, tst.expectedSeries, series) require.Equal(t, tst.expectedError, err) diff --git a/rules/manager.go b/rules/manager.go index 0d8d022cd..e5e248cde 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -814,7 +814,7 @@ func (g *Group) RestoreForState(ts time.Time) { // We allow restoration only if alerts were active before after certain time. mint := ts.Add(-g.opts.OutageTolerance) mintMS := int64(model.TimeFromUnixNano(mint.UnixNano())) - q, err := g.opts.Queryable.Querier(g.opts.Context, mintMS, maxtMS) + q, err := g.opts.Queryable.Querier(mintMS, maxtMS) if err != nil { level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err) return @@ -843,7 +843,7 @@ func (g *Group) RestoreForState(ts time.Time) { alertRule.ForEachActiveAlert(func(a *Alert) { var s storage.Series - s, err := alertRule.QueryforStateSeries(a, q) + s, err := alertRule.QueryforStateSeries(g.opts.Context, a, q) if err != nil { // Querier Warnings are ignored. We do not care unless we have an error. level.Error(g.logger).Log( diff --git a/rules/manager_test.go b/rules/manager_test.go index f301aa010..75ee34919 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -558,14 +558,14 @@ func TestStaleness(t *testing.T) { group.Eval(ctx, time.Unix(1, 0)) group.Eval(ctx, time.Unix(2, 0)) - querier, err := st.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(0, 2000) require.NoError(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") require.NoError(t, err) - set := querier.Select(false, nil, matcher) + set := querier.Select(ctx, false, nil, matcher) samples, err := readSeriesSet(set) require.NoError(t, err) @@ -681,14 +681,14 @@ func TestDeletedRuleMarkedStale(t *testing.T) { newGroup.Eval(context.Background(), time.Unix(0, 0)) - querier, err := st.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(0, 2000) require.NoError(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1") require.NoError(t, err) - set := querier.Select(false, nil, matcher) + set := querier.Select(context.Background(), false, nil, matcher) samples, err := readSeriesSet(set) require.NoError(t, err) @@ -1107,14 +1107,14 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { func countStaleNaN(t *testing.T, st storage.Storage) int { var c int - querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000) + querier, err := st.Querier(0, time.Now().Unix()*1000) require.NoError(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2") require.NoError(t, err) - set := querier.Select(false, nil, matcher) + set := querier.Select(context.Background(), false, nil, matcher) samples, err := readSeriesSet(set) require.NoError(t, err) @@ -1381,9 +1381,9 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) { group.Eval(context.Background(), ts.Add(10*time.Second)) - q, err := db.Querier(context.Background(), ts.UnixMilli(), ts.Add(20*time.Second).UnixMilli()) + q, err := db.Querier(ts.UnixMilli(), ts.Add(20*time.Second).UnixMilli()) require.NoError(t, err) - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "sum:histogram_metric")) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "__name__", "sum:histogram_metric")) require.True(t, ss.Next()) s := ss.At() require.False(t, ss.Next()) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 4c80d1d98..3b7d6a7ab 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -2925,9 +2925,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { require.Error(t, err) require.NoError(t, slApp.Rollback()) - q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) + q, err := s.Querier(time.Time{}.UnixNano(), 0) require.NoError(t, err) - series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) require.Equal(t, false, series.Next(), "series found in tsdb") require.NoError(t, series.Err()) @@ -2937,9 +2937,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { require.NoError(t, err) require.NoError(t, slApp.Commit()) - q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) + q, err = s.Querier(time.Time{}.UnixNano(), 0) require.NoError(t, err) - series = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) + series = q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) require.Equal(t, true, series.Next(), "series not found in tsdb") require.NoError(t, series.Err()) require.Equal(t, false, series.Next(), "more than one series found in tsdb") @@ -2984,9 +2984,9 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { require.NoError(t, slApp.Rollback()) require.Equal(t, errNameLabelMandatory, err) - q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) + q, err := s.Querier(time.Time{}.UnixNano(), 0) require.NoError(t, err) - series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) require.Equal(t, false, series.Next(), "series found in tsdb") require.NoError(t, series.Err()) } @@ -3346,9 +3346,9 @@ func TestScrapeReportSingleAppender(t *testing.T) { start := time.Now() for time.Since(start) < 3*time.Second { - q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano()) + q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano()) require.NoError(t, err) - series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+")) + series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+")) c := 0 for series.Next() { @@ -3418,10 +3418,10 @@ func TestScrapeReportLimit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano()) + q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano()) require.NoError(t, err) defer q.Close() - series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up")) + series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "up")) var found bool for series.Next() { diff --git a/storage/fanout.go b/storage/fanout.go index a9db4f628..33257046f 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -72,15 +72,15 @@ func (f *fanout) StartTime() (int64, error) { return firstTime, nil } -func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { - primary, err := f.primary.Querier(ctx, mint, maxt) +func (f *fanout) Querier(mint, maxt int64) (Querier, error) { + primary, err := f.primary.Querier(mint, maxt) if err != nil { return nil, err } secondaries := make([]Querier, 0, len(f.secondaries)) for _, storage := range f.secondaries { - querier, err := storage.Querier(ctx, mint, maxt) + querier, err := storage.Querier(mint, maxt) if err != nil { // Close already open Queriers, append potential errors to returned error. errs := tsdb_errors.NewMulti(err, primary.Close()) @@ -94,15 +94,15 @@ func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) return NewMergeQuerier([]Querier{primary}, secondaries, ChainedSeriesMerge), nil } -func (f *fanout) ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) { - primary, err := f.primary.ChunkQuerier(ctx, mint, maxt) +func (f *fanout) ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) { + primary, err := f.primary.ChunkQuerier(mint, maxt) if err != nil { return nil, err } secondaries := make([]ChunkQuerier, 0, len(f.secondaries)) for _, storage := range f.secondaries { - querier, err := storage.ChunkQuerier(ctx, mint, maxt) + querier, err := storage.ChunkQuerier(mint, maxt) if err != nil { // Close already open Queriers, append potential errors to returned error. errs := tsdb_errors.NewMulti(err, primary.Close()) diff --git a/storage/fanout_test.go b/storage/fanout_test.go index b4490636d..eeae01670 100644 --- a/storage/fanout_test.go +++ b/storage/fanout_test.go @@ -75,14 +75,14 @@ func TestFanout_SelectSorted(t *testing.T) { fanoutStorage := storage.NewFanout(nil, priStorage, remoteStorage1, remoteStorage2) t.Run("querier", func(t *testing.T) { - querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + querier, err := fanoutStorage.Querier(0, 8000) require.NoError(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") require.NoError(t, err) - seriesSet := querier.Select(true, nil, matcher) + seriesSet := querier.Select(ctx, true, nil, matcher) result := make(map[int64]float64) var labelsResult labels.Labels @@ -102,14 +102,14 @@ func TestFanout_SelectSorted(t *testing.T) { require.Equal(t, inputTotalSize, len(result)) }) t.Run("chunk querier", func(t *testing.T) { - querier, err := fanoutStorage.ChunkQuerier(ctx, 0, 8000) + querier, err := fanoutStorage.ChunkQuerier(0, 8000) require.NoError(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") require.NoError(t, err) - seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(true, nil, matcher)) + seriesSet := storage.NewSeriesSetFromChunkSeriesSet(querier.Select(ctx, true, nil, matcher)) result := make(map[int64]float64) var labelsResult labels.Labels @@ -159,12 +159,12 @@ func TestFanoutErrors(t *testing.T) { fanoutStorage := storage.NewFanout(nil, tc.primary, tc.secondary) t.Run("samples", func(t *testing.T) { - querier, err := fanoutStorage.Querier(context.Background(), 0, 8000) + querier, err := fanoutStorage.Querier(0, 8000) require.NoError(t, err) defer querier.Close() matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") - ss := querier.Select(true, nil, matcher) + ss := querier.Select(context.Background(), true, nil, matcher) // Exhaust. for ss.Next() { @@ -184,12 +184,12 @@ func TestFanoutErrors(t *testing.T) { }) t.Run("chunks", func(t *testing.T) { t.Skip("enable once TestStorage and TSDB implements ChunkQuerier") - querier, err := fanoutStorage.ChunkQuerier(context.Background(), 0, 8000) + querier, err := fanoutStorage.ChunkQuerier(0, 8000) require.NoError(t, err) defer querier.Close() matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") - ss := querier.Select(true, nil, matcher) + ss := querier.Select(context.Background(), true, nil, matcher) // Exhaust. for ss.Next() { @@ -216,20 +216,20 @@ type errStorage struct{} type errQuerier struct{} -func (errStorage) Querier(_ context.Context, _, _ int64) (storage.Querier, error) { +func (errStorage) Querier(_, _ int64) (storage.Querier, error) { return errQuerier{}, nil } type errChunkQuerier struct{ errQuerier } -func (errStorage) ChunkQuerier(_ context.Context, _, _ int64) (storage.ChunkQuerier, error) { +func (errStorage) ChunkQuerier(_, _ int64) (storage.ChunkQuerier, error) { return errChunkQuerier{}, nil } func (errStorage) Appender(_ context.Context) storage.Appender { return nil } func (errStorage) StartTime() (int64, error) { return 0, nil } func (errStorage) Close() error { return nil } -func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { +func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet { return storage.ErrSeriesSet(errSelect) } @@ -243,6 +243,6 @@ func (errQuerier) LabelNames(...*labels.Matcher) ([]string, storage.Warnings, er func (errQuerier) Close() error { return nil } -func (errChunkQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet { +func (errChunkQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels.Matcher) storage.ChunkSeriesSet { return storage.ErrChunkSeriesSet(errSelect) } diff --git a/storage/generic.go b/storage/generic.go index 6762f32a1..a62ba4a89 100644 --- a/storage/generic.go +++ b/storage/generic.go @@ -17,12 +17,14 @@ package storage import ( + "context" + "github.com/prometheus/prometheus/model/labels" ) type genericQuerier interface { LabelQuerier - Select(bool, *SelectHints, ...*labels.Matcher) genericSeriesSet + Select(context.Context, bool, *SelectHints, ...*labels.Matcher) genericSeriesSet } type genericSeriesSet interface { @@ -58,11 +60,11 @@ type genericQuerierAdapter struct { cq ChunkQuerier } -func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { +func (q *genericQuerierAdapter) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { if q.q != nil { - return &genericSeriesSetAdapter{q.q.Select(sortSeries, hints, matchers...)} + return &genericSeriesSetAdapter{q.q.Select(ctx, sortSeries, hints, matchers...)} } - return &genericChunkSeriesSetAdapter{q.cq.Select(sortSeries, hints, matchers...)} + return &genericChunkSeriesSetAdapter{q.cq.Select(ctx, sortSeries, hints, matchers...)} } func newGenericQuerierFrom(q Querier) genericQuerier { @@ -85,8 +87,8 @@ func (a *seriesSetAdapter) At() Series { return a.genericSeriesSet.At().(Series) } -func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet { - return &seriesSetAdapter{q.genericQuerier.Select(sortSeries, hints, matchers...)} +func (q *querierAdapter) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet { + return &seriesSetAdapter{q.genericQuerier.Select(ctx, sortSeries, hints, matchers...)} } type chunkQuerierAdapter struct { @@ -101,8 +103,8 @@ func (a *chunkSeriesSetAdapter) At() ChunkSeries { return a.genericSeriesSet.At().(ChunkSeries) } -func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet { - return &chunkSeriesSetAdapter{q.genericQuerier.Select(sortSeries, hints, matchers...)} +func (q *chunkQuerierAdapter) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet { + return &chunkSeriesSetAdapter{q.genericQuerier.Select(ctx, sortSeries, hints, matchers...)} } type seriesMergerAdapter struct { diff --git a/storage/interface.go b/storage/interface.go index b282f1fc6..36e15d4ba 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -91,7 +91,7 @@ type ExemplarStorage interface { // Use it when you need to have access to all samples without chunk encoding abstraction e.g promQL. type Queryable interface { // Querier returns a new Querier on the storage. - Querier(ctx context.Context, mint, maxt int64) (Querier, error) + Querier(mint, maxt int64) (Querier, error) } // A MockQueryable is used for testing purposes so that a mock Querier can be used. @@ -99,7 +99,7 @@ type MockQueryable struct { MockQuerier Querier } -func (q *MockQueryable) Querier(context.Context, int64, int64) (Querier, error) { +func (q *MockQueryable) Querier(int64, int64) (Querier, error) { return q.MockQuerier, nil } @@ -110,7 +110,7 @@ type Querier interface { // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. - Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet + Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet } // MockQuerier is used for test purposes to mock the selected series that is returned. @@ -130,7 +130,7 @@ func (q *MockQuerier) Close() error { return nil } -func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet { +func (q *MockQuerier) Select(_ context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet { return q.SelectMockFunction(sortSeries, hints, matchers...) } @@ -138,7 +138,7 @@ func (q *MockQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*l // Use it when you need to have access to samples in encoded format. type ChunkQueryable interface { // ChunkQuerier returns a new ChunkQuerier on the storage. - ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, error) + ChunkQuerier(mint, maxt int64) (ChunkQuerier, error) } // ChunkQuerier provides querying access over time series data of a fixed time range. @@ -148,7 +148,7 @@ type ChunkQuerier interface { // Select returns a set of series that matches the given label matchers. // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. - Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet + Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) ChunkSeriesSet } // LabelQuerier provides querying access over labels. @@ -202,11 +202,11 @@ type SelectHints struct { // TODO(bwplotka): Move to promql/engine_test.go? // QueryableFunc is an adapter to allow the use of ordinary functions as // Queryables. It follows the idea of http.HandlerFunc. -type QueryableFunc func(ctx context.Context, mint, maxt int64) (Querier, error) +type QueryableFunc func(mint, maxt int64) (Querier, error) // Querier calls f() with the given parameters. -func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { - return f(ctx, mint, maxt) +func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) { + return f(mint, maxt) } // Appender provides batched appends against a storage. diff --git a/storage/merge.go b/storage/merge.go index c0665d720..4e76a9936 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -16,6 +16,7 @@ package storage import ( "bytes" "container/heap" + "context" "fmt" "math" "sync" @@ -97,19 +98,19 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica } // Select returns a set of series that matches the given label matchers. -func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { +func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { if len(q.queriers) == 0 { return noopGenericSeriesSet{} } if len(q.queriers) == 1 { - return q.queriers[0].Select(sortSeries, hints, matchers...) + return q.queriers[0].Select(ctx, sortSeries, hints, matchers...) } seriesSets := make([]genericSeriesSet, 0, len(q.queriers)) if !q.concurrentSelect { for _, querier := range q.queriers { // We need to sort for merge to work. - seriesSets = append(seriesSets, querier.Select(true, hints, matchers...)) + seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...)) } return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) { s := newGenericMergeSeriesSet(seriesSets, q.mergeFn) @@ -128,7 +129,7 @@ func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matche defer wg.Done() // We need to sort for NewMergeSeriesSet to work. - seriesSetChan <- qr.Select(true, hints, matchers...) + seriesSetChan <- qr.Select(ctx, true, hints, matchers...) }(querier) } go func() { diff --git a/storage/merge_test.go b/storage/merge_test.go index e0b938be7..3408b16b9 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -14,6 +14,7 @@ package storage import ( + "context" "errors" "fmt" "math" @@ -187,7 +188,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(false, nil) + mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(context.Background(), false, nil) // Get all merged series upfront to make sure there are no incorrectly retained shared // buffers causing bugs. @@ -363,7 +364,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(false, nil) + merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil) for merged.Next() { require.True(t, tc.expected.Next(), "Expected Next() to be true") actualSeries := merged.At() @@ -721,7 +722,7 @@ func (a seriesByLabel) Len() int { return len(a) } func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } -func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet { +func (m *mockQuerier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) SeriesSet { cpy := make([]Series, len(m.toReturn)) copy(cpy, m.toReturn) if sortSeries { @@ -745,7 +746,7 @@ func (a chunkSeriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } -func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) ChunkSeriesSet { +func (m *mockChunkQurier) Select(_ context.Context, sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) ChunkSeriesSet { cpy := make([]ChunkSeries, len(m.toReturn)) copy(cpy, m.toReturn) if sortSeries { @@ -982,7 +983,7 @@ type labelNameRequest struct { matchers []*labels.Matcher } -func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet { +func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet { m.mtx.Lock() m.sortedSeriesRequested = append(m.sortedSeriesRequested, b) m.mtx.Unlock() @@ -1177,7 +1178,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { } t.Run("Select", func(t *testing.T) { - res := q.Select(false, nil) + res := q.Select(context.Background(), false, nil) var lbls []labels.Labels for res.Next() { lbls = append(lbls, res.At().Labels()) diff --git a/storage/noop.go b/storage/noop.go index 83953ca43..9a7c68ec7 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -14,6 +14,8 @@ package storage import ( + "context" + "github.com/prometheus/prometheus/model/labels" ) @@ -24,7 +26,7 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) SeriesSet { +func (noopQuerier) Select(context.Context, bool, *SelectHints, ...*labels.Matcher) SeriesSet { return NoopSeriesSet() } @@ -47,7 +49,7 @@ func NoopChunkedQuerier() ChunkQuerier { return noopChunkQuerier{} } -func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) ChunkSeriesSet { +func (noopChunkQuerier) Select(context.Context, bool, *SelectHints, ...*labels.Matcher) ChunkSeriesSet { return NoopChunkedSeriesSet() } diff --git a/storage/remote/read.go b/storage/remote/read.go index af61334f4..c7643b6ea 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -48,9 +48,8 @@ func NewSampleAndChunkQueryableClient( } } -func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (c *sampleAndChunkQueryableClient) Querier(mint, maxt int64) (storage.Querier, error) { q := &querier{ - ctx: ctx, mint: mint, maxt: maxt, client: c.client, @@ -75,10 +74,9 @@ func (c *sampleAndChunkQueryableClient) Querier(ctx context.Context, mint, maxt return q, nil } -func (c *sampleAndChunkQueryableClient) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (c *sampleAndChunkQueryableClient) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { cq := &chunkQuerier{ querier: querier{ - ctx: ctx, mint: mint, maxt: maxt, client: c.client, @@ -125,7 +123,6 @@ func (c *sampleAndChunkQueryableClient) preferLocalStorage(mint, maxt int64) (cm } type querier struct { - ctx context.Context mint, maxt int64 client ReadClient @@ -140,7 +137,7 @@ type querier struct { // // If requiredMatchers are given, select returns a NoopSeriesSet if the given matchers don't match the label set of the // requiredMatchers. Otherwise it'll just call remote endpoint. -func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (q *querier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { if len(q.requiredMatchers) > 0 { // Copy to not modify slice configured by user. requiredMatchers := append([]*labels.Matcher{}, q.requiredMatchers...) @@ -167,7 +164,7 @@ func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers . return storage.ErrSeriesSet(fmt.Errorf("toQuery: %w", err)) } - res, err := q.client.Read(q.ctx, query) + res, err := q.client.Read(ctx, query) if err != nil { return storage.ErrSeriesSet(fmt.Errorf("remote_read: %w", err)) } @@ -235,9 +232,9 @@ type chunkQuerier struct { // Select implements storage.ChunkQuerier and uses the given matchers to read chunk series sets from the client. // It uses remote.querier.Select so it supports external labels and required matchers if specified. -func (q *chunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { +func (q *chunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { // TODO(bwplotka) Support remote read chunked and allow returning chunks directly (TODO ticket). - return storage.NewSeriesSetToChunkSet(q.querier.Select(sortSeries, hints, matchers...)) + return storage.NewSeriesSetToChunkSet(q.querier.Select(ctx, sortSeries, hints, matchers...)) } // Note strings in toFilter must be sorted. diff --git a/storage/remote/read_handler.go b/storage/remote/read_handler.go index aca4d7dd5..baa19e3ac 100644 --- a/storage/remote/read_handler.go +++ b/storage/remote/read_handler.go @@ -131,7 +131,7 @@ func (h *readHandler) remoteReadSamples( return err } - querier, err := h.queryable.Querier(ctx, query.StartTimestampMs, query.EndTimestampMs) + querier, err := h.queryable.Querier(query.StartTimestampMs, query.EndTimestampMs) if err != nil { return err } @@ -155,7 +155,7 @@ func (h *readHandler) remoteReadSamples( } var ws storage.Warnings - resp.Results[i], ws, err = ToQueryResult(querier.Select(false, hints, filteredMatchers...), h.remoteReadSampleLimit) + resp.Results[i], ws, err = ToQueryResult(querier.Select(ctx, false, hints, filteredMatchers...), h.remoteReadSampleLimit) if err != nil { return err } @@ -198,7 +198,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re return err } - querier, err := h.queryable.ChunkQuerier(ctx, query.StartTimestampMs, query.EndTimestampMs) + querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs) if err != nil { return err } @@ -225,7 +225,7 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re NewChunkedWriter(w, f), int64(i), // The streaming API has to provide the series sorted. - querier.Select(true, hints, filteredMatchers...), + querier.Select(ctx, true, hints, filteredMatchers...), sortedExternalLabels, h.remoteReadMaxBytesInFrame, h.marshalPool, diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index fe0633d7d..be4bac24f 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -469,11 +469,11 @@ func TestSampleAndChunkQueryableClient(t *testing.T) { tc.readRecent, tc.callback, ) - q, err := c.Querier(context.TODO(), tc.mint, tc.maxt) + q, err := c.Querier(tc.mint, tc.maxt) require.NoError(t, err) defer require.NoError(t, q.Close()) - ss := q.Select(true, nil, tc.matchers...) + ss := q.Select(context.Background(), true, nil, tc.matchers...) require.NoError(t, err) require.Equal(t, storage.Warnings(nil), ss.Warnings()) diff --git a/storage/remote/storage.go b/storage/remote/storage.go index d01f96b3b..b6533f927 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -152,14 +152,14 @@ func (s *Storage) StartTime() (int64, error) { // Returned querier will never return error as all queryables are assumed best effort. // Additionally all returned queriers ensure that its Select's SeriesSets have ready data after first `Next` invoke. // This is because Prometheus (fanout and secondary queries) can't handle the stream failing half way through by design. -func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (s *Storage) Querier(mint, maxt int64) (storage.Querier, error) { s.mtx.Lock() queryables := s.queryables s.mtx.Unlock() queriers := make([]storage.Querier, 0, len(queryables)) for _, queryable := range queryables { - q, err := queryable.Querier(ctx, mint, maxt) + q, err := queryable.Querier(mint, maxt) if err != nil { return nil, err } @@ -170,14 +170,14 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie // ChunkQuerier returns a storage.MergeQuerier combining the remote client queriers // of each configured remote read endpoint. -func (s *Storage) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (s *Storage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { s.mtx.Lock() queryables := s.queryables s.mtx.Unlock() queriers := make([]storage.ChunkQuerier, 0, len(queryables)) for _, queryable := range queryables { - q, err := queryable.ChunkQuerier(ctx, mint, maxt) + q, err := queryable.ChunkQuerier(mint, maxt) if err != nil { return nil, err } diff --git a/storage/secondary.go b/storage/secondary.go index d66a28617..16740baaf 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -14,6 +14,7 @@ package storage import ( + "context" "sync" "github.com/prometheus/prometheus/model/labels" @@ -63,12 +64,12 @@ func (s *secondaryQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, Wa return names, w, nil } -func (s *secondaryQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { +func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { if s.done { panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done") } - s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(sortSeries, hints, matchers...)) + s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(ctx, sortSeries, hints, matchers...)) curr := len(s.asyncSets) - 1 return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) { s.once.Do(func() { diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 77b77fc23..72ceddb64 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -716,12 +716,12 @@ func (db *DB) StartTime() (int64, error) { } // Querier implements the Storage interface. -func (db *DB) Querier(context.Context, int64, int64) (storage.Querier, error) { +func (db *DB) Querier(int64, int64) (storage.Querier, error) { return nil, ErrUnsupported } // ChunkQuerier implements the Storage interface. -func (db *DB) ChunkQuerier(context.Context, int64, int64) (storage.ChunkQuerier, error) { +func (db *DB) ChunkQuerier(int64, int64) (storage.ChunkQuerier, error) { return nil, ErrUnsupported } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index 7eda6110c..fe9dead80 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -103,12 +103,12 @@ func TestUnsupportedFunctions(t *testing.T) { defer s.Close() t.Run("Querier", func(t *testing.T) { - _, err := s.Querier(context.TODO(), 0, 0) + _, err := s.Querier(0, 0) require.Equal(t, err, ErrUnsupported) }) t.Run("ChunkQuerier", func(t *testing.T) { - _, err := s.ChunkQuerier(context.TODO(), 0, 0) + _, err := s.ChunkQuerier(0, 0) require.Equal(t, err, ErrUnsupported) }) diff --git a/tsdb/block_test.go b/tsdb/block_test.go index d65d76d8f..d8a893510 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -198,7 +198,7 @@ func TestCorruptedChunk(t *testing.T) { querier, err := NewBlockQuerier(b, 0, 1) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) }() - set := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + set := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) // Check chunk errors during iter time. require.True(t, set.Next()) diff --git a/tsdb/db.go b/tsdb/db.go index 111f0cb21..684d4813e 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -526,22 +526,22 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue // Querier loads the blocks and wal and returns a new querier over the data partition for the given time range. // Current implementation doesn't support multiple Queriers. -func (db *DBReadOnly) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error) { q, err := db.loadDataAsQueryable(maxt) if err != nil { return nil, err } - return q.Querier(ctx, mint, maxt) + return q.Querier(mint, maxt) } // ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. // Current implementation doesn't support multiple ChunkQueriers. -func (db *DBReadOnly) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { q, err := db.loadDataAsQueryable(maxt) if err != nil { return nil, err } - return q.ChunkQuerier(ctx, mint, maxt) + return q.ChunkQuerier(mint, maxt) } // Blocks returns a slice of block readers for persisted blocks. @@ -1841,7 +1841,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { } // Querier returns a new querier over the data partition for the given time range. -func (db *DB) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { +func (db *DB) Querier(mint, maxt int64) (storage.Querier, error) { var blocks []BlockReader db.mtx.RLock() @@ -1989,7 +1989,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) ([]storage.ChunkQuerie } // ChunkQuerier returns a new chunk querier over the data partition for the given time range. -func (db *DB) ChunkQuerier(_ context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { blockQueriers, err := db.blockChunkQuerierForRange(mint, maxt) if err != nil { return nil, err diff --git a/tsdb/db_test.go b/tsdb/db_test.go index ffe8b7cc0..a89d0277f 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -88,7 +88,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) { // query runs a matcher query against the querier and fully expands its data. func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample { - ss := q.Select(false, nil, matchers...) + ss := q.Select(context.Background(), false, nil, matchers...) defer func() { require.NoError(t, q.Close()) }() @@ -150,7 +150,7 @@ func queryAndExpandChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*lab // queryChunks runs a matcher query against the querier and expands its data. func queryChunks(t testing.TB, q storage.ChunkQuerier, matchers ...*labels.Matcher) map[string][]chunks.Meta { - ss := q.Select(false, nil, matchers...) + ss := q.Select(context.Background(), false, nil, matchers...) defer func() { require.NoError(t, q.Close()) }() @@ -219,7 +219,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) - querier, err := db.Querier(context.TODO(), 0, 1) + querier, err := db.Querier(0, 1) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) require.Equal(t, map[string][]chunks.Sample{}, seriesSet) @@ -227,7 +227,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { err = app.Commit() require.NoError(t, err) - querier, err = db.Querier(context.TODO(), 0, 1) + querier, err = db.Querier(0, 1) require.NoError(t, err) defer querier.Close() @@ -285,7 +285,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) { }() require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal), "WAL corruption count mismatch") - querier, err := db.Querier(context.TODO(), 0, maxt) + querier, err := db.Querier(0, maxt) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "", "")) // The last sample should be missing as it was after the WAL segment corruption. @@ -306,7 +306,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { err = app.Rollback() require.NoError(t, err) - querier, err := db.Querier(context.TODO(), 0, 1) + querier, err := db.Querier(0, 1) require.NoError(t, err) defer querier.Close() @@ -357,7 +357,7 @@ func TestDBAppenderAddRef(t *testing.T) { require.NoError(t, app2.Commit()) - q, err := db.Querier(context.TODO(), 0, 200) + q, err := db.Querier(0, 200) require.NoError(t, err) res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) @@ -450,10 +450,10 @@ Outer: } // Compare the result. - q, err := db.Querier(context.TODO(), 0, numSamples) + q, err := db.Querier(0, numSamples) require.NoError(t, err) - res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) expSamples := make([]chunks.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { @@ -610,7 +610,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { require.NoError(t, app.Commit()) // Make sure the right value is stored. - q, err := db.Querier(context.TODO(), 0, 10) + q, err := db.Querier(0, 10) require.NoError(t, err) ssMap := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) @@ -627,7 +627,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { require.NoError(t, err) require.NoError(t, app.Commit()) - q, err = db.Querier(context.TODO(), 0, 10) + q, err = db.Querier(0, 10) require.NoError(t, err) ssMap = query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) @@ -660,12 +660,12 @@ func TestDB_Snapshot(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() - querier, err := db.Querier(context.TODO(), mint, mint+1000) + querier, err := db.Querier(mint, mint+1000) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) }() // sum values - seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + seriesSet := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { @@ -709,12 +709,12 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() - querier, err := db.Querier(context.TODO(), mint, mint+1000) + querier, err := db.Querier(mint, mint+1000) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) }() // Sum values. - seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + seriesSet := querier.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) var series chunkenc.Iterator sum := 0.0 for seriesSet.Next() { @@ -777,11 +777,11 @@ Outer: defer func() { require.NoError(t, newDB.Close()) }() // Compare the result. - q, err := newDB.Querier(context.TODO(), 0, numSamples) + q, err := newDB.Querier(0, numSamples) require.NoError(t, err) defer func() { require.NoError(t, q.Close()) }() - res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) expSamples := make([]chunks.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { @@ -952,10 +952,10 @@ func TestDB_e2e(t *testing.T) { } } - q, err := db.Querier(context.TODO(), mint, maxt) + q, err := db.Querier(mint, maxt) require.NoError(t, err) - ss := q.Select(false, nil, qry.ms...) + ss := q.Select(ctx, false, nil, qry.ms...) result := map[string][]chunks.Sample{} for ss.Next() { @@ -997,7 +997,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() - q, err := db.Querier(context.TODO(), 0, 1) + q, err := db.Querier(0, 1) require.NoError(t, err) values, ws, err := q.LabelValues("labelname") @@ -1143,10 +1143,10 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore }) // Query back chunks for all series. - q, err := reopenDB.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := reopenDB.ChunkQuerier(math.MinInt64, math.MaxInt64) require.NoError(t, err) - set := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+")) + set := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "series_id", ".+")) actualSeries := 0 var chunksIt chunks.Iterator @@ -1214,11 +1214,11 @@ func TestTombstoneClean(t *testing.T) { require.NoError(t, db.CleanTombstones()) // Compare the result. - q, err := db.Querier(context.TODO(), 0, numSamples) + q, err := db.Querier(0, numSamples) require.NoError(t, err) defer q.Close() - res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) expSamples := make([]chunks.Sample, 0, len(c.remaint)) for _, ts := range c.remaint { @@ -1716,12 +1716,12 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { series: labelpairs[:1], }} - q, err := db.Querier(context.TODO(), 0, 10) + q, err := db.Querier(0, 10) require.NoError(t, err) defer func() { require.NoError(t, q.Close()) }() for _, c := range cases { - ss := q.Select(false, nil, c.selector...) + ss := q.Select(ctx, false, nil, c.selector...) lres, _, ws, err := expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) @@ -1925,7 +1925,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { require.GreaterOrEqual(t, len(db.blocks), 3, "invalid test, less than three blocks in DB") - q, err := db.Querier(context.TODO(), blockRange, 2*blockRange) + q, err := db.Querier(blockRange, 2*blockRange) require.NoError(t, err) defer q.Close() @@ -2232,7 +2232,7 @@ func TestDB_LabelNames(t *testing.T) { appendSamples(db, 5, 9, tst.sampleLabels2) // Testing DB (union). - q, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) var ws storage.Warnings labelNames, ws, err = q.LabelNames() @@ -2434,10 +2434,10 @@ func TestDBReadOnly(t *testing.T) { require.NoError(t, err) require.Greater(t, expDbSize, dbSizeBeforeAppend, "db size didn't increase after an append") - q, err := dbWritable.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) expSeries = query(t, q, matchAll) - cq, err := dbWritable.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64) + cq, err := dbWritable.ChunkQuerier(math.MinInt64, math.MaxInt64) require.NoError(t, err) expChunks = queryAndExpandChunks(t, cq, matchAll) @@ -2476,7 +2476,7 @@ func TestDBReadOnly(t *testing.T) { }) t.Run("querier", func(t *testing.T) { // Open a read only db and ensure that the API returns the same result as the normal DB. - q, err := dbReadOnly.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) readOnlySeries := query(t, q, matchAll) readOnlyDBHash := testutil.DirHash(t, dbDir) @@ -2486,7 +2486,7 @@ func TestDBReadOnly(t *testing.T) { require.Equal(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same") }) t.Run("chunk querier", func(t *testing.T) { - cq, err := dbReadOnly.ChunkQuerier(context.TODO(), math.MinInt64, math.MaxInt64) + cq, err := dbReadOnly.ChunkQuerier(math.MinInt64, math.MaxInt64) require.NoError(t, err) readOnlySeries := queryAndExpandChunks(t, cq, matchAll) readOnlyDBHash := testutil.DirHash(t, dbDir) @@ -2507,7 +2507,7 @@ func TestDBReadOnlyClosing(t *testing.T) { require.Equal(t, db.Close(), ErrClosed) _, err = db.Blocks() require.Equal(t, err, ErrClosed) - _, err = db.Querier(context.TODO(), 0, 1) + _, err = db.Querier(0, 1) require.Equal(t, err, ErrClosed) } @@ -2554,12 +2554,12 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { require.NoError(t, err) require.Equal(t, len(blocks), 1) - querier, err := db.Querier(context.TODO(), 0, int64(maxt)-1) + querier, err := db.Querier(0, int64(maxt)-1) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) }() // Sum the values. - seriesSet := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) + seriesSet := querier.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) var series chunkenc.Iterator sum := 0.0 @@ -2624,11 +2624,11 @@ func TestDBCannotSeePartialCommits(t *testing.T) { inconsistencies := 0 for i := 0; i < 10; i++ { func() { - querier, err := db.Querier(context.Background(), 0, 1000000) + querier, err := db.Querier(0, 1000000) require.NoError(t, err) defer querier.Close() - ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss := querier.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err := expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) @@ -2658,7 +2658,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { require.NoError(t, err) defer db.Close() - querierBeforeAdd, err := db.Querier(context.Background(), 0, 1000000) + querierBeforeAdd, err := db.Querier(0, 1000000) require.NoError(t, err) defer querierBeforeAdd.Close() @@ -2667,18 +2667,18 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { _, err = app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) require.NoError(t, err) - querierAfterAddButBeforeCommit, err := db.Querier(context.Background(), 0, 1000000) + querierAfterAddButBeforeCommit, err := db.Querier(0, 1000000) require.NoError(t, err) defer querierAfterAddButBeforeCommit.Close() // None of the queriers should return anything after the Add but before the commit. - ss := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss := querierBeforeAdd.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err := expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) require.Equal(t, map[string][]sample{}, seriesSet) - ss = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss = querierAfterAddButBeforeCommit.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err = expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) @@ -2689,25 +2689,25 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { require.NoError(t, err) // Nothing returned for querier created before the Add. - ss = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss = querierBeforeAdd.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err = expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) require.Equal(t, map[string][]sample{}, seriesSet) // Series exists but has no samples for querier created after Add. - ss = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss = querierAfterAddButBeforeCommit.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err = expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) require.Equal(t, map[string][]sample{`{foo="bar"}`: {}}, seriesSet) - querierAfterCommit, err := db.Querier(context.Background(), 0, 1000000) + querierAfterCommit, err := db.Querier(0, 1000000) require.NoError(t, err) defer querierAfterCommit.Close() // Samples are returned for querier created after Commit. - ss = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss = querierAfterCommit.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err = expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) @@ -3008,11 +3008,11 @@ func TestCompactHead(t *testing.T) { require.Equal(t, 1, len(db.Blocks())) require.Equal(t, int64(maxt), db.Head().MinTime()) defer func() { require.NoError(t, db.Close()) }() - querier, err := db.Querier(context.Background(), 0, int64(maxt)-1) + querier, err := db.Querier(0, int64(maxt)-1) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) }() - seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"}) + seriesSet := querier.Select(ctx, false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "a", Value: "b"}) var series chunkenc.Iterator var actSamples []sample @@ -3403,7 +3403,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t // At this point we expect 2 mmap-ed head chunks. // Get a querier and make sure it's closed only once the test is over. - querier, err := db.Querier(ctx, 0, math.MaxInt64) + querier, err := db.Querier(0, math.MaxInt64) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) @@ -3411,7 +3411,7 @@ func testQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChunks(t // Query back all series. hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval} - seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) + seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) // Fetch samples iterators from all series. var iterators []chunkenc.Iterator @@ -3539,7 +3539,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun // At this point we expect 2 mmap-ed head chunks. // Get a querier and make sure it's closed only once the test is over. - querier, err := db.ChunkQuerier(ctx, 0, math.MaxInt64) + querier, err := db.ChunkQuerier(0, math.MaxInt64) require.NoError(t, err) defer func() { require.NoError(t, querier.Close()) @@ -3547,7 +3547,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun // Query back all series. hints := &storage.SelectHints{Start: 0, End: math.MaxInt64, Step: interval} - seriesSet := querier.Select(true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) + seriesSet := querier.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".+")) // Iterate all series and get their chunks. var it chunks.Iterator @@ -4170,7 +4170,7 @@ func TestOOOCompaction(t *testing.T) { series2.String(): series2Samples, } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -4561,7 +4561,7 @@ func TestOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T) { series2.String(): series2Samples, } - q, err := db.Querier(context.Background(), fromMins*time.Minute.Milliseconds(), toMins*time.Minute.Milliseconds()) + q, err := db.Querier(fromMins*time.Minute.Milliseconds(), toMins*time.Minute.Milliseconds()) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -4661,7 +4661,7 @@ func Test_Querier_OOOQuery(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - querier, err := db.Querier(context.TODO(), tc.queryMinT, tc.queryMaxT) + querier, err := db.Querier(tc.queryMinT, tc.queryMaxT) require.NoError(t, err) defer querier.Close() @@ -4746,7 +4746,7 @@ func Test_ChunkQuerier_OOOQuery(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - querier, err := db.ChunkQuerier(context.TODO(), tc.queryMinT, tc.queryMaxT) + querier, err := db.ChunkQuerier(tc.queryMinT, tc.queryMaxT) require.NoError(t, err) defer querier.Close() @@ -4807,7 +4807,7 @@ func TestOOOAppendAndQuery(t *testing.T) { } testQuery := func(from, to int64) { - querier, err := db.Querier(context.TODO(), from, to) + querier, err := db.Querier(from, to) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) @@ -4936,7 +4936,7 @@ func TestOOODisabled(t *testing.T) { addSample(s1, 59, 59, true) // Out of time window again. addSample(s1, 301, 310, false) // More in-order samples. - querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + querier, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) @@ -4988,7 +4988,7 @@ func TestWBLAndMmapReplay(t *testing.T) { } testQuery := func(exp map[string][]chunks.Sample) { - querier, err := db.Querier(context.TODO(), math.MinInt64, math.MaxInt64) + querier, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")) @@ -5376,7 +5376,7 @@ func TestWBLCorruption(t *testing.T) { series1.String(): expSamples, } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -5484,7 +5484,7 @@ func TestOOOMmapCorruption(t *testing.T) { series1.String(): expSamples, } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -5602,7 +5602,7 @@ func TestOutOfOrderRuntimeConfig(t *testing.T) { series1.String(): expSamples, } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -5805,7 +5805,7 @@ func TestNoGapAfterRestartWithOOO(t *testing.T) { series1.String(): expSamples, } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -5924,7 +5924,7 @@ func TestWblReplayAfterOOODisableAndRestart(t *testing.T) { series1.String(): expSamples, } - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) @@ -6126,7 +6126,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { testQuery := func(name, value string, exp map[string][]chunks.Sample) { t.Helper() - q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, name, value)) require.Equal(t, exp, act) @@ -6362,8 +6362,6 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { require.NoError(t, db.Close()) }) - ctx := context.Background() - var it chunkenc.Iterator exp := make(map[string][]chunks.Sample) for _, series := range blockSeries { @@ -6399,7 +6397,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { require.NoError(t, db.reload()) require.Len(t, db.Blocks(), len(blockSeries)) - q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) compareSeries(t, exp, res) @@ -6416,7 +6414,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { require.NoError(t, db.reload()) require.Len(t, db.Blocks(), 1) - q, err = db.Querier(ctx, math.MinInt64, math.MaxInt64) + q, err = db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) res = query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) @@ -6543,7 +6541,7 @@ func TestNativeHistogramFlag(t *testing.T) { require.NoError(t, app.Commit()) - q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64) + q, err := db.Querier(math.MinInt, math.MaxInt64) require.NoError(t, err) act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) require.Equal(t, map[string][]chunks.Sample{ @@ -6627,12 +6625,12 @@ func TestChunkQuerierReadWriteRace(t *testing.T) { } reader := func() { - querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64) + querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64) require.NoError(t, err) defer func(q storage.ChunkQuerier) { require.NoError(t, q.Close()) }(querier) - ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) for ss.Next() { cs := ss.At() it := cs.Iterator(nil) diff --git a/tsdb/example_test.go b/tsdb/example_test.go index da0e37923..46deae519 100644 --- a/tsdb/example_test.go +++ b/tsdb/example_test.go @@ -59,9 +59,9 @@ func Example() { // ... adding more samples. // Open a querier for reading. - querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + querier, err := db.Querier(math.MinInt64, math.MaxInt64) noErr(err) - ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) for ss.Next() { series := ss.At() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index b501f5f3f..62022be50 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1140,7 +1140,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) require.NoError(t, err) - actSeriesSet := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) + actSeriesSet := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) require.NoError(t, q.Close()) expSeriesSet := newMockSeriesSet([]storage.Series{ storage.NewListSeries(lblsDefault, func() []chunks.Sample { @@ -1200,7 +1200,7 @@ func TestDeleteUntilCurMax(t *testing.T) { // Test the series returns no samples. The series is cleared only after compaction. q, err := NewBlockQuerier(hb, 0, 100000) require.NoError(t, err) - res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series is not present") s := res.At() it := s.Iterator(nil) @@ -1217,7 +1217,7 @@ func TestDeleteUntilCurMax(t *testing.T) { require.NoError(t, app.Commit()) q, err = NewBlockQuerier(hb, 0, 100000) require.NoError(t, err) - res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res = q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, res.Next(), "series don't exist") exps := res.At() it = exps.Iterator(nil) @@ -1389,7 +1389,7 @@ func TestDelete_e2e(t *testing.T) { q, err := NewBlockQuerier(hb, 0, 100000) require.NoError(t, err) defer q.Close() - ss := q.Select(true, nil, del.ms...) + ss := q.Select(context.Background(), true, nil, del.ms...) // Build the mockSeriesSet. matchedSeries := make([]storage.Series, 0, len(matched)) for _, m := range matched { @@ -1835,7 +1835,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { require.NoError(t, err) defer q.Close() - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) require.Equal(t, true, ss.Next()) for ss.Next() { } @@ -1864,7 +1864,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { q, err := NewBlockQuerier(h, 1500, 2500) require.NoError(t, err) - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) require.Equal(t, false, ss.Next()) require.Equal(t, 0, len(ss.Warnings())) require.NoError(t, q.Close()) @@ -2149,7 +2149,7 @@ func TestMemSeriesIsolation(t *testing.T) { require.NoError(t, err) defer querier.Close() - ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) _, seriesSet, ws, err := expandSeriesSet(ss) require.NoError(t, err) require.Equal(t, 0, len(ws)) @@ -2521,7 +2521,7 @@ func testHeadSeriesChunkRace(t *testing.T) { h.gc() wg.Done() }() - ss := q.Select(false, nil, matcher) + ss := q.Select(context.Background(), false, nil, matcher) for ss.Next() { } require.NoError(t, ss.Err()) @@ -2926,11 +2926,11 @@ func TestChunkNotFoundHeadGCRace(t *testing.T) { require.NoError(t, app.Commit()) // Get a querier before compaction (or when compaction is about to begin). - q, err := db.Querier(context.Background(), mint, maxt) + q, err := db.Querier(mint, maxt) require.NoError(t, err) // Query the compacted range and get the first series before compaction. - ss := q.Select(true, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + ss := q.Select(context.Background(), true, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, ss.Next()) s := ss.At() @@ -2993,7 +2993,7 @@ func TestDataMissingOnQueryDuringCompaction(t *testing.T) { require.NoError(t, app.Commit()) // Get a querier before compaction (or when compaction is about to begin). - q, err := db.Querier(context.Background(), mint, maxt) + q, err := db.Querier(mint, maxt) require.NoError(t, err) var wg sync.WaitGroup @@ -3107,11 +3107,11 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) { require.True(t, waitOver.Load()) } - q, err := db.Querier(context.Background(), c.mint, c.maxt) + q, err := db.Querier(c.mint, c.maxt) require.NoError(t, err) checkWaiting(q) - cq, err := db.ChunkQuerier(context.Background(), c.mint, c.maxt) + cq, err := db.ChunkQuerier(c.mint, c.maxt) require.NoError(t, err) checkWaiting(cq) }) @@ -3191,7 +3191,7 @@ func TestAppendHistogram(t *testing.T) { require.NoError(t, q.Close()) }) - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, ss.Next()) s := ss.At() @@ -3844,7 +3844,7 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { require.NoError(t, q.Close()) }) - ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) require.True(t, ss.Next()) s := ss.At() @@ -4236,7 +4236,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { } // Query back and expect same order of samples. - q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) diff --git a/tsdb/querier.go b/tsdb/querier.go index ae09f4772..673bb16a6 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -14,6 +14,7 @@ package tsdb import ( + "context" "fmt" "math" "strings" @@ -125,7 +126,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { return &blockQuerier{blockBaseQuerier: q}, nil } -func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { +func (q *blockQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { mint := q.mint maxt := q.maxt disableTrimming := false @@ -165,7 +166,7 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier return &blockChunkQuerier{blockBaseQuerier: q}, nil } -func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { +func (q *blockChunkQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { mint := q.mint maxt := q.maxt disableTrimming := false diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 1657061fd..c6fa43bc2 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -258,7 +258,7 @@ func BenchmarkQuerierSelect(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - ss := q.Select(sorted, nil, matcher) + ss := q.Select(context.Background(), sorted, nil, matcher) for ss.Next() { // nolint:revive } require.NoError(b, ss.Err()) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 2af0fd934..afb7e6815 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -182,7 +182,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C }, } - res := q.Select(false, c.hints, c.ms...) + res := q.Select(context.Background(), false, c.hints, c.ms...) defer func() { require.NoError(t, q.Close()) }() for { @@ -217,7 +217,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C maxt: c.maxt, }, } - res := q.Select(false, c.hints, c.ms...) + res := q.Select(context.Background(), false, c.hints, c.ms...) defer func() { require.NoError(t, q.Close()) }() for { @@ -1689,7 +1689,7 @@ func BenchmarkQuerySeek(b *testing.B) { b.ReportAllocs() var it chunkenc.Iterator - ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + ss := sq.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) for ss.Next() { it = ss.At().Iterator(it) for t := mint; t <= maxt; t++ { @@ -1822,7 +1822,7 @@ func BenchmarkSetMatcher(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - ss := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) + ss := sq.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) for ss.Next() { } require.NoError(b, ss.Err()) @@ -2234,7 +2234,7 @@ func TestQuerierIndexQueriesRace(t *testing.T) { t.Cleanup(cancel) for i := 0; i < testRepeats; i++ { - q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(t, err) values, _, err := q.LabelValues("seq", c.matchers...) @@ -2275,7 +2275,7 @@ func TestClose(t *testing.T) { require.NoError(t, db.Close()) }() - q, err := db.Querier(context.TODO(), 0, 20) + q, err := db.Querier(0, 20) require.NoError(t, err) require.NoError(t, q.Close()) require.Error(t, q.Close()) @@ -2408,7 +2408,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - ss := q.Select(false, nil, selectors...) + ss := q.Select(context.Background(), false, nil, selectors...) var actualExpansions int var it chunkenc.Iterator for ss.Next() { @@ -2628,7 +2628,7 @@ func BenchmarkHeadChunkQuerier(b *testing.B) { } require.NoError(b, app.Commit()) - querier, err := db.ChunkQuerier(context.Background(), math.MinInt64, math.MaxInt64) + querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64) require.NoError(b, err) defer func(q storage.ChunkQuerier) { require.NoError(b, q.Close()) @@ -2636,7 +2636,7 @@ func BenchmarkHeadChunkQuerier(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) total := 0 for ss.Next() { cs := ss.At() @@ -2673,7 +2673,7 @@ func BenchmarkHeadQuerier(b *testing.B) { } require.NoError(b, app.Commit()) - querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) + querier, err := db.Querier(math.MinInt64, math.MaxInt64) require.NoError(b, err) defer func(q storage.Querier) { require.NoError(b, q.Close()) @@ -2681,7 +2681,7 @@ func BenchmarkHeadQuerier(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) total := int64(0) for ss.Next() { cs := ss.At() @@ -2746,10 +2746,10 @@ func TestQueryWithDeletedHistograms(t *testing.T) { err = db.Delete(80, 100, matcher) require.NoError(t, err) - chunkQuerier, err := db.ChunkQuerier(context.Background(), 0, 100) + chunkQuerier, err := db.ChunkQuerier(0, 100) require.NoError(t, err) - css := chunkQuerier.Select(false, nil, matcher) + css := chunkQuerier.Select(context.Background(), false, nil, matcher) seriesCount := 0 for css.Next() { diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 227027e46..2aee55e10 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -659,7 +659,7 @@ func (api *API) labelNames(r *http.Request) apiFuncResult { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return apiFuncResult{nil, returnAPIError(err), nil, nil} } @@ -725,7 +725,7 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} } @@ -793,6 +793,8 @@ var ( ) func (api *API) series(r *http.Request) (result apiFuncResult) { + ctx := r.Context() + if err := r.ParseForm(); err != nil { return apiFuncResult{nil, &apiError{errorBadData, errors.Wrapf(err, "error parsing form values")}, nil, nil} } @@ -814,7 +816,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { return invalidParamError(err, "match[]") } - q, err := api.Queryable.Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) + q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return apiFuncResult{nil, returnAPIError(err), nil, nil} } @@ -841,13 +843,13 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { var sets []storage.SeriesSet for _, mset := range matcherSets { // We need to sort this select results to merge (deduplicate) the series sets later. - s := q.Select(true, hints, mset...) + s := q.Select(ctx, true, hints, mset...) sets = append(sets, s) } set = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) } else { // At this point at least one match exists. - set = q.Select(false, hints, matcherSets[0]...) + set = q.Select(ctx, false, hints, matcherSets[0]...) } metrics := []labels.Labels{} diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index c4710c69f..33e524e8c 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -993,14 +993,14 @@ func setupRemote(s storage.Storage) *httptest.Server { } } - querier, err := s.Querier(r.Context(), query.StartTimestampMs, query.EndTimestampMs) + querier, err := s.Querier(query.StartTimestampMs, query.EndTimestampMs) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } defer querier.Close() - set := querier.Select(false, hints, matchers...) + set := querier.Select(r.Context(), false, hints, matchers...) resp.Results[i], _, err = remote.ToQueryResult(set, 1e6) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 8d194a058..6bafc4caa 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -154,11 +154,11 @@ func (t errorTestQueryable) ExemplarQuerier(ctx context.Context) (storage.Exempl return nil, t.err } -func (t errorTestQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (t errorTestQueryable) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) { return nil, t.err } -func (t errorTestQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (t errorTestQueryable) Querier(mint, maxt int64) (storage.Querier, error) { if t.q != nil { return t.q, nil } @@ -182,7 +182,7 @@ func (t errorTestQuerier) Close() error { return nil } -func (t errorTestQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (t errorTestQuerier) Select(_ context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { if t.s != nil { return t.s } diff --git a/web/federate.go b/web/federate.go index 1c50faed0..fde1942bb 100644 --- a/web/federate.go +++ b/web/federate.go @@ -57,6 +57,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { h.mtx.RLock() defer h.mtx.RUnlock() + ctx := req.Context() + if err := req.ParseForm(); err != nil { http.Error(w, fmt.Sprintf("error parsing form values: %v", err), http.StatusBadRequest) return @@ -80,7 +82,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { ) w.Header().Set("Content-Type", string(format)) - q, err := h.localStorage.Querier(req.Context(), mint, maxt) + q, err := h.localStorage.Querier(mint, maxt) if err != nil { federationErrors.Inc() if errors.Cause(err) == tsdb.ErrNotReady { @@ -98,7 +100,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { var sets []storage.SeriesSet for _, mset := range matcherSets { - s := q.Select(true, hints, mset...) + s := q.Select(ctx, true, hints, mset...) sets = append(sets, s) } diff --git a/web/federate_test.go b/web/federate_test.go index 30db0d640..ab93dcf28 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -237,7 +237,7 @@ type notReadyReadStorage struct { LocalStorage } -func (notReadyReadStorage) Querier(context.Context, int64, int64) (storage.Querier, error) { +func (notReadyReadStorage) Querier(int64, int64) (storage.Querier, error) { return nil, errors.Wrap(tsdb.ErrNotReady, "wrap") }