[tsdb] Improve mergeSeriesSet (#5920)

* refactor and simplify mergeSeriesSet

Signed-off-by: naivewong <867245430@qq.com>
This commit is contained in:
naivewong 2019-11-15 22:45:29 +08:00 committed by Krasimir Georgiev
parent 5973227434
commit 23c0299d85
2 changed files with 107 additions and 70 deletions

View file

@ -113,27 +113,21 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
} }
func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) { func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
return q.sel(q.blocks, ms) if len(q.blocks) == 0 {
}
func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
if len(qs) == 0 {
return EmptySeriesSet(), nil return EmptySeriesSet(), nil
} }
if len(qs) == 1 { ss := make([]SeriesSet, len(q.blocks))
return qs[0].Select(ms...) var s SeriesSet
var err error
for i, b := range q.blocks {
s, err = b.Select(ms...)
if err != nil {
return nil, err
}
ss[i] = s
} }
l := len(qs) / 2
a, err := q.sel(qs[:l], ms) return NewMergedSeriesSet(ss), nil
if err != nil {
return nil, err
}
b, err := q.sel(qs[l:], ms)
if err != nil {
return nil, err
}
return newMergedSeriesSet(a, b), nil
} }
func (q *querier) Close() error { func (q *querier) Close() error {
@ -532,29 +526,28 @@ func EmptySeriesSet() SeriesSet {
return emptySeriesSet return emptySeriesSet
} }
// mergedSeriesSet takes two series sets as a single series set. The input series sets // mergedSeriesSet returns a series sets slice as a single series set. The input series sets
// must be sorted and sequential in time, i.e. if they have the same label set, // must be sorted and sequential in time.
// the datapoints of a must be before the datapoints of b.
type mergedSeriesSet struct { type mergedSeriesSet struct {
a, b SeriesSet all []SeriesSet
buf []SeriesSet // A buffer for keeping the order of the SeriesSet slice while advancing the SeriesSets.
cur Series ids []int // The indices of chosen SeriesSet for the current run.
adone, bdone bool done bool
err error
cur Series
} }
// NewMergedSeriesSet takes two series sets as a single series set. The input series sets func NewMergedSeriesSet(all []SeriesSet) SeriesSet {
// must be sorted and sequential in time, i.e. if they have the same label set, if len(all) == 1 {
// the datapoints of a must be before the datapoints of b. return all[0]
func NewMergedSeriesSet(a, b SeriesSet) SeriesSet { }
return newMergedSeriesSet(a, b) s := &mergedSeriesSet{all: all}
} // Initialize first elements of all sets as Next() needs
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
// one element look-ahead. // one element look-ahead.
s.adone = !s.a.Next() s.nextAll()
s.bdone = !s.b.Next() if len(s.all) == 0 {
s.done = true
}
return s return s
} }
@ -564,40 +557,93 @@ func (s *mergedSeriesSet) At() Series {
} }
func (s *mergedSeriesSet) Err() error { func (s *mergedSeriesSet) Err() error {
if s.a.Err() != nil { return s.err
return s.a.Err()
}
return s.b.Err()
} }
func (s *mergedSeriesSet) compare() int { // nextAll is to call Next() for all SeriesSet.
if s.adone { // Because the order of the SeriesSet slice will affect the results,
return 1 // we need to use a buffer slice to hold the order.
func (s *mergedSeriesSet) nextAll() {
s.buf = s.buf[:0]
for _, ss := range s.all {
if ss.Next() {
s.buf = append(s.buf, ss)
} else if ss.Err() != nil {
s.done = true
s.err = ss.Err()
break
}
} }
if s.bdone { s.all, s.buf = s.buf, s.all
return -1 }
// nextWithID is to call Next() for the SeriesSet with the indices of s.ids.
// Because the order of the SeriesSet slice will affect the results,
// we need to use a buffer slice to hold the order.
func (s *mergedSeriesSet) nextWithID() {
if len(s.ids) == 0 {
return
} }
return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
s.buf = s.buf[:0]
i1 := 0
i2 := 0
for i1 < len(s.all) {
if i2 < len(s.ids) && i1 == s.ids[i2] {
if !s.all[s.ids[i2]].Next() {
if s.all[s.ids[i2]].Err() != nil {
s.done = true
s.err = s.all[s.ids[i2]].Err()
break
}
i2++
i1++
continue
}
i2++
}
s.buf = append(s.buf, s.all[i1])
i1++
}
s.all, s.buf = s.buf, s.all
} }
func (s *mergedSeriesSet) Next() bool { func (s *mergedSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil { if s.done {
return false return false
} }
d := s.compare() s.nextWithID()
if s.done {
return false
}
s.ids = s.ids[:0]
if len(s.all) == 0 {
s.done = true
return false
}
// Both sets contain the current series. Chain them into a single one. // Here we are looking for a set of series sets with the lowest labels,
if d > 0 { // and we will cache their indexes in s.ids.
s.cur = s.b.At() s.ids = append(s.ids, 0)
s.bdone = !s.b.Next() for i := 1; i < len(s.all); i++ {
} else if d < 0 { cmp := labels.Compare(s.all[s.ids[0]].At().Labels(), s.all[i].At().Labels())
s.cur = s.a.At() if cmp > 0 {
s.adone = !s.a.Next() s.ids = s.ids[:1]
s.ids[0] = i
} else if cmp == 0 {
s.ids = append(s.ids, i)
}
}
if len(s.ids) > 1 {
series := make([]Series, len(s.ids))
for i, idx := range s.ids {
series[i] = s.all[idx].At()
}
s.cur = &chainedSeries{series: series}
} else { } else {
s.cur = &chainedSeries{series: []Series{s.a.At(), s.b.At()}} s.cur = s.all[s.ids[0]].At()
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
} }
return true return true
} }

View file

@ -157,7 +157,7 @@ func TestMergedSeriesSet(t *testing.T) {
Outer: Outer:
for _, c := range cases { for _, c := range cases {
res := newMergedSeriesSet(c.a, c.b) res := NewMergedSeriesSet([]SeriesSet{c.a, c.b})
for { for {
eok, rok := c.exp.Next(), res.Next() eok, rok := c.exp.Next(), res.Next()
@ -1172,17 +1172,8 @@ func (m *mockChunkSeriesSet) Err() error {
// Test the cost of merging series sets for different number of merged sets and their size. // Test the cost of merging series sets for different number of merged sets and their size.
// The subset are all equivalent so this does not capture merging of partial or non-overlapping sets well. // The subset are all equivalent so this does not capture merging of partial or non-overlapping sets well.
func BenchmarkMergedSeriesSet(b *testing.B) { func BenchmarkMergedSeriesSet(b *testing.B) {
var sel func(sets []SeriesSet) SeriesSet var sel = func(sets []SeriesSet) SeriesSet {
return NewMergedSeriesSet(sets)
sel = func(sets []SeriesSet) SeriesSet {
if len(sets) == 0 {
return EmptySeriesSet()
}
if len(sets) == 1 {
return sets[0]
}
l := len(sets) / 2
return newMergedSeriesSet(sel(sets[:l]), sel(sets[l:]))
} }
for _, k := range []int{ for _, k := range []int{