diff --git a/tsdb/querier.go b/tsdb/querier.go index 672a85b771..4672cba9b3 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -113,27 +113,21 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { } func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) { - return q.sel(q.blocks, ms) -} - -func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) { - if len(qs) == 0 { + if len(q.blocks) == 0 { return EmptySeriesSet(), nil } - if len(qs) == 1 { - return qs[0].Select(ms...) + ss := make([]SeriesSet, len(q.blocks)) + var s SeriesSet + var err error + for i, b := range q.blocks { + s, err = b.Select(ms...) + if err != nil { + return nil, err + } + ss[i] = s } - l := len(qs) / 2 - a, err := q.sel(qs[:l], ms) - if err != nil { - return nil, err - } - b, err := q.sel(qs[l:], ms) - if err != nil { - return nil, err - } - return newMergedSeriesSet(a, b), nil + return NewMergedSeriesSet(ss), nil } func (q *querier) Close() error { @@ -532,29 +526,28 @@ func EmptySeriesSet() SeriesSet { return emptySeriesSet } -// mergedSeriesSet takes two series sets as a single series set. The input series sets -// must be sorted and sequential in time, i.e. if they have the same label set, -// the datapoints of a must be before the datapoints of b. +// mergedSeriesSet returns a series sets slice as a single series set. The input series sets +// must be sorted and sequential in time. type mergedSeriesSet struct { - a, b SeriesSet - - cur Series - adone, bdone bool + all []SeriesSet + buf []SeriesSet // A buffer for keeping the order of SeriesSet slice during forwarding the SeriesSet. + ids []int // The indices of chosen SeriesSet for the current run. + done bool + err error + cur Series } -// NewMergedSeriesSet takes two series sets as a single series set. The input series sets -// must be sorted and sequential in time, i.e. if they have the same label set, -// the datapoints of a must be before the datapoints of b. -func NewMergedSeriesSet(a, b SeriesSet) SeriesSet { - return newMergedSeriesSet(a, b) -} - -func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet { - s := &mergedSeriesSet{a: a, b: b} - // Initialize first elements of both sets as Next() needs +func NewMergedSeriesSet(all []SeriesSet) SeriesSet { + if len(all) == 1 { + return all[0] + } + s := &mergedSeriesSet{all: all} + // Initialize first elements of all sets as Next() needs // one element look-ahead. - s.adone = !s.a.Next() - s.bdone = !s.b.Next() + s.nextAll() + if len(s.all) == 0 { + s.done = true + } return s } @@ -564,40 +557,93 @@ func (s *mergedSeriesSet) At() Series { } func (s *mergedSeriesSet) Err() error { - if s.a.Err() != nil { - return s.a.Err() - } - return s.b.Err() + return s.err } -func (s *mergedSeriesSet) compare() int { - if s.adone { - return 1 +// nextAll is to call Next() for all SeriesSet. +// Because the order of the SeriesSet slice will affect the results, +// we need to use an buffer slice to hold the order. +func (s *mergedSeriesSet) nextAll() { + s.buf = s.buf[:0] + for _, ss := range s.all { + if ss.Next() { + s.buf = append(s.buf, ss) + } else if ss.Err() != nil { + s.done = true + s.err = ss.Err() + break + } } - if s.bdone { - return -1 + s.all, s.buf = s.buf, s.all +} + +// nextWithID is to call Next() for the SeriesSet with the indices of s.ids. +// Because the order of the SeriesSet slice will affect the results, +// we need to use an buffer slice to hold the order. +func (s *mergedSeriesSet) nextWithID() { + if len(s.ids) == 0 { + return } - return labels.Compare(s.a.At().Labels(), s.b.At().Labels()) + + s.buf = s.buf[:0] + i1 := 0 + i2 := 0 + for i1 < len(s.all) { + if i2 < len(s.ids) && i1 == s.ids[i2] { + if !s.all[s.ids[i2]].Next() { + if s.all[s.ids[i2]].Err() != nil { + s.done = true + s.err = s.all[s.ids[i2]].Err() + break + } + i2++ + i1++ + continue + } + i2++ + } + s.buf = append(s.buf, s.all[i1]) + i1++ + } + s.all, s.buf = s.buf, s.all } func (s *mergedSeriesSet) Next() bool { - if s.adone && s.bdone || s.Err() != nil { + if s.done { return false } - d := s.compare() + s.nextWithID() + if s.done { + return false + } + s.ids = s.ids[:0] + if len(s.all) == 0 { + s.done = true + return false + } - // Both sets contain the current series. Chain them into a single one. - if d > 0 { - s.cur = s.b.At() - s.bdone = !s.b.Next() - } else if d < 0 { - s.cur = s.a.At() - s.adone = !s.a.Next() + // Here we are looking for a set of series sets with the lowest labels, + // and we will cache their indexes in s.ids. + s.ids = append(s.ids, 0) + for i := 1; i < len(s.all); i++ { + cmp := labels.Compare(s.all[s.ids[0]].At().Labels(), s.all[i].At().Labels()) + if cmp > 0 { + s.ids = s.ids[:1] + s.ids[0] = i + } else if cmp == 0 { + s.ids = append(s.ids, i) + } + } + + if len(s.ids) > 1 { + series := make([]Series, len(s.ids)) + for i, idx := range s.ids { + series[i] = s.all[idx].At() + } + s.cur = &chainedSeries{series: series} } else { - s.cur = &chainedSeries{series: []Series{s.a.At(), s.b.At()}} - s.adone = !s.a.Next() - s.bdone = !s.b.Next() + s.cur = s.all[s.ids[0]].At() } return true } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index aba3418b1b..e1cda90157 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -157,7 +157,7 @@ func TestMergedSeriesSet(t *testing.T) { Outer: for _, c := range cases { - res := newMergedSeriesSet(c.a, c.b) + res := NewMergedSeriesSet([]SeriesSet{c.a, c.b}) for { eok, rok := c.exp.Next(), res.Next() @@ -1172,17 +1172,8 @@ func (m *mockChunkSeriesSet) Err() error { // Test the cost of merging series sets for different number of merged sets and their size. // The subset are all equivalent so this does not capture merging of partial or non-overlapping sets well. func BenchmarkMergedSeriesSet(b *testing.B) { - var sel func(sets []SeriesSet) SeriesSet - - sel = func(sets []SeriesSet) SeriesSet { - if len(sets) == 0 { - return EmptySeriesSet() - } - if len(sets) == 1 { - return sets[0] - } - l := len(sets) / 2 - return newMergedSeriesSet(sel(sets[:l]), sel(sets[l:])) + var sel = func(sets []SeriesSet) SeriesSet { + return NewMergedSeriesSet(sets) } for _, k := range []int{