storage: Removed SelectSorted method; Simplified interface; Added requirement for remote read to sort response.

This is technically BREAKING CHANGE, but it was like this from the beginning: I just notice that we rely in
Prometheus on remote read being sorted. This is because we use selected data from remote reads in MergeSeriesSet
which rely on sorting.

I found during work on https://github.com/prometheus/prometheus/pull/5882 that
we do so many repetitions because of this, for not good reason. I think
I found a good balance between convenience and readability with just one method.
Smaller the interface = better.

Also I don't know what TestSelectSorted was testing, but now it's testing sorting.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartlomiej Plotka 2020-03-12 09:36:09 +00:00 committed by Julien Pivotto
parent d6ad5551c9
commit c4eefd1b3a
24 changed files with 182 additions and 231 deletions

View file

@ -655,7 +655,7 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
var set storage.SeriesSet var set storage.SeriesSet
var wrn storage.Warnings var wrn storage.Warnings
params := &storage.SelectParams{ hints := &storage.SelectHints{
Start: timestamp.FromTime(s.Start), Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End), End: timestamp.FromTime(s.End),
Step: durationToInt64Millis(s.Interval), Step: durationToInt64Millis(s.Interval),
@ -667,29 +667,29 @@ func (ng *Engine) populateSeries(ctx context.Context, querier storage.Querier, s
// from end also. // from end also.
subqOffset := ng.cumulativeSubqueryOffset(path) subqOffset := ng.cumulativeSubqueryOffset(path)
offsetMilliseconds := durationMilliseconds(subqOffset) offsetMilliseconds := durationMilliseconds(subqOffset)
params.Start = params.Start - offsetMilliseconds hints.Start = hints.Start - offsetMilliseconds
switch n := node.(type) { switch n := node.(type) {
case *parser.VectorSelector: case *parser.VectorSelector:
if evalRange == 0 { if evalRange == 0 {
params.Start = params.Start - durationMilliseconds(ng.lookbackDelta) hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
} else { } else {
params.Range = durationMilliseconds(evalRange) hints.Range = durationMilliseconds(evalRange)
// For all matrix queries we want to ensure that we have (end-start) + range selected // For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time // this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(evalRange) hints.Start = hints.Start - durationMilliseconds(evalRange)
evalRange = 0 evalRange = 0
} }
params.Func = extractFuncFromPath(path) hints.Func = extractFuncFromPath(path)
params.By, params.Grouping = extractGroupsFromPath(path) hints.By, hints.Grouping = extractGroupsFromPath(path)
if n.Offset > 0 { if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset) offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds hints.Start = hints.Start - offsetMilliseconds
params.End = params.End - offsetMilliseconds hints.End = hints.End - offsetMilliseconds
} }
set, wrn, err = querier.Select(params, n.LabelMatchers...) set, wrn, err = querier.Select(false, hints, n.LabelMatchers...)
warnings = append(warnings, wrn...) warnings = append(warnings, wrn...)
if err != nil { if err != nil {
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)

View file

@ -175,15 +175,12 @@ type errQuerier struct {
err error err error
} }
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return errSeriesSet{err: q.err}, nil, q.err return errSeriesSet{err: q.err}, nil, q.err
} }
func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil }
return errSeriesSet{err: q.err}, nil, q.err func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
} func (*errQuerier) Close() error { return nil }
func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }
// errSeriesSet implements storage.SeriesSet which always returns error. // errSeriesSet implements storage.SeriesSet which always returns error.
type errSeriesSet struct { type errSeriesSet struct {
@ -224,9 +221,9 @@ func TestQueryError(t *testing.T) {
testutil.Equals(t, errStorage, res.Err) testutil.Equals(t, errStorage, res.Err)
} }
// paramCheckerQuerier implements storage.Querier which checks the start and end times // hintCheckerQuerier implements storage.Querier which checks the start and end times
// in params. // in hints.
type paramCheckerQuerier struct { type hintCheckerQuerier struct {
start int64 start int64
end int64 end int64
grouping []string grouping []string
@ -237,10 +234,7 @@ type paramCheckerQuerier struct {
t *testing.T t *testing.T
} }
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return q.SelectSorted(sp, m...)
}
func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
testutil.Equals(q.t, q.start, sp.Start) testutil.Equals(q.t, q.start, sp.Start)
testutil.Equals(q.t, q.end, sp.End) testutil.Equals(q.t, q.end, sp.End)
testutil.Equals(q.t, q.grouping, sp.Grouping) testutil.Equals(q.t, q.grouping, sp.Grouping)
@ -250,11 +244,11 @@ func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*label
return errSeriesSet{err: nil}, nil, nil return errSeriesSet{err: nil}, nil, nil
} }
func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { func (*hintCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) {
return nil, nil, nil return nil, nil, nil
} }
func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } func (*hintCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*paramCheckerQuerier) Close() error { return nil } func (*hintCheckerQuerier) Close() error { return nil }
func TestParamsSetCorrectly(t *testing.T) { func TestParamsSetCorrectly(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
@ -435,7 +429,7 @@ func TestParamsSetCorrectly(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
engine := NewEngine(opts) engine := NewEngine(opts)
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &paramCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil return &hintCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil
}) })
var ( var (

View file

@ -133,7 +133,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
} }
// Get the series for the matcher. // Get the series for the matcher.
ss, _, err := querier.Select(nil, matchers...) ss, _, err := querier.Select(false, nil, matchers...)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Assert(t, ss.Next(), "") testutil.Assert(t, ss.Next(), "")
storageSeries := ss.At() storageSeries := ss.At()

View file

@ -707,7 +707,7 @@ func (g *Group) RestoreForState(ts time.Time) {
matchers = append(matchers, mt) matchers = append(matchers, mt)
} }
sset, err, _ := q.Select(nil, matchers...) sset, err, _ := q.Select(false, nil, matchers...)
if err != nil { if err != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state", level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)

View file

@ -512,8 +512,8 @@ func TestForStateRestore(t *testing.T) {
} }
func TestStaleness(t *testing.T) { func TestStaleness(t *testing.T) {
storage := teststorage.New(t) st := teststorage.New(t)
defer storage.Close() defer st.Close()
engineOpts := promql.EngineOpts{ engineOpts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
@ -522,9 +522,9 @@ func TestStaleness(t *testing.T) {
} }
engine := promql.NewEngine(engineOpts) engine := promql.NewEngine(engineOpts)
opts := &ManagerOptions{ opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, storage), QueryFunc: EngineQueryFunc(engine, st),
Appendable: storage, Appendable: st,
TSDB: storage, TSDB: st,
Context: context.Background(), Context: context.Background(),
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
} }
@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) {
}) })
// A time series that has two samples and then goes stale. // A time series that has two samples and then goes stale.
app := storage.Appender() app := st.Appender()
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
@ -556,14 +556,14 @@ func TestStaleness(t *testing.T) {
group.Eval(ctx, time.Unix(1, 0)) group.Eval(ctx, time.Unix(1, 0))
group.Eval(ctx, time.Unix(2, 0)) group.Eval(ctx, time.Unix(2, 0))
querier, err := storage.Querier(context.Background(), 0, 2000) querier, err := st.Querier(context.Background(), 0, 2000)
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
testutil.Ok(t, err) testutil.Ok(t, err)
set, _, err := querier.Select(nil, matcher) set, _, err := querier.Select(false, nil, matcher)
testutil.Ok(t, err) testutil.Ok(t, err)
samples, err := readSeriesSet(set) samples, err := readSeriesSet(set)
@ -658,8 +658,8 @@ func TestCopyState(t *testing.T) {
} }
func TestDeletedRuleMarkedStale(t *testing.T) { func TestDeletedRuleMarkedStale(t *testing.T) {
storage := teststorage.New(t) st := teststorage.New(t)
defer storage.Close() defer st.Close()
oldGroup := &Group{ oldGroup := &Group{
rules: []Rule{ rules: []Rule{
NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}), NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}),
@ -672,21 +672,21 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
rules: []Rule{}, rules: []Rule{},
seriesInPreviousEval: []map[string]labels.Labels{}, seriesInPreviousEval: []map[string]labels.Labels{},
opts: &ManagerOptions{ opts: &ManagerOptions{
Appendable: storage, Appendable: st,
}, },
} }
newGroup.CopyState(oldGroup) newGroup.CopyState(oldGroup)
newGroup.Eval(context.Background(), time.Unix(0, 0)) newGroup.Eval(context.Background(), time.Unix(0, 0))
querier, err := storage.Querier(context.Background(), 0, 2000) querier, err := st.Querier(context.Background(), 0, 2000)
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1") matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1")
testutil.Ok(t, err) testutil.Ok(t, err)
set, _, err := querier.Select(nil, matcher) set, _, err := querier.Select(false, nil, matcher)
testutil.Ok(t, err) testutil.Ok(t, err)
samples, err := readSeriesSet(set) samples, err := readSeriesSet(set)
@ -704,8 +704,8 @@ func TestUpdate(t *testing.T) {
expected := map[string]labels.Labels{ expected := map[string]labels.Labels{
"test": labels.FromStrings("name", "value"), "test": labels.FromStrings("name", "value"),
} }
storage := teststorage.New(t) st := teststorage.New(t)
defer storage.Close() defer st.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
@ -714,9 +714,9 @@ func TestUpdate(t *testing.T) {
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{ ruleManager := NewManager(&ManagerOptions{
Appendable: storage, Appendable: st,
TSDB: storage, TSDB: st,
QueryFunc: EngineQueryFunc(engine, storage), QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(), Context: context.Background(),
Logger: log.NewNopLogger(), Logger: log.NewNopLogger(),
}) })
@ -1096,16 +1096,16 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine") testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine")
} }
func countStaleNaN(t *testing.T, storage storage.Storage) int { func countStaleNaN(t *testing.T, st storage.Storage) int {
var c int var c int
querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000) querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000)
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2") matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")
testutil.Ok(t, err) testutil.Ok(t, err)
set, _, err := querier.Select(nil, matcher) set, _, err := querier.Select(false, nil, matcher)
testutil.Ok(t, err) testutil.Ok(t, err)
samples, err := readSeriesSet(set) samples, err := readSeriesSet(set)

View file

@ -1574,7 +1574,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err) testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb") testutil.Equals(t, false, series.Next(), "series found in tsdb")
@ -1584,7 +1584,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err) testutil.Ok(t, err)
series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) series, _, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, true, series.Next(), "series not found in tsdb") testutil.Equals(t, true, series.Next(), "series not found in tsdb")
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb") testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
@ -1620,7 +1620,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err) testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb") testutil.Equals(t, false, series.Next(), "series found in tsdb")
} }

View file

@ -221,20 +221,16 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
} }
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
if len(q.queriers) != 1 { if len(q.queriers) == 1 {
// We need to sort for NewMergeSeriesSet to work. return q.queriers[0].Select(sortSeries, hints, matchers...)
return q.SelectSorted(params, matchers...)
} }
return q.queriers[0].Select(params, matchers...)
}
// SelectSorted returns a set of sorted series that matches the given label matchers. var (
func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { seriesSets = make([]SeriesSet, 0, len(q.queriers))
seriesSets := make([]SeriesSet, 0, len(q.queriers)) warnings Warnings
var warnings Warnings priErr error
)
var priErr error = nil
type queryResult struct { type queryResult struct {
qr Querier qr Querier
set SeriesSet set SeriesSet
@ -242,9 +238,11 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma
selectError error selectError error
} }
queryResultChan := make(chan *queryResult) queryResultChan := make(chan *queryResult)
for _, querier := range q.queriers { for _, querier := range q.queriers {
go func(qr Querier) { go func(qr Querier) {
set, wrn, err := qr.SelectSorted(params, matchers...) // We need to sort for NewMergeSeriesSet to work.
set, wrn, err := qr.Select(true, hints, matchers...)
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
}(querier) }(querier)
} }

View file

@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err) testutil.Ok(t, err)
seriesSet, _, err := querier.SelectSorted(nil, matcher) seriesSet, _, err := querier.Select(true, nil, matcher)
testutil.Ok(t, err) testutil.Ok(t, err)
result := make(map[int64]float64) result := make(map[int64]float64)

View file

@ -60,10 +60,9 @@ type Queryable interface {
// time range. // time range.
type Querier interface { type Querier interface {
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) // Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
// SelectSorted returns a sorted set of series that matches the given label matchers. Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error)
SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
// LabelValues returns all potential values for a label name. // LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifefime of the querier. // It is not safe to use the strings beyond the lifefime of the querier.
@ -76,8 +75,9 @@ type Querier interface {
Close() error Close() error
} }
// SelectParams specifies parameters passed to data selections. // SelectHints specifies hints passed for data selections.
type SelectParams struct { // This is used only as an option for implementation to use.
type SelectHints struct {
Start int64 // Start time in milliseconds for this select. Start int64 // Start time in milliseconds for this select.
End int64 // End time in milliseconds for this select. End int64 // End time in milliseconds for this select.

View file

@ -24,11 +24,7 @@ func NoopQuerier() Querier {
return noopQuerier{} return noopQuerier{}
} }
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Warnings, error) {
return NoopSeriesSet(), nil, nil
}
func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
return NoopSeriesSet(), nil, nil return NoopSeriesSet(), nil, nil
} }

View file

@ -79,22 +79,22 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
} }
// ToQuery builds a Query proto. // ToQuery builds a Query proto.
func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) { func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error) {
ms, err := toLabelMatchers(matchers) ms, err := toLabelMatchers(matchers)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var rp *prompb.ReadHints var rp *prompb.ReadHints
if p != nil { if hints != nil {
rp = &prompb.ReadHints{ rp = &prompb.ReadHints{
StepMs: p.Step, StartMs: hints.Start,
Func: p.Func, EndMs: hints.End,
StartMs: p.Start, StepMs: hints.Step,
EndMs: p.End, Func: hints.Func,
Grouping: p.Grouping, Grouping: hints.Grouping,
By: p.By, By: hints.By,
RangeMs: p.Range, RangeMs: hints.Range,
} }
} }
@ -145,7 +145,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
} }
// FromQueryResult unpacks and sorts a QueryResult proto. // FromQueryResult unpacks and sorts a QueryResult proto.
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
series := make([]storage.Series, 0, len(res.Timeseries)) series := make([]storage.Series, 0, len(res.Timeseries))
for _, ts := range res.Timeseries { for _, ts := range res.Timeseries {
labels := labelProtosToLabels(ts.Labels) labels := labelProtosToLabels(ts.Labels)
@ -158,7 +158,10 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
samples: ts.Samples, samples: ts.Samples,
}) })
} }
sort.Sort(byLabel(series))
if sortSeries {
sort.Sort(byLabel(series))
}
return &concreteSeriesSet{ return &concreteSeriesSet{
series: series, series: series,
} }

View file

@ -178,7 +178,7 @@ func TestFromQueryResultWithDuplicates(t *testing.T) {
}, },
} }
series := FromQueryResult(&res) series := FromQueryResult(false, &res)
errSeries, isErrSeriesSet := series.(errSeriesSet) errSeries, isErrSeriesSet := series.(errSeriesSet)

View file

@ -57,16 +57,9 @@ type querier struct {
client *Client client *Client
} }
// Select implements storage.Querier and uses the given matchers to read series // Select implements storage.Querier and uses the given matchers to read series sets from the Client.
// sets from the Client. func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { query, err := ToQuery(q.mint, q.maxt, matchers, hints)
return q.SelectSorted(p, matchers...)
}
// SelectSorted implements storage.Querier and uses the given matchers to read series
// sets from the Client.
func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
query, err := ToQuery(q.mint, q.maxt, matchers, p)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -80,12 +73,11 @@ func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matc
return nil, nil, fmt.Errorf("remote_read: %v", err) return nil, nil, fmt.Errorf("remote_read: %v", err)
} }
// FromQueryResult sorts. return FromQueryResult(sortSeries, res), nil, nil
return FromQueryResult(res), nil, nil
} }
// LabelValues implements storage.Querier and is a noop. // LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) {
// TODO implement? // TODO implement?
return nil, nil, nil return nil, nil, nil
} }
@ -124,9 +116,9 @@ type externalLabelsQuerier struct {
// Select adds equality matchers for all external labels to the list of matchers // Select adds equality matchers for all external labels to the list of matchers
// before calling the wrapped storage.Queryable. The added external labels are // before calling the wrapped storage.Queryable. The added external labels are
// removed from the returned series sets. // removed from the returned series sets.
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
m, added := q.addExternalLabels(matchers) m, added := q.addExternalLabels(matchers)
s, warnings, err := q.Querier.Select(p, m...) s, warnings, err := q.Querier.Select(sortSeries, hints, m...)
if err != nil { if err != nil {
return nil, warnings, err return nil, warnings, err
} }
@ -177,7 +169,7 @@ type requiredMatchersQuerier struct {
// Select returns a NoopSeriesSet if the given matchers don't match the label // Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
ms := q.requiredMatchers ms := q.requiredMatchers
for _, m := range matchers { for _, m := range matchers {
for i, r := range ms { for i, r := range ms {
@ -193,7 +185,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la
if len(ms) > 0 { if len(ms) > 0 {
return storage.NoopSeriesSet(), nil, nil return storage.NoopSeriesSet(), nil, nil
} }
return q.Querier.Select(p, matchers...) return q.Querier.Select(sortSeries, hints, matchers...)
} }
// addExternalLabels adds matchers for each external label. External labels // addExternalLabels adds matchers for each external label. External labels

View file

@ -117,7 +117,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
}, },
} }
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
have, _, err := q.Select(nil, matchers...) have, _, err := q.Select(false, nil, matchers...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -219,7 +219,7 @@ func TestSeriesSetFilter(t *testing.T) {
} }
for i, tc := range tests { for i, tc := range tests {
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove) filtered := newSeriesSetFilter(FromQueryResult(true, tc.in), tc.toRemove)
have, err := ToQueryResult(filtered, 1e6) have, err := ToQueryResult(filtered, 1e6)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -242,7 +242,7 @@ type mockSeriesSet struct {
storage.SeriesSet storage.SeriesSet
} }
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return mockSeriesSet{}, nil, nil return mockSeriesSet{}, nil, nil
} }
@ -398,7 +398,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
requiredMatchers: test.requiredMatchers, requiredMatchers: test.requiredMatchers,
} }
have, _, err := q.Select(nil, test.matchers...) have, _, err := q.Select(false, nil, test.matchers...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View file

@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) {
querier, err := NewBlockQuerier(b, 0, 1) querier, err := NewBlockQuerier(b, 0, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
defer func() { testutil.Ok(t, querier.Close()) }() defer func() { testutil.Ok(t, querier.Close()) }()
set, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) set, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))

View file

@ -617,7 +617,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
err = merr.Err() err = merr.Err()
}() }()
ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
if err != nil { if err != nil {
return err return err
} }

View file

@ -67,7 +67,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func()
// query runs a matcher query against the querier and fully expands its data. // query runs a matcher query against the querier and fully expands its data.
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample { func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample {
ss, ws, err := q.Select(nil, matchers...) ss, ws, err := q.Select(false, nil, matchers...)
defer func() { defer func() {
testutil.Ok(t, q.Close()) testutil.Ok(t, q.Close())
}() }()
@ -315,7 +315,7 @@ Outer:
q, err := db.Querier(context.TODO(), 0, numSamples) q, err := db.Querier(context.TODO(), 0, numSamples)
testutil.Ok(t, err) testutil.Ok(t, err)
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -491,7 +491,7 @@ func TestDB_Snapshot(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }() defer func() { testutil.Ok(t, querier.Close()) }()
// sum values // sum values
seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -546,7 +546,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }() defer func() { testutil.Ok(t, querier.Close()) }()
// Sum values. // Sum values.
seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -618,7 +618,7 @@ Outer:
testutil.Ok(t, err) testutil.Ok(t, err)
defer func() { testutil.Ok(t, q.Close()) }() defer func() { testutil.Ok(t, q.Close()) }()
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -792,7 +792,7 @@ func TestDB_e2e(t *testing.T) {
q, err := db.Querier(context.TODO(), mint, maxt) q, err := db.Querier(context.TODO(), mint, maxt)
testutil.Ok(t, err) testutil.Ok(t, err)
ss, ws, err := q.Select(nil, qry.ms...) ss, ws, err := q.Select(false, nil, qry.ms...)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -951,7 +951,7 @@ func TestTombstoneClean(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer q.Close() defer q.Close()
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -1289,7 +1289,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
defer func() { testutil.Ok(t, q.Close()) }() defer func() { testutil.Ok(t, q.Close()) }()
for _, c := range cases { for _, c := range cases {
ss, ws, err := q.Select(nil, c.selector...) ss, ws, err := q.Select(false, nil, c.selector...)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -2486,7 +2486,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }() defer func() { testutil.Ok(t, querier.Close()) }()
// Sum the values. // Sum the values.
seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush")) seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -2551,7 +2551,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss) _, seriesSet, err := expandSeriesSet(ss)
@ -2591,13 +2591,13 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
defer querierAfterAddButBeforeCommit.Close() defer querierAfterAddButBeforeCommit.Close()
// None of the queriers should return anything after the Add but before the commit. // None of the queriers should return anything after the Add but before the commit.
ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss) _, seriesSet, err := expandSeriesSet(ss)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet) testutil.Equals(t, map[string][]sample{}, seriesSet)
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss) _, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -2608,14 +2608,14 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
// Nothing returned for querier created before the Add. // Nothing returned for querier created before the Add.
ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss) _, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, map[string][]sample{}, seriesSet) testutil.Equals(t, map[string][]sample{}, seriesSet)
// Series exists but has no samples for querier created after Add. // Series exists but has no samples for querier created after Add.
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss) _, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -2626,7 +2626,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
defer querierAfterCommit.Close() defer querierAfterCommit.Close()
// Samples are returned for querier created after Commit. // Samples are returned for querier created after Commit.
ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err = expandSeriesSet(ss) _, seriesSet, err = expandSeriesSet(ss)
testutil.Ok(t, err) testutil.Ok(t, err)

View file

@ -566,7 +566,7 @@ func TestHeadDeleteSimple(t *testing.T) {
for _, h := range []*Head{head, reloadedHead} { for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err) testutil.Ok(t, err)
actSeriesSet, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -613,7 +613,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
// Test the series returns no samples. The series is cleared only after compaction. // Test the series returns no samples. The series is cleared only after compaction.
q, err := NewBlockQuerier(hb, 0, 100000) q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err) testutil.Ok(t, err)
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
testutil.Assert(t, res.Next(), "series is not present") testutil.Assert(t, res.Next(), "series is not present")
@ -628,7 +628,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
q, err = NewBlockQuerier(hb, 0, 100000) q, err = NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err) testutil.Ok(t, err)
res, ws, err = q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) res, ws, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
testutil.Assert(t, res.Next(), "series don't exist") testutil.Assert(t, res.Next(), "series don't exist")
@ -804,7 +804,7 @@ func TestDelete_e2e(t *testing.T) {
q, err := NewBlockQuerier(hb, 0, 100000) q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err) testutil.Ok(t, err)
defer q.Close() defer q.Close()
ss, ws, err := q.SelectSorted(nil, del.ms...) ss, ws, err := q.Select(true, nil, del.ms...)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
// Build the mockSeriesSet. // Build the mockSeriesSet.
@ -1092,7 +1092,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer q.Close() defer q.Close()
ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -1120,7 +1120,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer q.Close() defer q.Close()
ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -1405,7 +1405,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) {
seriesCount := 0 seriesCount := 0
samplesCount := 0 samplesCount := 0
ss, _, err := q.Select(nil, matcher) ss, _, err := q.Select(false, nil, matcher)
testutil.Ok(t, err) testutil.Ok(t, err)
for ss.Next() { for ss.Next() {
i := ss.At().Iterator() i := ss.At().Iterator()
@ -1445,7 +1445,7 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer querier.Close() defer querier.Close()
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
testutil.Ok(t, err) testutil.Ok(t, err)
_, seriesSet, err := expandSeriesSet(ss) _, seriesSet, err := expandSeriesSet(ss)

View file

@ -85,23 +85,21 @@ func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warni
return mergeStrings(s1, s2), ws, nil return mergeStrings(s1, s2), ws, nil
} }
func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
if len(q.blocks) != 1 {
return q.SelectSorted(p, ms...)
}
// Sorting Head series is slow, and unneeded when only the
// Head is being queried. Sorting blocks is a noop.
return q.blocks[0].Select(p, ms...)
}
func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
if len(q.blocks) == 0 { if len(q.blocks) == 0 {
return storage.EmptySeriesSet(), nil, nil return storage.EmptySeriesSet(), nil, nil
} }
if len(q.blocks) == 1 {
// Sorting Head series is slow, and unneeded when only the
// Head is being queried.
return q.blocks[0].Select(sortSeries, hints, ms...)
}
ss := make([]storage.SeriesSet, len(q.blocks)) ss := make([]storage.SeriesSet, len(q.blocks))
var ws storage.Warnings var ws storage.Warnings
for i, b := range q.blocks { for i, b := range q.blocks {
s, w, err := b.SelectSorted(p, ms...) // We have to sort if blocks > 1 as MergedSeriesSet requires it.
s, w, err := b.Select(true, hints, ms...)
ws = append(ws, w...) ws = append(ws, w...)
if err != nil { if err != nil {
return nil, ws, err return nil, ws, err
@ -127,30 +125,26 @@ type verticalQuerier struct {
querier querier
} }
func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *verticalQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return q.sel(p, q.blocks, ms) return q.sel(sortSeries, hints, q.blocks, ms)
} }
func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return q.sel(p, q.blocks, ms)
}
func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
if len(qs) == 0 { if len(qs) == 0 {
return storage.EmptySeriesSet(), nil, nil return storage.EmptySeriesSet(), nil, nil
} }
if len(qs) == 1 { if len(qs) == 1 {
return qs[0].SelectSorted(p, ms...) return qs[0].Select(sortSeries, hints, ms...)
} }
l := len(qs) / 2 l := len(qs) / 2
var ws storage.Warnings var ws storage.Warnings
a, w, err := q.sel(p, qs[:l], ms) a, w, err := q.sel(sortSeries, hints, qs[:l], ms)
ws = append(ws, w...) ws = append(ws, w...)
if err != nil { if err != nil {
return nil, ws, err return nil, ws, err
} }
b, w, err := q.sel(p, qs[l:], ms) b, w, err := q.sel(sortSeries, hints, qs[l:], ms)
ws = append(ws, w...) ws = append(ws, w...)
if err != nil { if err != nil {
return nil, ws, err return nil, ws, err
@ -195,42 +189,24 @@ type blockQuerier struct {
mint, maxt int64 mint, maxt int64
} }
func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
base, err := LookupChunkSeries(q.index, q.tombstones, ms...) var base storage.ChunkSeriesSet
var err error
if sortSeries {
base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
} else {
base, err = LookupChunkSeries(q.index, q.tombstones, ms...)
}
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
mint := q.mint mint := q.mint
maxt := q.maxt maxt := q.maxt
if p != nil { if hints != nil {
mint = p.Start mint = hints.Start
maxt = p.End maxt = hints.End
}
return &blockSeriesSet{
set: &populatedChunkSeries{
set: base,
chunks: q.chunks,
mint: mint,
maxt: maxt,
},
mint: mint,
maxt: maxt,
}, nil, nil
}
func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
if err != nil {
return nil, nil, err
}
mint := q.mint
maxt := q.maxt
if p != nil {
mint = p.Start
maxt = p.End
} }
return &blockSeriesSet{ return &blockSeriesSet{
set: &populatedChunkSeries{ set: &populatedChunkSeries{

View file

@ -22,7 +22,6 @@ import (
"testing" "testing"
"github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/testutil" "github.com/prometheus/prometheus/util/testutil"
) )
@ -147,12 +146,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
var ss storage.SeriesSet ss, _, err := q.Select(sorted, nil, matcher)
if sorted {
ss, _, err = q.SelectSorted(nil, matcher)
} else {
ss, _, err = q.Select(nil, matcher)
}
testutil.Ok(b, err) testutil.Ok(b, err)
for ss.Next() { for ss.Next() {
} }

View file

@ -373,7 +373,7 @@ Outer:
maxt: c.maxt, maxt: c.maxt,
} }
res, ws, err := querier.Select(nil, c.ms...) res, ws, err := querier.Select(false, nil, c.ms...)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -536,7 +536,7 @@ Outer:
maxt: c.maxt, maxt: c.maxt,
} }
res, ws, err := querier.Select(nil, c.ms...) res, ws, err := querier.Select(false, nil, c.ms...)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws)) testutil.Equals(t, 0, len(ws))
@ -1710,7 +1710,7 @@ func BenchmarkQuerySeek(b *testing.B) {
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
ss, ws, err := sq.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) ss, ws, err := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
for ss.Next() { for ss.Next() {
it := ss.At().Iterator() it := ss.At().Iterator()
for t := mint; t <= maxt; t++ { for t := mint; t <= maxt; t++ {
@ -1848,7 +1848,7 @@ func BenchmarkSetMatcher(b *testing.B) {
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
_, ws, err := que.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) _, ws, err := que.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
testutil.Ok(b, err) testutil.Ok(b, err)
testutil.Equals(b, 0, len(ws)) testutil.Equals(b, 0, len(ws))
} }
@ -2297,7 +2297,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
ss, ws, err := q.Select(nil, selectors...) ss, ws, err := q.Select(false, nil, selectors...)
testutil.Ok(b, err) testutil.Ok(b, err)
testutil.Equals(b, 0, len(ws)) testutil.Equals(b, 0, len(ws))
var actualExpansions int var actualExpansions int

View file

@ -531,7 +531,7 @@ func (api *API) series(r *http.Request) apiFuncResult {
var sets []storage.SeriesSet var sets []storage.SeriesSet
var warnings storage.Warnings var warnings storage.Warnings
for _, mset := range matcherSets { for _, mset := range matcherSets {
s, wrn, err := q.Select(nil, mset...) //TODO s, wrn, err := q.Select(false, nil, mset...)
warnings = append(warnings, wrn...) warnings = append(warnings, wrn...)
if err != nil { if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil} return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
@ -1161,10 +1161,9 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
return return
} }
for i, query := range req.Queries { for i, query := range req.Queries {
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
// The streaming API provides sorted series. // The streaming API has to provide the series sorted.
// TODO(bwplotka): Handle warnings via query log. set, _, err := querier.Select(true, hints, filteredMatchers...)
set, _, err := querier.SelectSorted(selectParams, filteredMatchers...)
if err != nil { if err != nil {
return err return err
} }
@ -1195,8 +1194,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
Results: make([]*prompb.QueryResult, len(req.Queries)), Results: make([]*prompb.QueryResult, len(req.Queries)),
} }
for i, query := range req.Queries { for i, query := range req.Queries {
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
set, _, err := querier.Select(selectParams, filteredMatchers...) set, _, err := querier.Select(false, hints, filteredMatchers...)
if err != nil { if err != nil {
return err return err
} }
@ -1254,7 +1253,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
return filteredMatchers, nil return filteredMatchers, nil
} }
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error { func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error) error {
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
if err != nil { if err != nil {
return err return err
@ -1264,23 +1263,25 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern
if err != nil { if err != nil {
return err return err
} }
var selectParams *storage.SelectParams
if query.Hints != nil {
selectParams = &storage.SelectParams{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
}
}
defer func() { defer func() {
if err := querier.Close(); err != nil { if err := querier.Close(); err != nil {
level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error()) level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
} }
}() }()
return seriesHandleFn(querier, selectParams, filteredMatchers)
var hints *storage.SelectHints
if query.Hints != nil {
hints = &storage.SelectHints{
Start: query.Hints.StartMs,
End: query.Hints.EndMs,
Step: query.Hints.StepMs,
Func: query.Hints.Func,
Grouping: query.Hints.Grouping,
Range: query.Hints.RangeMs,
By: query.Hints.By,
}
}
return seriesHandleFn(querier, hints, filteredMatchers)
} }
func (api *API) deleteSeries(r *http.Request) apiFuncResult { func (api *API) deleteSeries(r *http.Request) apiFuncResult {

View file

@ -488,9 +488,9 @@ func setupRemote(s storage.Storage) *httptest.Server {
return return
} }
var selectParams *storage.SelectParams var hints *storage.SelectHints
if query.Hints != nil { if query.Hints != nil {
selectParams = &storage.SelectParams{ hints = &storage.SelectHints{
Start: query.Hints.StartMs, Start: query.Hints.StartMs,
End: query.Hints.EndMs, End: query.Hints.EndMs,
Step: query.Hints.StepMs, Step: query.Hints.StepMs,
@ -505,7 +505,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
} }
defer querier.Close() defer querier.Close()
set, _, err := querier.Select(selectParams, matchers...) set, _, err := querier.Select(false, hints, matchers...)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -1615,7 +1615,7 @@ func TestSampledReadEndpoint(t *testing.T) {
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
testutil.Ok(t, err) testutil.Ok(t, err)
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
testutil.Ok(t, err) testutil.Ok(t, err)
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
@ -1714,7 +1714,7 @@ func TestStreamReadEndpoint(t *testing.T) {
matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1")
testutil.Ok(t, err) testutil.Ok(t, err)
query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{ query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{
Step: 1, Step: 1,
Func: "avg", Func: "avg",
Start: 0, Start: 0,
@ -1722,7 +1722,7 @@ func TestStreamReadEndpoint(t *testing.T) {
}) })
testutil.Ok(t, err) testutil.Ok(t, err)
query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{ query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{
Step: 1, Step: 1,
Func: "avg", Func: "avg",
Start: 0, Start: 0,

View file

@ -81,14 +81,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
vec := make(promql.Vector, 0, 8000) vec := make(promql.Vector, 0, 8000)
params := &storage.SelectParams{ hints := &storage.SelectHints{Start: mint, End: maxt}
Start: mint,
End: maxt,
}
var sets []storage.SeriesSet var sets []storage.SeriesSet
for _, mset := range matcherSets { for _, mset := range matcherSets {
s, wrns, err := q.Select(params, mset...) s, wrns, err := q.Select(false, hints, mset...)
if wrns != nil { if wrns != nil {
level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns) level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
federationWarnings.Add(float64(len(wrns))) federationWarnings.Add(float64(len(wrns)))