mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Merge pull request #6651 from prometheus/sort-block
Don't sort postings if we only have one block.
This commit is contained in:
commit
540dc7dfb0
|
@ -340,7 +340,7 @@ func (m *QueryResult) GetTimeseries() []*TimeSeries {
|
|||
// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
|
||||
// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
|
||||
// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
|
||||
// be sent for previous one.
|
||||
// be sent for previous one. Series are returned sorted in the same way TSDB block are internally.
|
||||
type ChunkedReadResponse struct {
|
||||
ChunkedSeries []*ChunkedSeries `protobuf:"bytes,1,rep,name=chunked_series,json=chunkedSeries,proto3" json:"chunked_series,omitempty"`
|
||||
// query_index represents an index of the query from ReadRequest.queries these chunks relates to.
|
||||
|
|
|
@ -73,7 +73,7 @@ message QueryResult {
|
|||
// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
|
||||
// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
|
||||
// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
|
||||
// be sent for previous one.
|
||||
// be sent for previous one. Series are returned sorted in the same way TSDB block are internally.
|
||||
message ChunkedReadResponse {
|
||||
repeated prometheus.ChunkedSeries chunked_series = 1;
|
||||
|
||||
|
|
|
@ -596,7 +596,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
|
|||
return nil, warnings, err
|
||||
}
|
||||
|
||||
// TODO(fabxc): order ensured by storage?
|
||||
// TODO(fabxc): where to ensure metric labels are a copy from the storage internals.
|
||||
sortSpanTimer, _ := query.stats.GetSpanTimer(ctx, stats.ResultSortTime, ng.metrics.queryResultSort)
|
||||
sort.Sort(mat)
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -179,6 +180,9 @@ type errQuerier struct {
|
|||
func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return errSeriesSet{err: q.err}, nil, q.err
|
||||
}
|
||||
func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return errSeriesSet{err: q.err}, nil, q.err
|
||||
}
|
||||
func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
|
||||
func (*errQuerier) Close() error { return nil }
|
||||
|
@ -236,7 +240,10 @@ type paramCheckerQuerier struct {
|
|||
t *testing.T
|
||||
}
|
||||
|
||||
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.SelectSorted(sp, m...)
|
||||
}
|
||||
func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
testutil.Equals(q.t, q.start, sp.Start)
|
||||
testutil.Equals(q.t, q.end, sp.End)
|
||||
testutil.Equals(q.t, q.grouping, sp.Grouping)
|
||||
|
@ -1111,7 +1118,9 @@ func TestSubquerySelector(t *testing.T) {
|
|||
|
||||
res := qry.Exec(test.Context())
|
||||
testutil.Equals(t, c.Result.Err, res.Err)
|
||||
testutil.Equals(t, c.Result.Value, res.Value)
|
||||
mat := res.Value.(Matrix)
|
||||
sort.Sort(mat)
|
||||
testutil.Equals(t, c.Result.Value, mat)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -229,10 +229,19 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
|
|||
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
if len(q.queriers) != 1 {
|
||||
// We need to sort for NewMergeSeriesSet to work.
|
||||
return q.SelectSorted(params, matchers...)
|
||||
}
|
||||
return q.queriers[0].Select(params, matchers...)
|
||||
}
|
||||
|
||||
// SelectSorted returns a set of sorted series that matches the given label matchers.
|
||||
func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
seriesSets := make([]SeriesSet, 0, len(q.queriers))
|
||||
var warnings Warnings
|
||||
for _, querier := range q.queriers {
|
||||
set, wrn, err := querier.Select(params, matchers...)
|
||||
set, wrn, err := querier.SelectSorted(params, matchers...)
|
||||
q.setQuerierMap[set] = querier
|
||||
if wrn != nil {
|
||||
warnings = append(warnings, wrn...)
|
||||
|
|
|
@ -54,6 +54,9 @@ type Querier interface {
|
|||
// Select returns a set of series that matches the given label matchers.
|
||||
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
|
||||
|
||||
// SelectSorted returns a sorted set of series that matches the given label matchers.
|
||||
SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
|
||||
|
||||
// LabelValues returns all potential values for a label name.
|
||||
LabelValues(name string) ([]string, Warnings, error)
|
||||
|
||||
|
|
|
@ -30,6 +30,10 @@ func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warning
|
|||
return NoopSeriesSet(), nil, nil
|
||||
}
|
||||
|
||||
func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
|
||||
return NoopSeriesSet(), nil, nil
|
||||
}
|
||||
|
||||
func (noopQuerier) LabelValues(name string) ([]string, Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
|
|
@ -144,7 +144,7 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// FromQueryResult unpacks a QueryResult proto.
|
||||
// FromQueryResult unpacks and sorts a QueryResult proto.
|
||||
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
|
||||
series := make([]storage.Series, 0, len(res.Timeseries))
|
||||
for _, ts := range res.Timeseries {
|
||||
|
|
|
@ -60,6 +60,12 @@ type querier struct {
|
|||
// Select implements storage.Querier and uses the given matchers to read series
|
||||
// sets from the Client.
|
||||
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
return q.SelectSorted(p, matchers...)
|
||||
}
|
||||
|
||||
// SelectSorted implements storage.Querier and uses the given matchers to read series
|
||||
// sets from the Client.
|
||||
func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
query, err := ToQuery(q.mint, q.maxt, matchers, p)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -74,6 +80,7 @@ func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (
|
|||
return nil, nil, fmt.Errorf("remote_read: %v", err)
|
||||
}
|
||||
|
||||
// FromQueryResult sorts.
|
||||
return FromQueryResult(res), nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -250,6 +250,14 @@ func (q querier) Select(_ *storage.SelectParams, ms ...*labels.Matcher) (storage
|
|||
return seriesSet{set: set}, nil, nil
|
||||
}
|
||||
|
||||
func (q querier) SelectSorted(_ *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
|
||||
set, err := q.q.SelectSorted(ms...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return seriesSet{set: set}, nil, nil
|
||||
}
|
||||
|
||||
func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) {
|
||||
v, err := q.q.LabelValues(name)
|
||||
return v, nil, err
|
||||
|
|
|
@ -790,7 +790,7 @@ func TestDelete_e2e(t *testing.T) {
|
|||
q, err := NewBlockQuerier(hb, 0, 100000)
|
||||
testutil.Ok(t, err)
|
||||
defer q.Close()
|
||||
ss, err := q.Select(del.ms...)
|
||||
ss, err := q.SelectSorted(del.ms...)
|
||||
testutil.Ok(t, err)
|
||||
// Build the mockSeriesSet.
|
||||
matchedSeries := make([]Series, 0, len(matched))
|
||||
|
|
|
@ -34,6 +34,9 @@ type Querier interface {
|
|||
// Select returns a set of series that matches the given label matchers.
|
||||
Select(...*labels.Matcher) (SeriesSet, error)
|
||||
|
||||
// SelectSorted returns a sorted set of series that matches the given label matcher.
|
||||
SelectSorted(...*labels.Matcher) (SeriesSet, error)
|
||||
|
||||
// LabelValues returns all potential values for a label name.
|
||||
// It is not safe to use the strings beyond the lifefime of the querier.
|
||||
LabelValues(string) ([]string, error)
|
||||
|
@ -106,14 +109,21 @@ func (q *querier) lvals(qs []Querier, n string) ([]string, error) {
|
|||
}
|
||||
|
||||
func (q *querier) Select(ms ...*labels.Matcher) (SeriesSet, error) {
|
||||
if len(q.blocks) != 1 {
|
||||
return q.SelectSorted(ms...)
|
||||
}
|
||||
// Sorting Head series is slow, and unneeded when only the
|
||||
// Head is being queried. Sorting blocks is a noop.
|
||||
return q.blocks[0].Select(ms...)
|
||||
}
|
||||
|
||||
func (q *querier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) {
|
||||
if len(q.blocks) == 0 {
|
||||
return EmptySeriesSet(), nil
|
||||
}
|
||||
ss := make([]SeriesSet, len(q.blocks))
|
||||
var s SeriesSet
|
||||
var err error
|
||||
for i, b := range q.blocks {
|
||||
s, err = b.Select(ms...)
|
||||
s, err := b.SelectSorted(ms...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -142,12 +152,16 @@ func (q *verticalQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) {
|
|||
return q.sel(q.blocks, ms)
|
||||
}
|
||||
|
||||
func (q *verticalQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) {
|
||||
return q.sel(q.blocks, ms)
|
||||
}
|
||||
|
||||
func (q *verticalQuerier) sel(qs []Querier, ms []*labels.Matcher) (SeriesSet, error) {
|
||||
if len(qs) == 0 {
|
||||
return EmptySeriesSet(), nil
|
||||
}
|
||||
if len(qs) == 1 {
|
||||
return qs[0].Select(ms...)
|
||||
return qs[0].SelectSorted(ms...)
|
||||
}
|
||||
l := len(qs) / 2
|
||||
|
||||
|
@ -217,6 +231,24 @@ func (q *blockQuerier) Select(ms ...*labels.Matcher) (SeriesSet, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) SelectSorted(ms ...*labels.Matcher) (SeriesSet, error) {
|
||||
base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &blockSeriesSet{
|
||||
set: &populatedChunkSeries{
|
||||
set: base,
|
||||
chunks: q.chunks,
|
||||
mint: q.mint,
|
||||
maxt: q.maxt,
|
||||
},
|
||||
|
||||
mint: q.mint,
|
||||
maxt: q.maxt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *blockQuerier) LabelValues(name string) ([]string, error) {
|
||||
return q.index.LabelValues(name)
|
||||
}
|
||||
|
@ -299,7 +331,7 @@ func findSetMatches(pattern string) []string {
|
|||
}
|
||||
|
||||
// PostingsForMatchers assembles a single postings iterator against the index reader
|
||||
// based on the given matchers.
|
||||
// based on the given matchers. The resulting postings are not ordered by series.
|
||||
func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
|
||||
var its, notIts []index.Postings
|
||||
// See which label must be non-empty.
|
||||
|
@ -379,7 +411,7 @@ func PostingsForMatchers(ix IndexReader, ms ...*labels.Matcher) (index.Postings,
|
|||
it = index.Without(it, n)
|
||||
}
|
||||
|
||||
return ix.SortedPostings(it), nil
|
||||
return it, nil
|
||||
}
|
||||
|
||||
func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, error) {
|
||||
|
@ -689,6 +721,16 @@ type baseChunkSeries struct {
|
|||
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
|
||||
// over them. It drops chunks based on tombstones in the given reader.
|
||||
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) {
|
||||
return lookupChunkSeries(false, ir, tr, ms...)
|
||||
}
|
||||
|
||||
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
|
||||
// over them. It drops chunks based on tombstones in the given reader. Series will be in order.
|
||||
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) {
|
||||
return lookupChunkSeries(true, ir, tr, ms...)
|
||||
}
|
||||
|
||||
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (ChunkSeriesSet, error) {
|
||||
if tr == nil {
|
||||
tr = tombstones.NewMemTombstones()
|
||||
}
|
||||
|
@ -696,6 +738,9 @@ func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Match
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if sorted {
|
||||
p = ir.SortedPostings(p)
|
||||
}
|
||||
return &baseChunkSeries{
|
||||
p: p,
|
||||
index: ir,
|
||||
|
|
|
@ -136,7 +136,7 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
}
|
||||
testutil.Ok(b, app.Commit())
|
||||
|
||||
bench := func(b *testing.B, br BlockReader) {
|
||||
bench := func(b *testing.B, br BlockReader, sorted bool) {
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
|
||||
for s := 1; s <= numSeries; s *= 10 {
|
||||
b.Run(fmt.Sprintf("%dof%d", s, numSeries), func(b *testing.B) {
|
||||
|
@ -145,7 +145,12 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ss, err := q.Select(matcher)
|
||||
var ss SeriesSet
|
||||
if sorted {
|
||||
ss, err = q.SelectSorted(matcher)
|
||||
} else {
|
||||
ss, err = q.Select(matcher)
|
||||
}
|
||||
testutil.Ok(b, err)
|
||||
for ss.Next() {
|
||||
}
|
||||
|
@ -157,7 +162,10 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
}
|
||||
|
||||
b.Run("Head", func(b *testing.B) {
|
||||
bench(b, h)
|
||||
bench(b, h, false)
|
||||
})
|
||||
b.Run("SortedHead", func(b *testing.B) {
|
||||
bench(b, h, true)
|
||||
})
|
||||
|
||||
tmpdir, err := ioutil.TempDir("", "test_benchquerierselect")
|
||||
|
@ -174,6 +182,6 @@ func BenchmarkQuerierSelect(b *testing.B) {
|
|||
}()
|
||||
|
||||
b.Run("Block", func(b *testing.B) {
|
||||
bench(b, block)
|
||||
bench(b, block, false)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1121,7 +1121,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
for i, query := range req.Queries {
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error {
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
|
||||
// The streaming API provides sorted series.
|
||||
set, _, err := querier.SelectSorted(selectParams, filteredMatchers...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return remote.StreamChunkedReadResponses(
|
||||
remote.NewChunkedWriter(w, f),
|
||||
|
@ -1149,7 +1154,11 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
|||
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||
}
|
||||
for i, query := range req.Queries {
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(set storage.SeriesSet) error {
|
||||
err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error {
|
||||
set, _, err := querier.Select(selectParams, filteredMatchers...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadSampleLimit)
|
||||
if err != nil {
|
||||
|
@ -1204,7 +1213,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe
|
|||
return filteredMatchers, nil
|
||||
}
|
||||
|
||||
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(set storage.SeriesSet) error) error {
|
||||
func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error {
|
||||
filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1231,11 +1240,7 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern
|
|||
}
|
||||
}()
|
||||
|
||||
set, _, err := querier.Select(selectParams, filteredMatchers...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return seriesHandleFn(set)
|
||||
return seriesHandleFn(querier, selectParams, filteredMatchers)
|
||||
}
|
||||
|
||||
func (api *API) deleteSeries(r *http.Request) apiFuncResult {
|
||||
|
|
Loading…
Reference in a new issue