mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 13:57:36 -08:00
storage: Removed SelectSorted method; Simplified interface; Added requirement for remote read to sort response.
This is technically BREAKING CHANGE, but it was like this from the beginning: I just notice that we rely in Prometheus on remote read being sorted. This is because we use selected data from remote reads in MergeSeriesSet which rely on sorting. I found during work on https://github.com/prometheus/prometheus/pull/5882 that we do so many repetitions because of this, for not good reason. I think I found a good balance between convenience and readability with just one method. Smaller the interface = better. Also I don't know what TestSelectSorted was testing, but now it's testing sorting. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
bc703b6456
commit
fe802f29c9
|
@ -656,7 +656,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
|
|||
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
|
||||
var set storage.SeriesSet
|
||||
var wrn storage.Warnings
|
||||
params := &storage.SelectParams{
|
||||
hints := &storage.SelectHints{
|
||||
Start: timestamp.FromTime(s.Start),
|
||||
End: timestamp.FromTime(s.End),
|
||||
Step: durationToInt64Millis(s.Interval),
|
||||
|
@ -668,29 +668,29 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
|
|||
// from end also.
|
||||
subqOffset := ng.cumulativeSubqueryOffset(path)
|
||||
offsetMilliseconds := durationMilliseconds(subqOffset)
|
||||
params.Start = params.Start - offsetMilliseconds
|
||||
hints.Start = hints.Start - offsetMilliseconds
|
||||
|
||||
switch n := node.(type) {
|
||||
case *parser.VectorSelector:
|
||||
if evalRange == 0 {
|
||||
params.Start = params.Start - durationMilliseconds(ng.lookbackDelta)
|
||||
hints.Start = hints.Start - durationMilliseconds(ng.lookbackDelta)
|
||||
} else {
|
||||
params.Range = durationMilliseconds(evalRange)
|
||||
hints.Range = durationMilliseconds(evalRange)
|
||||
// For all matrix queries we want to ensure that we have (end-start) + range selected
|
||||
// this way we have `range` data before the start time
|
||||
params.Start = params.Start - durationMilliseconds(evalRange)
|
||||
hints.Start = hints.Start - durationMilliseconds(evalRange)
|
||||
evalRange = 0
|
||||
}
|
||||
|
||||
params.Func = extractFuncFromPath(path)
|
||||
params.By, params.Grouping = extractGroupsFromPath(path)
|
||||
hints.Func = extractFuncFromPath(path)
|
||||
hints.By, hints.Grouping = extractGroupsFromPath(path)
|
||||
if n.Offset > 0 {
|
||||
offsetMilliseconds := durationMilliseconds(n.Offset)
|
||||
params.Start = params.Start - offsetMilliseconds
|
||||
params.End = params.End - offsetMilliseconds
|
||||
hints.Start = hints.Start - offsetMilliseconds
|
||||
hints.End = hints.End - offsetMilliseconds
|
||||
}
|
||||
|
||||
set, wrn, err = querier.Select(params, n.LabelMatchers...)
|
||||
set, wrn, err = querier.Select(false, hints, n.LabelMatchers...)
|
||||
warnings = append(warnings, wrn...)
|
||||
if err != nil {
|
||||
level.Error(ng.logger).Log("msg", "error selecting series set", "err", err)
|
||||
|
|
|
@ -175,15 +175,12 @@ type errQuerier struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return errSeriesSet{err: q.err}, nil, q.err
|
||||
}
|
||||
func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return errSeriesSet{err: q.err}, nil, q.err
|
||||
}
|
||||
func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*errQuerier) Close() error { return nil }
|
||||
func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*errQuerier) Close() error { return nil }
|
||||
|
||||
// errSeriesSet implements storage.SeriesSet which always returns error.
|
||||
type errSeriesSet struct {
|
||||
|
@ -224,9 +221,9 @@ func TestQueryError(t *testing.T) {
|
|||
testutil.Equals(t, errStorage, res.Err)
|
||||
}
|
||||
|
||||
// paramCheckerQuerier implements storage.Querier which checks the start and end times
|
||||
// in params.
|
||||
type paramCheckerQuerier struct {
|
||||
// hintCheckerQuerier implements storage.Querier which checks the start and end times
|
||||
// in hints.
|
||||
type hintCheckerQuerier struct {
|
||||
start int64
|
||||
end int64
|
||||
grouping []string
|
||||
|
@ -237,10 +234,7 @@ type paramCheckerQuerier struct {
|
|||
t *testing.T
|
||||
}
|
||||
|
||||
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.SelectSorted(sp, m...)
|
||||
}
|
||||
func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q *hintCheckerQuerier) Select(_ bool, sp *storage.SelectHints, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
testutil.Equals(q.t, q.start, sp.Start)
|
||||
testutil.Equals(q.t, q.end, sp.End)
|
||||
testutil.Equals(q.t, q.grouping, sp.Grouping)
|
||||
|
@ -250,11 +244,11 @@ func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*label
|
|||
|
||||
return errSeriesSet{err: nil}, nil, nil
|
||||
}
|
||||
func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
|
||||
func (*hintCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*paramCheckerQuerier) Close() error { return nil }
|
||||
func (*hintCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*hintCheckerQuerier) Close() error { return nil }
|
||||
|
||||
func TestParamsSetCorrectly(t *testing.T) {
|
||||
opts := EngineOpts{
|
||||
|
@ -435,7 +429,7 @@ func TestParamsSetCorrectly(t *testing.T) {
|
|||
for _, tc := range cases {
|
||||
engine := NewEngine(opts)
|
||||
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
return ¶mCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil
|
||||
return &hintCheckerQuerier{start: tc.paramStart * 1000, end: tc.paramEnd * 1000, grouping: tc.paramGrouping, by: tc.paramBy, selRange: tc.paramRange, function: tc.paramFunc, t: t}, nil
|
||||
})
|
||||
|
||||
var (
|
||||
|
|
|
@ -133,7 +133,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
|
|||
}
|
||||
|
||||
// Get the series for the matcher.
|
||||
ss, _, err := querier.Select(nil, matchers...)
|
||||
ss, _, err := querier.Select(false, nil, matchers...)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Assert(t, ss.Next(), "")
|
||||
storageSeries := ss.At()
|
||||
|
|
|
@ -705,7 +705,7 @@ func (g *Group) RestoreForState(ts time.Time) {
|
|||
matchers = append(matchers, mt)
|
||||
}
|
||||
|
||||
sset, err, _ := q.Select(nil, matchers...)
|
||||
sset, err, _ := q.Select(false, nil, matchers...)
|
||||
if err != nil {
|
||||
level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
|
||||
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)
|
||||
|
|
|
@ -512,8 +512,8 @@ func TestForStateRestore(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStaleness(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
engineOpts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
@ -522,9 +522,9 @@ func TestStaleness(t *testing.T) {
|
|||
}
|
||||
engine := promql.NewEngine(engineOpts)
|
||||
opts := &ManagerOptions{
|
||||
QueryFunc: EngineQueryFunc(engine, storage),
|
||||
Appendable: storage,
|
||||
TSDB: storage,
|
||||
QueryFunc: EngineQueryFunc(engine, st),
|
||||
Appendable: st,
|
||||
TSDB: st,
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
}
|
||||
|
@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) {
|
|||
})
|
||||
|
||||
// A time series that has two samples and then goes stale.
|
||||
app := storage.Appender()
|
||||
app := st.Appender()
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
|
||||
|
@ -556,14 +556,14 @@ func TestStaleness(t *testing.T) {
|
|||
group.Eval(ctx, time.Unix(1, 0))
|
||||
group.Eval(ctx, time.Unix(2, 0))
|
||||
|
||||
querier, err := storage.Querier(context.Background(), 0, 2000)
|
||||
querier, err := st.Querier(context.Background(), 0, 2000)
|
||||
testutil.Ok(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
set, _, err := querier.Select(nil, matcher)
|
||||
set, _, err := querier.Select(false, nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
samples, err := readSeriesSet(set)
|
||||
|
@ -658,8 +658,8 @@ func TestCopyState(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestDeletedRuleMarkedStale(t *testing.T) {
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
oldGroup := &Group{
|
||||
rules: []Rule{
|
||||
NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}),
|
||||
|
@ -672,21 +672,21 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
|
|||
rules: []Rule{},
|
||||
seriesInPreviousEval: []map[string]labels.Labels{},
|
||||
opts: &ManagerOptions{
|
||||
Appendable: storage,
|
||||
Appendable: st,
|
||||
},
|
||||
}
|
||||
newGroup.CopyState(oldGroup)
|
||||
|
||||
newGroup.Eval(context.Background(), time.Unix(0, 0))
|
||||
|
||||
querier, err := storage.Querier(context.Background(), 0, 2000)
|
||||
querier, err := st.Querier(context.Background(), 0, 2000)
|
||||
testutil.Ok(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
set, _, err := querier.Select(nil, matcher)
|
||||
set, _, err := querier.Select(false, nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
samples, err := readSeriesSet(set)
|
||||
|
@ -704,8 +704,8 @@ func TestUpdate(t *testing.T) {
|
|||
expected := map[string]labels.Labels{
|
||||
"test": labels.FromStrings("name", "value"),
|
||||
}
|
||||
storage := teststorage.New(t)
|
||||
defer storage.Close()
|
||||
st := teststorage.New(t)
|
||||
defer st.Close()
|
||||
opts := promql.EngineOpts{
|
||||
Logger: nil,
|
||||
Reg: nil,
|
||||
|
@ -714,9 +714,9 @@ func TestUpdate(t *testing.T) {
|
|||
}
|
||||
engine := promql.NewEngine(opts)
|
||||
ruleManager := NewManager(&ManagerOptions{
|
||||
Appendable: storage,
|
||||
TSDB: storage,
|
||||
QueryFunc: EngineQueryFunc(engine, storage),
|
||||
Appendable: st,
|
||||
TSDB: st,
|
||||
QueryFunc: EngineQueryFunc(engine, st),
|
||||
Context: context.Background(),
|
||||
Logger: log.NewNopLogger(),
|
||||
})
|
||||
|
@ -1096,16 +1096,16 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
|
|||
testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine")
|
||||
}
|
||||
|
||||
func countStaleNaN(t *testing.T, storage storage.Storage) int {
|
||||
func countStaleNaN(t *testing.T, st storage.Storage) int {
|
||||
var c int
|
||||
querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000)
|
||||
querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000)
|
||||
testutil.Ok(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
set, _, err := querier.Select(nil, matcher)
|
||||
set, _, err := querier.Select(false, nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
samples, err := readSeriesSet(set)
|
||||
|
|
|
@ -1571,7 +1571,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
|
||||
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
||||
testutil.Ok(t, err)
|
||||
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, false, series.Next(), "series found in tsdb")
|
||||
|
||||
|
@ -1581,7 +1581,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
|
|||
|
||||
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
||||
testutil.Ok(t, err)
|
||||
series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
|
||||
series, _, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, true, series.Next(), "series not found in tsdb")
|
||||
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
|
||||
|
@ -1617,7 +1617,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
|
|||
|
||||
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
|
||||
testutil.Ok(t, err)
|
||||
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
series, _, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, false, series.Next(), "series found in tsdb")
|
||||
}
|
||||
|
|
|
@ -221,20 +221,16 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
|
|||
}
|
||||
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
if len(q.queriers) != 1 {
|
||||
// We need to sort for NewMergeSeriesSet to work.
|
||||
return q.SelectSorted(params, matchers...)
|
||||
func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
if len(q.queriers) == 1 {
|
||||
return q.queriers[0].Select(sortSeries, hints, matchers...)
|
||||
}
|
||||
return q.queriers[0].Select(params, matchers...)
|
||||
}
|
||||
|
||||
// SelectSorted returns a set of sorted series that matches the given label matchers.
|
||||
func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
||||
var warnings Warnings
|
||||
|
||||
var priErr error = nil
|
||||
var (
|
||||
seriesSets = make([]SeriesSet, 0, len(q.queriers))
|
||||
warnings Warnings
|
||||
priErr error
|
||||
)
|
||||
type queryResult struct {
|
||||
qr Querier
|
||||
set SeriesSet
|
||||
|
@ -242,9 +238,11 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma
|
|||
selectError error
|
||||
}
|
||||
queryResultChan := make(chan *queryResult)
|
||||
|
||||
for _, querier := range q.queriers {
|
||||
go func(qr Querier) {
|
||||
set, wrn, err := qr.SelectSorted(params, matchers...)
|
||||
// We need to sort for NewMergeSeriesSet to work.
|
||||
set, wrn, err := qr.Select(true, hints, matchers...)
|
||||
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
|
||||
}(querier)
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) {
|
|||
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
seriesSet, _, err := querier.SelectSorted(nil, matcher)
|
||||
seriesSet, _, err := querier.Select(false, nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
result := make(map[int64]float64)
|
||||
|
|
|
@ -60,10 +60,9 @@ type Queryable interface {
|
|||
// time range.
|
||||
type Querier interface {
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
|
||||
|
||||
// SelectSorted returns a sorted set of series that matches the given label matchers.
|
||||
SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
|
||||
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
|
||||
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
|
||||
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error)
|
||||
|
||||
// LabelValues returns all potential values for a label name.
|
||||
// It is not safe to use the strings beyond the lifefime of the querier.
|
||||
|
@ -76,8 +75,9 @@ type Querier interface {
|
|||
Close() error
|
||||
}
|
||||
|
||||
// SelectParams specifies parameters passed to data selections.
|
||||
type SelectParams struct {
|
||||
// SelectHints specifies hints passed for data selections.
|
||||
// This is used only as an option for implementation to use.
|
||||
type SelectHints struct {
|
||||
Start int64 // Start time in milliseconds for this select.
|
||||
End int64 // End time in milliseconds for this select.
|
||||
|
||||
|
|
|
@ -24,11 +24,7 @@ func NoopQuerier() Querier {
|
|||
return noopQuerier{}
|
||||
}
|
||||
|
||||
func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
return NoopSeriesSet(), nil, nil
|
||||
}
|
||||
|
||||
func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
return NoopSeriesSet(), nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -79,22 +79,22 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
|
|||
}
|
||||
|
||||
// ToQuery builds a Query proto.
|
||||
func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) {
|
||||
func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error) {
|
||||
ms, err := toLabelMatchers(matchers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var rp *prompb.ReadHints
|
||||
if p != nil {
|
||||
if hints != nil {
|
||||
rp = &prompb.ReadHints{
|
||||
StepMs: p.Step,
|
||||
Func: p.Func,
|
||||
StartMs: p.Start,
|
||||
EndMs: p.End,
|
||||
Grouping: p.Grouping,
|
||||
By: p.By,
|
||||
RangeMs: p.Range,
|
||||
StartMs: hints.Start,
|
||||
EndMs: hints.End,
|
||||
StepMs: hints.Step,
|
||||
Func: hints.Func,
|
||||
Grouping: hints.Grouping,
|
||||
By: hints.By,
|
||||
RangeMs: hints.Range,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
|||
}
|
||||
|
||||
// FromQueryResult unpacks and sorts a QueryResult proto.
|
||||
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
|
||||
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
|
||||
series := make([]storage.Series, 0, len(res.Timeseries))
|
||||
for _, ts := range res.Timeseries {
|
||||
labels := labelProtosToLabels(ts.Labels)
|
||||
|
@ -158,7 +158,10 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
|
|||
samples: ts.Samples,
|
||||
})
|
||||
}
|
||||
sort.Sort(byLabel(series))
|
||||
|
||||
if sortSeries {
|
||||
sort.Sort(byLabel(series))
|
||||
}
|
||||
return &concreteSeriesSet{
|
||||
series: series,
|
||||
}
|
||||
|
|
|
@ -178,7 +178,7 @@ func TestFromQueryResultWithDuplicates(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
series := FromQueryResult(&res)
|
||||
series := FromQueryResult(false, &res)
|
||||
|
||||
errSeries, isErrSeriesSet := series.(errSeriesSet)
|
||||
|
||||
|
|
|
@ -57,16 +57,9 @@ type querier struct {
|
|||
client *Client
|
||||
}
|
||||
|
||||
// Select implements storage.Querier and uses the given matchers to read series
|
||||
// sets from the Client.
|
||||
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.SelectSorted(p, matchers...)
|
||||
}
|
||||
|
||||
// SelectSorted implements storage.Querier and uses the given matchers to read series
|
||||
// sets from the Client.
|
||||
func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
query, err := ToQuery(q.mint, q.maxt, matchers, p)
|
||||
// Select implements storage.Querier and uses the given matchers to read series sets from the Client.
|
||||
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
query, err := ToQuery(q.mint, q.maxt, matchers, hints)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -80,12 +73,11 @@ func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matc
|
|||
return nil, nil, fmt.Errorf("remote_read: %v", err)
|
||||
}
|
||||
|
||||
// FromQueryResult sorts.
|
||||
return FromQueryResult(res), nil, nil
|
||||
return FromQueryResult(sortSeries, res), nil, nil
|
||||
}
|
||||
|
||||
// LabelValues implements storage.Querier and is a noop.
|
||||
func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
|
||||
func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) {
|
||||
// TODO implement?
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
@ -124,9 +116,9 @@ type externalLabelsQuerier struct {
|
|||
// Select adds equality matchers for all external labels to the list of matchers
|
||||
// before calling the wrapped storage.Queryable. The added external labels are
|
||||
// removed from the returned series sets.
|
||||
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q externalLabelsQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
m, added := q.addExternalLabels(matchers)
|
||||
s, warnings, err := q.Querier.Select(p, m...)
|
||||
s, warnings, err := q.Querier.Select(sortSeries, hints, m...)
|
||||
if err != nil {
|
||||
return nil, warnings, err
|
||||
}
|
||||
|
@ -177,7 +169,7 @@ type requiredMatchersQuerier struct {
|
|||
|
||||
// Select returns a NoopSeriesSet if the given matchers don't match the label
|
||||
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
|
||||
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q requiredMatchersQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
ms := q.requiredMatchers
|
||||
for _, m := range matchers {
|
||||
for i, r := range ms {
|
||||
|
@ -193,7 +185,7 @@ func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*la
|
|||
if len(ms) > 0 {
|
||||
return storage.NoopSeriesSet(), nil, nil
|
||||
}
|
||||
return q.Querier.Select(p, matchers...)
|
||||
return q.Querier.Select(sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
// addExternalLabels adds matchers for each external label. External labels
|
||||
|
|
|
@ -117,7 +117,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
|
|||
},
|
||||
}
|
||||
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
|
||||
have, _, err := q.Select(nil, matchers...)
|
||||
have, _, err := q.Select(false, nil, matchers...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ func TestSeriesSetFilter(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tc := range tests {
|
||||
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove)
|
||||
filtered := newSeriesSetFilter(FromQueryResult(true, tc.in), tc.toRemove)
|
||||
have, err := ToQueryResult(filtered, 1e6)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -242,7 +242,7 @@ type mockSeriesSet struct {
|
|||
storage.SeriesSet
|
||||
}
|
||||
|
||||
func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (mockQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return mockSeriesSet{}, nil, nil
|
||||
}
|
||||
|
||||
|
@ -398,7 +398,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) {
|
|||
requiredMatchers: test.requiredMatchers,
|
||||
}
|
||||
|
||||
have, _, err := q.Select(nil, test.matchers...)
|
||||
have, _, err := q.Select(false, nil, test.matchers...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
|
|
@ -203,7 +203,7 @@ func TestCorruptedChunk(t *testing.T) {
|
|||
querier, err := NewBlockQuerier(b, 0, 1)
|
||||
testutil.Ok(t, err)
|
||||
defer func() { testutil.Ok(t, querier.Close()) }()
|
||||
set, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
set, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
|
|
@ -616,7 +616,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) {
|
|||
err = merr.Err()
|
||||
}()
|
||||
|
||||
ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -67,7 +67,7 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB, close func()
|
|||
|
||||
// query runs a matcher query against the querier and fully expands its data.
|
||||
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample {
|
||||
ss, ws, err := q.Select(nil, matchers...)
|
||||
ss, ws, err := q.Select(false, nil, matchers...)
|
||||
defer func() {
|
||||
testutil.Ok(t, q.Close())
|
||||
}()
|
||||
|
@ -315,7 +315,7 @@ Outer:
|
|||
q, err := db.Querier(context.TODO(), 0, numSamples)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -491,7 +491,7 @@ func TestDB_Snapshot(t *testing.T) {
|
|||
defer func() { testutil.Ok(t, querier.Close()) }()
|
||||
|
||||
// sum values
|
||||
seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -546,7 +546,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
|
|||
defer func() { testutil.Ok(t, querier.Close()) }()
|
||||
|
||||
// Sum values.
|
||||
seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -618,7 +618,7 @@ Outer:
|
|||
testutil.Ok(t, err)
|
||||
defer func() { testutil.Ok(t, q.Close()) }()
|
||||
|
||||
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -792,7 +792,7 @@ func TestDB_e2e(t *testing.T) {
|
|||
q, err := db.Querier(context.TODO(), mint, maxt)
|
||||
testutil.Ok(t, err)
|
||||
|
||||
ss, ws, err := q.Select(nil, qry.ms...)
|
||||
ss, ws, err := q.Select(false, nil, qry.ms...)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -951,7 +951,7 @@ func TestTombstoneClean(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer q.Close()
|
||||
|
||||
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -1289,7 +1289,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
|
|||
defer func() { testutil.Ok(t, q.Close()) }()
|
||||
|
||||
for _, c := range cases {
|
||||
ss, ws, err := q.Select(nil, c.selector...)
|
||||
ss, ws, err := q.Select(false, nil, c.selector...)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -2486,7 +2486,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
|||
defer func() { testutil.Ok(t, querier.Close()) }()
|
||||
|
||||
// Sum the values.
|
||||
seriesSet, ws, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
|
||||
seriesSet, ws, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -2551,7 +2551,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
|
||||
_, seriesSet, err := expandSeriesSet(ss)
|
||||
|
@ -2591,13 +2591,13 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
defer querierAfterAddButBeforeCommit.Close()
|
||||
|
||||
// None of the queriers should return anything after the Add but before the commit.
|
||||
ss, _, err := querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err := querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
_, seriesSet, err := expandSeriesSet(ss)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, map[string][]sample{}, seriesSet)
|
||||
|
||||
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
_, seriesSet, err = expandSeriesSet(ss)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -2608,14 +2608,14 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
|
||||
// Nothing returned for querier created before the Add.
|
||||
ss, _, err = querierBeforeAdd.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err = querierBeforeAdd.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
_, seriesSet, err = expandSeriesSet(ss)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, map[string][]sample{}, seriesSet)
|
||||
|
||||
// Series exists but has no samples for querier created after Add.
|
||||
ss, _, err = querierAfterAddButBeforeCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err = querierAfterAddButBeforeCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
_, seriesSet, err = expandSeriesSet(ss)
|
||||
testutil.Ok(t, err)
|
||||
|
@ -2626,7 +2626,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
|||
defer querierAfterCommit.Close()
|
||||
|
||||
// Samples are returned for querier created after Commit.
|
||||
ss, _, err = querierAfterCommit.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err = querierAfterCommit.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
_, seriesSet, err = expandSeriesSet(ss)
|
||||
testutil.Ok(t, err)
|
||||
|
|
|
@ -565,7 +565,7 @@ func TestHeadDeleteSimple(t *testing.T) {
|
|||
for _, h := range []*Head{head, reloadedHead} {
|
||||
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
|
||||
testutil.Ok(t, err)
|
||||
actSeriesSet, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
|
||||
actSeriesSet, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -612,7 +612,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
// Test the series returns no samples. The series is cleared only after compaction.
|
||||
q, err := NewBlockQuerier(hb, 0, 100000)
|
||||
testutil.Ok(t, err)
|
||||
res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
testutil.Assert(t, res.Next(), "series is not present")
|
||||
|
@ -627,7 +627,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
|
|||
testutil.Ok(t, app.Commit())
|
||||
q, err = NewBlockQuerier(hb, 0, 100000)
|
||||
testutil.Ok(t, err)
|
||||
res, ws, err = q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
res, ws, err = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
testutil.Assert(t, res.Next(), "series don't exist")
|
||||
|
@ -803,7 +803,7 @@ func TestDelete_e2e(t *testing.T) {
|
|||
q, err := NewBlockQuerier(hb, 0, 100000)
|
||||
testutil.Ok(t, err)
|
||||
defer q.Close()
|
||||
ss, ws, err := q.SelectSorted(nil, del.ms...)
|
||||
ss, ws, err := q.Select(true, nil, del.ms...)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
// Build the mockSeriesSet.
|
||||
|
@ -1091,7 +1091,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer q.Close()
|
||||
|
||||
ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -1119,7 +1119,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer q.Close()
|
||||
|
||||
ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
ss, ws, err := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -1403,7 +1403,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) {
|
|||
|
||||
seriesCount := 0
|
||||
samplesCount := 0
|
||||
ss, _, err := q.Select(nil, matcher)
|
||||
ss, _, err := q.Select(false, nil, matcher)
|
||||
testutil.Ok(t, err)
|
||||
for ss.Next() {
|
||||
i := ss.At().Iterator()
|
||||
|
@ -1443,7 +1443,7 @@ func TestMemSeriesIsolation(t *testing.T) {
|
|||
testutil.Ok(t, err)
|
||||
defer querier.Close()
|
||||
|
||||
ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
ss, _, err := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
|
||||
testutil.Ok(t, err)
|
||||
|
||||
_, seriesSet, err := expandSeriesSet(ss)
|
||||
|
|
|
@ -85,23 +85,21 @@ func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warni
|
|||
return mergeStrings(s1, s2), ws, nil
|
||||
}
|
||||
|
||||
func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
if len(q.blocks) != 1 {
|
||||
return q.SelectSorted(p, ms...)
|
||||
}
|
||||
// Sorting Head series is slow, and unneeded when only the
|
||||
// Head is being queried. Sorting blocks is a noop.
|
||||
return q.blocks[0].Select(p, ms...)
|
||||
}
|
||||
|
||||
func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q *querier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
if len(q.blocks) == 0 {
|
||||
return storage.EmptySeriesSet(), nil, nil
|
||||
}
|
||||
if len(q.blocks) == 1 {
|
||||
return q.blocks[0].Select(sortSeries, hints, ms...)
|
||||
}
|
||||
|
||||
ss := make([]storage.SeriesSet, len(q.blocks))
|
||||
var ws storage.Warnings
|
||||
for i, b := range q.blocks {
|
||||
s, w, err := b.SelectSorted(p, ms...)
|
||||
// Sorting Head series is slow, and unneeded when only the
|
||||
// Head is being queried. Sorting blocks is a noop.
|
||||
// Still we have to sort if blocks > 1 as Merged Series requires.
|
||||
s, w, err := b.Select(true, hints, ms...)
|
||||
ws = append(ws, w...)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
|
@ -127,30 +125,26 @@ type verticalQuerier struct {
|
|||
querier
|
||||
}
|
||||
|
||||
func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.sel(p, q.blocks, ms)
|
||||
func (q *verticalQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.sel(sortSeries, hints, q.blocks, ms)
|
||||
}
|
||||
|
||||
func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.sel(p, q.blocks, ms)
|
||||
}
|
||||
|
||||
func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q *verticalQuerier) sel(sortSeries bool, hints *storage.SelectHints, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
if len(qs) == 0 {
|
||||
return storage.EmptySeriesSet(), nil, nil
|
||||
}
|
||||
if len(qs) == 1 {
|
||||
return qs[0].SelectSorted(p, ms...)
|
||||
return qs[0].Select(sortSeries, hints, ms...)
|
||||
}
|
||||
l := len(qs) / 2
|
||||
|
||||
var ws storage.Warnings
|
||||
a, w, err := q.sel(p, qs[:l], ms)
|
||||
a, w, err := q.sel(sortSeries, hints, qs[:l], ms)
|
||||
ws = append(ws, w...)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
}
|
||||
b, w, err := q.sel(p, qs[l:], ms)
|
||||
b, w, err := q.sel(sortSeries, hints, qs[l:], ms)
|
||||
ws = append(ws, w...)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
|
@ -195,42 +189,24 @@ type blockQuerier struct {
|
|||
mint, maxt int64
|
||||
}
|
||||
|
||||
func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
|
||||
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
var base storage.ChunkSeriesSet
|
||||
var err error
|
||||
|
||||
if sortSeries {
|
||||
base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
|
||||
} else {
|
||||
base, err = LookupChunkSeries(q.index, q.tombstones, ms...)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
if p != nil {
|
||||
mint = p.Start
|
||||
maxt = p.End
|
||||
}
|
||||
return &blockSeriesSet{
|
||||
set: &populatedChunkSeries{
|
||||
set: base,
|
||||
chunks: q.chunks,
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
},
|
||||
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
}, nil, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
mint := q.mint
|
||||
maxt := q.maxt
|
||||
if p != nil {
|
||||
mint = p.Start
|
||||
maxt = p.End
|
||||
if hints != nil {
|
||||
mint = hints.Start
|
||||
maxt = hints.End
|
||||
}
|
||||
return &blockSeriesSet{
|
||||
set: &populatedChunkSeries{
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/util/testutil"
|
||||
)
|
||||
|
||||
|
@ -147,12 +146,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
var ss storage.SeriesSet
|
||||
if sorted {
|
||||
ss, _, err = q.SelectSorted(nil, matcher)
|
||||
} else {
|
||||
ss, _, err = q.Select(nil, matcher)
|
||||
}
|
||||
ss, _, err := q.Select(sorted, nil, matcher)
|
||||
testutil.Ok(b, err)
|
||||
for ss.Next() {
|
||||
}
|
||||
|
|
|
@ -373,7 +373,7 @@ Outer:
|
|||
maxt: c.maxt,
|
||||
}
|
||||
|
||||
res, ws, err := querier.Select(nil, c.ms...)
|
||||
res, ws, err := querier.Select(false, nil, c.ms...)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -536,7 +536,7 @@ Outer:
|
|||
maxt: c.maxt,
|
||||
}
|
||||
|
||||
res, ws, err := querier.Select(nil, c.ms...)
|
||||
res, ws, err := querier.Select(false, nil, c.ms...)
|
||||
testutil.Ok(t, err)
|
||||
testutil.Equals(t, 0, len(ws))
|
||||
|
||||
|
@ -1710,7 +1710,7 @@ func BenchmarkQuerySeek(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
ss, ws, err := sq.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
ss, ws, err := sq.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
|
||||
for ss.Next() {
|
||||
it := ss.At().Iterator()
|
||||
for t := mint; t <= maxt; t++ {
|
||||
|
@ -1848,7 +1848,7 @@ func BenchmarkSetMatcher(b *testing.B) {
|
|||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for n := 0; n < b.N; n++ {
|
||||
_, ws, err := que.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
|
||||
_, ws, err := que.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern))
|
||||
testutil.Ok(b, err)
|
||||
testutil.Equals(b, 0, len(ws))
|
||||
}
|
||||
|
@ -2297,7 +2297,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la
|
|||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ss, ws, err := q.Select(nil, selectors...)
|
||||
ss, ws, err := q.Select(false, nil, selectors...)
|
||||
testutil.Ok(b, err)
|
||||
testutil.Equals(b, 0, len(ws))
|
||||
var actualExpansions int
|
||||
|
|
|
@ -531,7 +531,7 @@ func (api *API) series(r *http.Request) apiFuncResult {
|
|||
var sets []storage.SeriesSet
|
||||
var warnings storage.Warnings
|
||||
for _, mset := range matcherSets {
|
||||
s, wrn, err := q.Select(nil, mset...) //TODO
|
||||
s, wrn, err := q.Select(false, nil, mset...)
|
||||
warnings = append(warnings, wrn...)
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
|
||||
|
@ -1161,10 +1161,9 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
for i, query := range req.Queries {
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
|
||||
// The streaming API provides sorted series.
|
||||
// TODO(bwplotka): Handle warnings via query log.
|
||||
set, _, err := querier.SelectSorted(selectParams, filteredMatchers...)
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
|
||||
// The streaming API has to provide the series sorted.
|
||||
set, _, err := querier.Select(true, hints, filteredMatchers...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1195,8 +1194,8 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
|||
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||
}
|
||||
for i, query := range req.Queries {
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
|
||||
set, _, err := querier.Select(selectParams, filteredMatchers...)
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error {
|
||||
set, _, err := querier.Select(false, hints, filteredMatchers...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1254,7 +1253,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
|
|||
return filteredMatchers, nil
|
||||
}
|
||||
|
||||
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error {
|
||||
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, hints *storage.SelectHints, filteredMatchers []*labels.Matcher) error) error {
|
||||
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1264,23 +1263,25 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var selectParams *storage.SelectParams
|
||||
if query.Hints != nil {
|
||||
selectParams = &storage.SelectParams{
|
||||
Start: query.Hints.StartMs,
|
||||
End: query.Hints.EndMs,
|
||||
Step: query.Hints.StepMs,
|
||||
Func: query.Hints.Func,
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := querier.Close(); err != nil {
|
||||
level.Warn(api.logger).Log("msg", "error on querier close", "err", err.Error())
|
||||
}
|
||||
}()
|
||||
return seriesHandleFn(querier, selectParams, filteredMatchers)
|
||||
|
||||
var hints *storage.SelectHints
|
||||
if query.Hints != nil {
|
||||
hints = &storage.SelectHints{
|
||||
Start: query.Hints.StartMs,
|
||||
End: query.Hints.EndMs,
|
||||
Step: query.Hints.StepMs,
|
||||
Func: query.Hints.Func,
|
||||
Grouping: query.Hints.Grouping,
|
||||
Range: query.Hints.RangeMs,
|
||||
By: query.Hints.By,
|
||||
}
|
||||
}
|
||||
return seriesHandleFn(querier, hints, filteredMatchers)
|
||||
}
|
||||
|
||||
func (api *API) deleteSeries(r *http.Request) apiFuncResult {
|
||||
|
|
|
@ -488,9 +488,9 @@ func setupRemote(s storage.Storage) *httptest.Server {
|
|||
return
|
||||
}
|
||||
|
||||
var selectParams *storage.SelectParams
|
||||
var hints *storage.SelectHints
|
||||
if query.Hints != nil {
|
||||
selectParams = &storage.SelectParams{
|
||||
hints = &storage.SelectHints{
|
||||
Start: query.Hints.StartMs,
|
||||
End: query.Hints.EndMs,
|
||||
Step: query.Hints.StepMs,
|
||||
|
@ -505,7 +505,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
|
|||
}
|
||||
defer querier.Close()
|
||||
|
||||
set, _, err := querier.Select(selectParams, matchers...)
|
||||
set, _, err := querier.Select(false, hints, matchers...)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -1615,7 +1615,7 @@ func TestSampledReadEndpoint(t *testing.T) {
|
|||
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"})
|
||||
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{Step: 0, Func: "avg"})
|
||||
testutil.Ok(t, err)
|
||||
|
||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
|
||||
|
@ -1714,7 +1714,7 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1")
|
||||
testutil.Ok(t, err)
|
||||
|
||||
query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{
|
||||
query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectHints{
|
||||
Step: 1,
|
||||
Func: "avg",
|
||||
Start: 0,
|
||||
|
@ -1722,7 +1722,7 @@ func TestStreamReadEndpoint(t *testing.T) {
|
|||
})
|
||||
testutil.Ok(t, err)
|
||||
|
||||
query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{
|
||||
query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectHints{
|
||||
Step: 1,
|
||||
Func: "avg",
|
||||
Start: 0,
|
||||
|
|
|
@ -81,14 +81,11 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
|
|||
|
||||
vec := make(promql.Vector, 0, 8000)
|
||||
|
||||
params := &storage.SelectParams{
|
||||
Start: mint,
|
||||
End: maxt,
|
||||
}
|
||||
hints := &storage.SelectHints{Start: mint, End: maxt}
|
||||
|
||||
var sets []storage.SeriesSet
|
||||
for _, mset := range matcherSets {
|
||||
s, wrns, err := q.Select(params, mset...)
|
||||
s, wrns, err := q.Select(false, hints, mset...)
|
||||
if wrns != nil {
|
||||
level.Debug(h.logger).Log("msg", "federation select returned warnings", "warnings", wrns)
|
||||
federationWarnings.Add(float64(len(wrns)))
|
||||
|
|
Loading…
Reference in a new issue