mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Hotfix missing sorting of head block series index
This commit is contained in:
parent
ec99f99d3d
commit
40cf215fba
58
querier.go
58
querier.go
|
@ -6,6 +6,7 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/bradfitz/slice"
|
||||
"github.com/fabxc/tsdb/chunks"
|
||||
"github.com/fabxc/tsdb/labels"
|
||||
)
|
||||
|
@ -237,7 +238,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
|||
its = append(its, q.selectSingle(m))
|
||||
}
|
||||
|
||||
return &blockSeriesSet{
|
||||
set := &blockSeriesSet{
|
||||
index: q.index,
|
||||
chunks: q.series,
|
||||
it: Intersect(its...),
|
||||
|
@ -245,6 +246,28 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
|||
mint: q.mint,
|
||||
maxt: q.maxt,
|
||||
}
|
||||
// TODO(fabxc): the head block indexes new series in order they come in.
|
||||
// SeriesSets are expected to emit labels in order of their label sets.
|
||||
// We expand the set and sort it for now. This is not a scalable approach
|
||||
// however, and the head block should re-sort itself eventually.
|
||||
// This comes with an initial cost as long as new series come in but should
|
||||
// flatten out quickly after a warump.
|
||||
// When cutting new head blocks, the index would ideally be transferred to
|
||||
// the new head.
|
||||
var all []Series
|
||||
for set.Next() {
|
||||
all = append(all, set.At())
|
||||
}
|
||||
if set.Err() != nil {
|
||||
return errSeriesSet{err: set.Err()}
|
||||
}
|
||||
slice.Sort(all, func(i, j int) bool {
|
||||
return labels.Compare(all[i].Labels(), all[j].Labels()) < 0
|
||||
})
|
||||
|
||||
// TODO(fabxc): additionally bad because this static set uses function pointers
|
||||
// in a mock series set.
|
||||
return newListSeriesSet(all)
|
||||
}
|
||||
|
||||
func (q *blockQuerier) selectSingle(m labels.Matcher) Postings {
|
||||
|
@ -390,6 +413,7 @@ func (s *shardSeriesSet) Next() bool {
|
|||
}
|
||||
|
||||
d := s.compare()
|
||||
|
||||
// Both sets contain the current series. Chain them into a single one.
|
||||
if d > 0 {
|
||||
s.cur = s.b.At()
|
||||
|
@ -851,3 +875,35 @@ func (r *sampleRing) samples() []sample {
|
|||
|
||||
return res
|
||||
}
|
||||
|
||||
type mockSeriesSet struct {
|
||||
next func() bool
|
||||
series func() Series
|
||||
err func() error
|
||||
}
|
||||
|
||||
func (m *mockSeriesSet) Next() bool { return m.next() }
|
||||
func (m *mockSeriesSet) At() Series { return m.series() }
|
||||
func (m *mockSeriesSet) Err() error { return m.err() }
|
||||
|
||||
func newListSeriesSet(list []Series) *mockSeriesSet {
|
||||
i := -1
|
||||
return &mockSeriesSet{
|
||||
next: func() bool {
|
||||
i++
|
||||
return i < len(list)
|
||||
},
|
||||
series: func() Series {
|
||||
return list[i]
|
||||
},
|
||||
err: func() error { return nil },
|
||||
}
|
||||
}
|
||||
|
||||
type errSeriesSet struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (s errSeriesSet) Next() bool { return false }
|
||||
func (s errSeriesSet) At() Series { return nil }
|
||||
func (s errSeriesSet) Err() error { return s.err }
|
||||
|
|
|
@ -65,30 +65,6 @@ func (it *listSeriesIterator) Err() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type mockSeriesSet struct {
|
||||
next func() bool
|
||||
series func() Series
|
||||
err func() error
|
||||
}
|
||||
|
||||
func (m *mockSeriesSet) Next() bool { return m.next() }
|
||||
func (m *mockSeriesSet) At() Series { return m.series() }
|
||||
func (m *mockSeriesSet) Err() error { return m.err() }
|
||||
|
||||
func newListSeriesSet(list []Series) *mockSeriesSet {
|
||||
i := -1
|
||||
return &mockSeriesSet{
|
||||
next: func() bool {
|
||||
i++
|
||||
return i < len(list)
|
||||
},
|
||||
series: func() Series {
|
||||
return list[i]
|
||||
},
|
||||
err: func() error { return nil },
|
||||
}
|
||||
}
|
||||
|
||||
func TestShardSeriesSet(t *testing.T) {
|
||||
newSeries := func(l map[string]string, s []sample) Series {
|
||||
return &mockSeries{
|
||||
|
|
Loading…
Reference in a new issue