diff --git a/promql/engine.go b/promql/engine.go index ba52501b4..bf55e6032 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -655,7 +655,7 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { var set storage.SeriesSet var wrn storage.Warnings - params := &storage.SelectParams{ + hints := &storage.SelectHints{ Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End), Step: durationToInt64Millis(s.Interval), @@ -667,29 +667,29 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s // from end also. subqOffset := ng.cumulativeSubqueryOffset(path) offsetMilliseconds := durationMilliseconds(subqOffset) - params.Start = params.Start - offsetMilliseconds + hints.Start = hints.Start - offsetMilliseconds switch n := node.(type) { case *parser.VectorSelector: if evalRange == 0 { - params.Start = params.Start - durationMilliseconds(ng.lookbackDelta) + hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta) } else { - params.Range = durationMilliseconds(evalRange) + hints.Range = durationMilliseconds(evalRange) // For all matrix queries we want to ensure that we have (end-start) + range selected // this way we have `range` data before the start time - params.Start = params.Start - durationMilliseconds(evalRange) + hints.Start = hints.Start - durationMilliseconds(evalRange) evalRange = 0 } - params.Func = extractFuncFromPath(path) - params.By, params.Grouping = extractGroupsFromPath(path) + hints.Func = extractFuncFromPath(path) + hints.By, hints.Grouping = extractGroupsFromPath(path) if n.Offset > 0 { offsetMilliseconds := durationMilliseconds(n.Offset) - params.Start = params.Start - offsetMilliseconds - params.End = params.End - offsetMilliseconds + hints.Start = hints.Start - offsetMilliseconds + hints.End = hints.End - offsetMilliseconds } - set, wrn, err = querier.Select(params, n.LabelMatchers...) + set, wrn, err = querier.Select(false, hints, n.LabelMatchers...) warnings = append(warnings, wrn...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) diff --git a/promql/engine_test.go b/promql/engine_test.go index e75c27bca..96204b392 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -175,15 +175,12 @@ type errQuerier struct { err error } -func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return errSeriesSet{err: q.err}, nil, q.err } -func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return errSeriesSet{err: q.err}, nil, q.err -} -func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) Close() error { return nil } +func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) Close() error { return nil } // errSeriesSet implements storage.SeriesSet which always returns error. type errSeriesSet struct { @@ -224,9 +221,9 @@ func TestQueryError(t *testing.T) { testutil.Equals(t, errStorage, res.Err) } -// paramCheckerQuerier implements storage.Querier which checks the start and end times -// in params. -type paramCheckerQuerier struct { +// hintCheckerQuerier implements storage.Querier which checks the start and end times +// in hints. +type hintCheckerQuerier struct { start int64 end int64 grouping []string @@ -237,10 +234,7 @@ type paramCheckerQuerier struct { t *testing.T } -func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.SelectSorted(sp, m...) -} -func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { testutil.Equals(q.t, q.start, sp.Start) testutil.Equals(q.t, q.end, sp.End) testutil.Equals(q.t, q.grouping, sp.Grouping) @@ -250,11 +244,11 @@ func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*label return errSeriesSet{err: nil}, nil, nil } -func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (*hintCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*paramCheckerQuerier) Close() error { return nil } +func (*hintCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*hintCheckerQuerier) Close() error { return nil } func TestParamsSetCorrectly(t *testing.T) { opts := EngineOpts{ @@ -435,7 +429,7 @@ func TestParamsSetCorrectly(t *testing.T) { for _, tc := range cases { engine := NewEngine(opts) queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return ¶mCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil + return &hintCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil }) var ( diff --git a/promql/test_test.go b/promql/test_test.go index f03f6d203..7de4387f5 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -133,7 +133,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { } // Get the series for the matcher. - ss, _, err := querier.Select(nil, matchers...) + ss, _, err := querier.Select(false, nil, matchers...) testutil.Ok(t, err) testutil.Assert(t, ss.Next(), "") storageSeries := ss.At() diff --git a/rules/manager.go b/rules/manager.go index 425fb01a8..383d38327 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -707,7 +707,7 @@ func (g *Group) RestoreForState(ts time.Time) { matchers = append(matchers, mt) } - sset, err, _ := q.Select(nil, matchers...) + sset, err, _ := q.Select(false, nil, matchers...) if err != nil { level.Error(g.logger).Log("msg", "Failed to restore 'for' state", labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) diff --git a/rules/manager_test.go b/rules/manager_test.go index ac74f9ea0..4d26885c8 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -512,8 +512,8 @@ func TestForStateRestore(t *testing.T) { } func TestStaleness(t *testing.T) { - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() engineOpts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -522,9 +522,9 @@ func TestStaleness(t *testing.T) { } engine := promql.NewEngine(engineOpts) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(engine, storage), - Appendable: storage, - TSDB: storage, + QueryFunc: EngineQueryFunc(engine, st), + Appendable: st, + TSDB: st, Context: context.Background(), Logger: log.NewNopLogger(), } @@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) { }) // A time series that has two samples and then goes stale. - app := storage.Appender() + app := st.Appender() app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) @@ -556,14 +556,14 @@ func TestStaleness(t *testing.T) { group.Eval(ctx, time.Unix(1, 0)) group.Eval(ctx, time.Unix(2, 0)) - querier, err := storage.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(false, nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) @@ -658,8 +658,8 @@ func TestCopyState(t *testing.T) { } func TestDeletedRuleMarkedStale(t *testing.T) { - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() oldGroup := &Group{ rules: []Rule{ NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}), @@ -672,21 +672,21 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - Appendable: storage, + Appendable: st, }, } newGroup.CopyState(oldGroup) newGroup.Eval(context.Background(), time.Unix(0, 0)) - querier, err := storage.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(false, nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) @@ -704,8 +704,8 @@ func TestUpdate(t *testing.T) { expected := map[string]labels.Labels{ "test": labels.FromStrings("name", "value"), } - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -714,9 +714,9 @@ func TestUpdate(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: storage, - TSDB: storage, - QueryFunc: EngineQueryFunc(engine, storage), + Appendable: st, + TSDB: st, + QueryFunc: EngineQueryFunc(engine, st), Context: context.Background(), Logger: log.NewNopLogger(), }) @@ -1096,16 +1096,16 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine") } -func countStaleNaN(t *testing.T, storage storage.Storage) int { +func countStaleNaN(t *testing.T, st storage.Storage) int { var c int - querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000) + querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(false, nil, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index da4f6261d..1d3864bb9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1574,7 +1574,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) testutil.Ok(t, err) testutil.Equals(t, false, series.Next(), "series found in tsdb") @@ -1584,7 +1584,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) + series, _, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) testutil.Ok(t, err) testutil.Equals(t, true, series.Next(), "series not found in tsdb") testutil.Equals(t, false, series.Next(), "more than one series found in tsdb") @@ -1620,7 +1620,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) testutil.Ok(t, err) testutil.Equals(t, false, series.Next(), "series found in tsdb") } diff --git a/storage/fanout.go b/storage/fanout.go index f55f4c8dc..2366fb272 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -221,20 +221,16 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { - if len(q.queriers) != 1 { - // We need to sort for NewMergeSeriesSet to work. - return q.SelectSorted(params, matchers...) +func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { + if len(q.queriers) == 1 { + return q.queriers[0].Select(sortSeries, hints, matchers...) } - return q.queriers[0].Select(params, matchers...) -} -// SelectSorted returns a set of sorted series that matches the given label matchers. -func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { - seriesSets := make([]SeriesSet, 0, len(q.queriers)) - var warnings Warnings - - var priErr error = nil + var ( + seriesSets = make([]SeriesSet, 0, len(q.queriers)) + warnings Warnings + priErr error + ) type queryResult struct { qr Querier set SeriesSet @@ -242,9 +238,11 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma selectError error } queryResultChan := make(chan *queryResult) + for _, querier := range q.queriers { go func(qr Querier) { - set, wrn, err := qr.SelectSorted(params, matchers...) + // We need to sort for NewMergeSeriesSet to work. + set, wrn, err := qr.Select(true, hints, matchers...) queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} }(querier) } diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index 7afb04202..d62b6f5e1 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") testutil.Ok(t, err) - seriesSet, _, err := querier.SelectSorted(nil, matcher) + seriesSet, _, err := querier.Select(true, nil, matcher) testutil.Ok(t, err) result := make(map[int64]float64) @@ -131,7 +131,7 @@ func TestFanoutErrors(t *testing.T) { defer querier.Close() matcher := labels.MustNewMatcher(labels.MatchEqual, "a", "b") - ss, warnings, err := querier.SelectSorted(nil, matcher) + ss, warnings, err := querier.Select(true, nil, matcher) testutil.Equals(t, tc.err, err) testutil.Equals(t, tc.warnings, warnings) @@ -169,11 +169,7 @@ func (errStorage) Close() error { type errQuerier struct{} -func (errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return nil, nil, errSelect -} - -func (errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return nil, nil, errSelect } diff --git a/storage/interface.go b/storage/interface.go index e85a159e4..4ac799ebb 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -60,10 +60,9 @@ type Queryable interface { // time range. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) - - // SelectSorted returns a sorted set of series that matches the given label matchers. - SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance. + // It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all. + Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. @@ -76,8 +75,9 @@ type Querier interface { Close() error } -// SelectParams specifies parameters passed to data selections. -type SelectParams struct { +// SelectHints specifies hints passed for data selections. +// This is used only as an option for implementation to use. +type SelectHints struct { Start int64 // Start time in milliseconds for this select. End int64 // End time in milliseconds for this select. diff --git a/storage/noop.go b/storage/noop.go index a8be634fd..4c0383233 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -24,11 +24,7 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { - return NoopSeriesSet(), nil, nil -} - -func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { +func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Warnings, error) { return NoopSeriesSet(), nil, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 8f07eb0d1..3b6556a12 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -79,22 +79,22 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error } // ToQuery builds a Query proto. -func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) { +func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error) { ms, err := toLabelMatchers(matchers) if err != nil { return nil, err } var rp *prompb.ReadHints - if p != nil { + if hints != nil { rp = &prompb.ReadHints{ - StepMs: p.Step, - Func: p.Func, - StartMs: p.Start, - EndMs: p.End, - Grouping: p.Grouping, - By: p.By, - RangeMs: p.Range, + StartMs: hints.Start, + EndMs: hints.End, + StepMs: hints.Step, + Func: hints.Func, + Grouping: hints.Grouping, + By: hints.By, + RangeMs: hints.Range, } } @@ -145,7 +145,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, } // FromQueryResult unpacks and sorts a QueryResult proto. -func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { +func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet { series := make([]storage.Series, 0, len(res.Timeseries)) for _, ts := range res.Timeseries { labels := labelProtosToLabels(ts.Labels) @@ -158,7 +158,10 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { samples: ts.Samples, }) } - sort.Sort(byLabel(series)) + + if sortSeries { + sort.Sort(byLabel(series)) + } return &concreteSeriesSet{ series: series, } diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 9f7b2c218..d694b487c 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -178,7 +178,7 @@ func TestFromQueryResultWithDuplicates(t *testing.T) { }, } - series := FromQueryResult(&res) + series := FromQueryResult(false, &res) errSeries, isErrSeriesSet := series.(errSeriesSet) diff --git a/storage/remote/read.go b/storage/remote/read.go index 3e5c9573c..b03bb8869 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -57,16 +57,9 @@ type querier struct { client *Client } -// Select implements storage.Querier and uses the given matchers to read series -// sets from the Client. -func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.SelectSorted(p, matchers...) -} - -// SelectSorted implements storage.Querier and uses the given matchers to read series -// sets from the Client. -func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - query, err := ToQuery(q.mint, q.maxt, matchers, p) +// Select implements storage.Querier and uses the given matchers to read series sets from the Client. +func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + query, err := ToQuery(q.mint, q.maxt, matchers, hints) if err != nil { return nil, nil, err } @@ -80,12 +73,11 @@ func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matc return nil, nil, fmt.Errorf("remote_read: %v", err) } - // FromQueryResult sorts. - return FromQueryResult(res), nil, nil + return FromQueryResult(sortSeries, res), nil, nil } // LabelValues implements storage.Querier and is a noop. -func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) { // TODO implement? return nil, nil, nil } @@ -124,9 +116,9 @@ type externalLabelsQuerier struct { // Select adds equality matchers for all external labels to the list of matchers // before calling the wrapped storage.Queryable. The added external labels are // removed from the returned series sets. -func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { m, added := q.addExternalLabels(matchers) - s, warnings, err := q.Querier.Select(p, m...) + s, warnings, err := q.Querier.Select(sortSeries, hints, m...) if err != nil { return nil, warnings, err } @@ -177,7 +169,7 @@ type requiredMatchersQuerier struct { // Select returns a NoopSeriesSet if the given matchers don't match the label // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { ms := q.requiredMatchers for _, m := range matchers { for i, r := range ms { @@ -193,7 +185,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la if len(ms) > 0 { return storage.NoopSeriesSet(), nil, nil } - return q.Querier.Select(p, matchers...) + return q.Querier.Select(sortSeries, hints, matchers...) } // addExternalLabels adds matchers for each external label. External labels diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 28dfa79fb..0706d7bff 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -117,7 +117,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { }, } want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - have, _, err := q.Select(nil, matchers...) + have, _, err := q.Select(false, nil, matchers...) if err != nil { t.Error(err) } @@ -219,7 +219,7 @@ func TestSeriesSetFilter(t *testing.T) { } for i, tc := range tests { - filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove) + filtered := newSeriesSetFilter(FromQueryResult(true, tc.in), tc.toRemove) have, err := ToQueryResult(filtered, 1e6) if err != nil { t.Fatal(err) @@ -242,7 +242,7 @@ type mockSeriesSet struct { storage.SeriesSet } -func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return mockSeriesSet{}, nil, nil } @@ -398,7 +398,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) { requiredMatchers: test.requiredMatchers, } - have, _, err := q.Select(nil, test.matchers...) + have, _, err := q.Select(false, nil, test.matchers...) if err != nil { t.Error(err) } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index d1ef6c524..f812d2970 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) { querier, err := NewBlockQuerier(b, 0, 1) testutil.Ok(t, err) defer func() { testutil.Ok(t, querier.Close()) }() - set, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + set, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 458ed5351..3b2ebf296 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -617,7 +617,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) { err = merr.Err() }() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) if err != nil { return err } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index e824b0c40..545473f82 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -67,7 +67,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func() // query runs a matcher query against the querier and fully expands its data. func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample { - ss, ws, err := q.Select(nil, matchers...) + ss, ws, err := q.Select(false, nil, matchers...) defer func() { testutil.Ok(t, q.Close()) }() @@ -315,7 +315,7 @@ Outer: q, err := db.Querier(context.TODO(), 0, numSamples) testutil.Ok(t, err) - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -491,7 +491,7 @@ func TestDB_Snapshot(t *testing.T) { defer func() { testutil.Ok(t, querier.Close()) }() // sum values - seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -546,7 +546,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { defer func() { testutil.Ok(t, querier.Close()) }() // Sum values. - seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -618,7 +618,7 @@ Outer: testutil.Ok(t, err) defer func() { testutil.Ok(t, q.Close()) }() - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -792,7 +792,7 @@ func TestDB_e2e(t *testing.T) { q, err := db.Querier(context.TODO(), mint, maxt) testutil.Ok(t, err) - ss, ws, err := q.Select(nil, qry.ms...) + ss, ws, err := q.Select(false, nil, qry.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -951,7 +951,7 @@ func TestTombstoneClean(t *testing.T) { testutil.Ok(t, err) defer q.Close() - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1289,7 +1289,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { defer func() { testutil.Ok(t, q.Close()) }() for _, c := range cases { - ss, ws, err := q.Select(nil, c.selector...) + ss, ws, err := q.Select(false, nil, c.selector...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -2486,7 +2486,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { defer func() { testutil.Ok(t, querier.Close()) }() // Sum the values. - seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) + seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -2551,7 +2551,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) { testutil.Ok(t, err) defer querier.Close() - ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) @@ -2591,13 +2591,13 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { defer querierAfterAddButBeforeCommit.Close() // None of the queriers should return anything after the Add but before the commit. - ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) testutil.Ok(t, err) testutil.Equals(t, map[string][]sample{}, seriesSet) - ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) @@ -2608,14 +2608,14 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { testutil.Ok(t, err) // Nothing returned for querier created before the Add. - ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) testutil.Equals(t, map[string][]sample{}, seriesSet) // Series exists but has no samples for querier created after Add. - ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) @@ -2626,7 +2626,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { defer querierAfterCommit.Close() // Samples are returned for querier created after Commit. - ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err = expandSeriesSet(ss) testutil.Ok(t, err) diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f7aa9975c..57e5c3a92 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -566,7 +566,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) testutil.Ok(t, err) - actSeriesSet, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) + actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -613,7 +613,7 @@ func TestDeleteUntilCurMax(t *testing.T) { // Test the series returns no samples. The series is cleared only after compaction. q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) testutil.Assert(t, res.Next(), "series is not present") @@ -628,7 +628,7 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Ok(t, app.Commit()) q, err = NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) - res, ws, err = q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) testutil.Assert(t, res.Next(), "series don't exist") @@ -804,7 +804,7 @@ func TestDelete_e2e(t *testing.T) { q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.SelectSorted(nil, del.ms...) + ss, ws, err := q.Select(true, nil, del.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) // Build the mockSeriesSet. @@ -1092,7 +1092,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1120,7 +1120,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1405,7 +1405,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { seriesCount := 0 samplesCount := 0 - ss, _, err := q.Select(nil, matcher) + ss, _, err := q.Select(false, nil, matcher) testutil.Ok(t, err) for ss.Next() { i := ss.At().Iterator() @@ -1445,7 +1445,7 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Ok(t, err) defer querier.Close() - ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) @@ -1630,7 +1630,7 @@ func testHeadSeriesChunkRace(t *testing.T) { h.gc() wg.Done() }() - ss, _, err := q.Select(nil, matcher) + ss, _, err := q.Select(false, nil, matcher) testutil.Ok(t, err) testutil.Ok(t, ss.Err()) wg.Wait() diff --git a/tsdb/querier.go b/tsdb/querier.go index 83f7b6ad6..ea1100a52 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -85,23 +85,21 @@ func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warni return mergeStrings(s1, s2), ws, nil } -func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - if len(q.blocks) != 1 { - return q.SelectSorted(p, ms...) - } - // Sorting Head series is slow, and unneeded when only the - // Head is being queried. Sorting blocks is a noop. - return q.blocks[0].Select(p, ms...) -} - -func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if len(q.blocks) == 0 { return storage.EmptySeriesSet(), nil, nil } + if len(q.blocks) == 1 { + // Sorting Head series is slow, and unneeded when only the + // Head is being queried. + return q.blocks[0].Select(sortSeries, hints, ms...) + } + ss := make([]storage.SeriesSet, len(q.blocks)) var ws storage.Warnings for i, b := range q.blocks { - s, w, err := b.SelectSorted(p, ms...) + // We have to sort if blocks > 1 as MergedSeriesSet requires it. + s, w, err := b.Select(true, hints, ms...) ws = append(ws, w...) if err != nil { return nil, ws, err @@ -127,30 +125,26 @@ type verticalQuerier struct { querier } -func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.sel(p, q.blocks, ms) +func (q *verticalQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.sel(sortSeries, hints, q.blocks, ms) } -func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.sel(p, q.blocks, ms) -} - -func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if len(qs) == 0 { return storage.EmptySeriesSet(), nil, nil } if len(qs) == 1 { - return qs[0].SelectSorted(p, ms...) + return qs[0].Select(sortSeries, hints, ms...) } l := len(qs) / 2 var ws storage.Warnings - a, w, err := q.sel(p, qs[:l], ms) + a, w, err := q.sel(sortSeries, hints, qs[:l], ms) ws = append(ws, w...) if err != nil { return nil, ws, err } - b, w, err := q.sel(p, qs[l:], ms) + b, w, err := q.sel(sortSeries, hints, qs[l:], ms) ws = append(ws, w...) if err != nil { return nil, ws, err @@ -195,42 +189,24 @@ type blockQuerier struct { mint, maxt int64 } -func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - base, err := LookupChunkSeries(q.index, q.tombstones, ms...) +func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + var base storage.ChunkSeriesSet + var err error + + if sortSeries { + base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...) + } else { + base, err = LookupChunkSeries(q.index, q.tombstones, ms...) + } if err != nil { return nil, nil, err } mint := q.mint maxt := q.maxt - if p != nil { - mint = p.Start - maxt = p.End - } - return &blockSeriesSet{ - set: &populatedChunkSeries{ - set: base, - chunks: q.chunks, - mint: mint, - maxt: maxt, - }, - - mint: mint, - maxt: maxt, - }, nil, nil -} - -func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...) - if err != nil { - return nil, nil, err - } - - mint := q.mint - maxt := q.maxt - if p != nil { - mint = p.Start - maxt = p.End + if hints != nil { + mint = hints.Start + maxt = hints.End } return &blockSeriesSet{ set: &populatedChunkSeries{ diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 765435203..95502be95 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -22,7 +22,6 @@ import ( "testing" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/testutil" ) @@ -147,12 +146,7 @@ func BenchmarkQuerierSelect(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - var ss storage.SeriesSet - if sorted { - ss, _, err = q.SelectSorted(nil, matcher) - } else { - ss, _, err = q.Select(nil, matcher) - } + ss, _, err := q.Select(sorted, nil, matcher) testutil.Ok(b, err) for ss.Next() { } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 52e316051..9668a21fe 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -373,7 +373,7 @@ Outer: maxt: c.maxt, } - res, ws, err := querier.Select(nil, c.ms...) + res, ws, err := querier.Select(false, nil, c.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -536,7 +536,7 @@ Outer: maxt: c.maxt, } - res, ws, err := querier.Select(nil, c.ms...) + res, ws, err := querier.Select(false, nil, c.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1710,7 +1710,7 @@ func BenchmarkQuerySeek(b *testing.B) { b.ResetTimer() b.ReportAllocs() - ss, ws, err := sq.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + ss, ws, err := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) for ss.Next() { it := ss.At().Iterator() for t := mint; t <= maxt; t++ { @@ -1848,7 +1848,7 @@ func BenchmarkSetMatcher(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - _, ws, err := que.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) + _, ws, err := que.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) testutil.Ok(b, err) testutil.Equals(b, 0, len(ws)) } @@ -2297,7 +2297,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - ss, ws, err := q.Select(nil, selectors...) + ss, ws, err := q.Select(false, nil, selectors...) testutil.Ok(b, err) testutil.Equals(b, 0, len(ws)) var actualExpansions int diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5ea367fa2..204b2ff38 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -531,7 +531,7 @@ func (api *API) series(r *http.Request) apiFuncResult { var sets []storage.SeriesSet var warnings storage.Warnings for _, mset := range matcherSets { - s, wrn, err := q.Select(nil, mset...) //TODO + s, wrn, err := q.Select(false, nil, mset...) warnings = append(warnings, wrn...) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil} @@ -1161,10 +1161,9 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { - // The streaming API provides sorted series. - // TODO(bwplotka): Handle warnings via query log. - set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error { + // The streaming API has to provide the series sorted. + set, _, err := querier.Select(true, hints, filteredMatchers...) if err != nil { return err } @@ -1195,8 +1194,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { Results: make([]*prompb.QueryResult, len(req.Queries)), } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { - set, _, err := querier.Select(selectParams, filteredMatchers...) + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error { + set, _, err := querier.Select(false, hints, filteredMatchers...) if err != nil { return err } @@ -1254,7 +1253,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } -func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error { +func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error) error { filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) if err != nil { return err @@ -1264,23 +1263,25 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern if err != nil { return err } - - var selectParams *storage.SelectParams - if query.Hints != nil { - selectParams = &storage.SelectParams{ - Start: query.Hints.StartMs, - End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, - } - } - defer func() { if err := querier.Close(); err != nil { level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error()) } }() - return seriesHandleFn(querier, selectParams, filteredMatchers) + + var hints *storage.SelectHints + if query.Hints != nil { + hints = &storage.SelectHints{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + Grouping: query.Hints.Grouping, + Range: query.Hints.RangeMs, + By: query.Hints.By, + } + } + return seriesHandleFn(querier, hints, filteredMatchers) } func (api *API) deleteSeries(r *http.Request) apiFuncResult { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index bfac2d4f1..2c8a7c6ac 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -488,9 +488,9 @@ func setupRemote(s storage.Storage) *httptest.Server { return } - var selectParams *storage.SelectParams + var hints *storage.SelectHints if query.Hints != nil { - selectParams = &storage.SelectParams{ + hints = &storage.SelectHints{ Start: query.Hints.StartMs, End: query.Hints.EndMs, Step: query.Hints.StepMs, @@ -505,7 +505,7 @@ func setupRemote(s storage.Storage) *httptest.Server { } defer querier.Close() - set, _, err := querier.Select(selectParams, matchers...) + set, _, err := querier.Select(false, hints, matchers...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -1615,7 +1615,7 @@ func TestSampledReadEndpoint(t *testing.T) { matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") testutil.Ok(t, err) - query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) + query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"}) testutil.Ok(t, err) req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} @@ -1714,7 +1714,7 @@ func TestStreamReadEndpoint(t *testing.T) { matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") testutil.Ok(t, err) - query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{ + query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{ Step: 1, Func: "avg", Start: 0, @@ -1722,7 +1722,7 @@ func TestStreamReadEndpoint(t *testing.T) { }) testutil.Ok(t, err) - query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{ + query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{ Step: 1, Func: "avg", Start: 0, diff --git a/web/federate.go b/web/federate.go index e358a3dab..93c9aece3 100644 --- a/web/federate.go +++ b/web/federate.go @@ -81,14 +81,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { vec := make(promql.Vector, 0, 8000) - params := &storage.SelectParams{ - Start: mint, - End: maxt, - } + hints := &storage.SelectHints{Start: mint, End: maxt} var sets []storage.SeriesSet for _, mset := range matcherSets { - s, wrns, err := q.Select(params, mset...) + s, wrns, err := q.Select(false, hints, mset...) if wrns != nil { level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns) federationWarnings.Add(float64(len(wrns)))