Merge pull request #13434 from bboreham/elide-queriers
storage: don't wrap single querier in merge-queriers
Commit f24ce00320

@@ -45,9 +45,15 @@ type mergeGenericQuerier struct {
 //
 // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
 func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
-	if len(primaries)+len(secondaries) == 0 {
-		return NoopQuerier()
+	switch {
+	case len(primaries)+len(secondaries) == 0:
+		return noopQuerier{}
+	case len(primaries) == 1 && len(secondaries) == 0:
+		return primaries[0]
+	case len(primaries) == 0 && len(secondaries) == 1:
+		return secondaries[0]
 	}
+
 	queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
 	for _, q := range primaries {
 		if _, ok := q.(noopQuerier); !ok && q != nil {
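
The effect of the new fast paths is visible through the exported API. A minimal sketch of hypothetical usage (not part of the commit), with storage.NoopQuerier standing in for a real querier:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
)

func main() {
	// Stand-in for a real querier (e.g. one backed by a TSDB block).
	var only storage.Querier = storage.NoopQuerier()

	// With exactly one primary and no secondaries, NewMergeQuerier now
	// returns the querier itself rather than a mergeGenericQuerier wrapper,
	// so Select/LabelValues/LabelNames hit the underlying querier directly.
	q := storage.NewMergeQuerier([]storage.Querier{only}, nil, storage.ChainedSeriesMerge)

	fmt.Println(q == only) // true after this commit; false before it
}
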
@@ -77,6 +83,15 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
 // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
 // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
 func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
+	switch {
+	case len(primaries) == 0 && len(secondaries) == 0:
+		return noopChunkQuerier{}
+	case len(primaries) == 1 && len(secondaries) == 0:
+		return primaries[0]
+	case len(primaries) == 0 && len(secondaries) == 1:
+		return secondaries[0]
+	}
+
 	queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
 	for _, q := range primaries {
 		if _, ok := q.(noopChunkQuerier); !ok && q != nil {
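
Continuing the sketch above (same import, hypothetical usage, and assuming storage.NoopChunkedQuerier, the package's noop chunk-querier constructor), NewMergeChunkQuerier gains the same fast paths. One behavioral consequence visible in the diff: a single secondary is returned as-is, without the secondary-querier wrapping the merge path would apply.

// One secondary, no primaries: the fast path returns it unwrapped.
var only storage.ChunkQuerier = storage.NoopChunkedQuerier()
cq := storage.NewMergeChunkQuerier(
	nil,
	[]storage.ChunkQuerier{only},
	storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge),
)
// cq == only: no merge wrapper is allocated here.
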
@@ -102,13 +117,6 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
 
 // Select returns a set of series that matches the given label matchers.
 func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
-	if len(q.queriers) == 0 {
-		return noopGenericSeriesSet{}
-	}
-	if len(q.queriers) == 1 {
-		return q.queriers[0].Select(ctx, sortSeries, hints, matchers...)
-	}
-
 	seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
 	if !q.concurrentSelect {
 		for _, querier := range q.queriers {
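
Because the constructors now return early for zero or one querier, a mergeGenericQuerier is only ever built with at least two sub-queriers, which is why the removed length checks in Select were dead code. A sketch of the surviving merge path through the exported API, assuming hypothetical queriers qA and qB and the obvious imports (context, fmt, prometheus/storage, prometheus/model/labels):

// Two queriers rule out every fast path, so this Select goes through
// mergeGenericQuerier.Select and lazily merges both sub-sets.
q := storage.NewMergeQuerier([]storage.Querier{qA, qB}, nil, storage.ChainedSeriesMerge)

matcher := labels.MustNewMatcher(labels.MatchEqual, "job", "demo")
ss := q.Select(context.Background(), true, nil, matcher)
for ss.Next() {
	fmt.Println(ss.At().Labels())
}
if err := ss.Err(); err != nil {
	fmt.Println("select failed:", err)
}
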
@@ -180,9 +180,9 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
 			},
 		} {
 			t.Run(tc.name, func(t *testing.T) {
-				var p Querier
+				var p []Querier
 				if tc.primaryQuerierSeries != nil {
-					p = &mockQuerier{toReturn: tc.primaryQuerierSeries}
+					p = append(p, &mockQuerier{toReturn: tc.primaryQuerierSeries})
 				}
 				var qs []Querier
 				for _, in := range tc.querierSeries {
@@ -190,7 +190,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) {
 			}
 			qs = append(qs, tc.extraQueriers...)
 
-			mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(context.Background(), false, nil)
+			mergedQuerier := NewMergeQuerier(p, qs, ChainedSeriesMerge).Select(context.Background(), false, nil)
 
 			// Get all merged series upfront to make sure there are no incorrectly retained shared
 			// buffers causing bugs.
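
The switch from var p Querier to var p []Querier is not cosmetic: under the new fast paths, wrapping a never-assigned primary as []Querier{p} would hand NewMergeQuerier a one-element slice whose only entry is nil. A sketch of the pitfall (illustrative, not the actual test code):

var p storage.Querier             // stays nil when a case has no primary
primaries := []storage.Querier{p} // len(primaries) == 1, primaries[0] == nil

// Before: the nil entry was filtered out inside NewMergeQuerier's loop.
// After: the len(primaries) == 1 fast path returns the nil querier itself,
// and a later q.Select call would panic. Appending only when a primary
// exists keeps the slice empty in the no-primary cases.
q := storage.NewMergeQuerier(primaries, nil, storage.ChainedSeriesMerge)
_ = q
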
@@ -355,9 +355,9 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
 			},
 		} {
 			t.Run(tc.name, func(t *testing.T) {
-				var p ChunkQuerier
+				var p []ChunkQuerier
 				if tc.primaryChkQuerierSeries != nil {
-					p = &mockChunkQuerier{toReturn: tc.primaryChkQuerierSeries}
+					p = append(p, &mockChunkQuerier{toReturn: tc.primaryChkQuerierSeries})
 				}
 
 				var qs []ChunkQuerier
@@ -366,7 +366,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) {
 			}
 			qs = append(qs, tc.extraQueriers...)
 
-			merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil)
+			merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil)
 			for merged.Next() {
 				require.True(t, tc.expected.Next(), "Expected Next() to be true")
 				actualSeries := merged.At()
@@ -1443,6 +1443,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
 		expectedErrs [4]error
 	}{
 		{
+			// NewMergeQuerier will not create a mergeGenericQuerier
+			// with just one querier inside, but we can test it anyway.
 			name:     "one successful primary querier",
 			queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}},
 			expectedSelectsSeries: []labels.Labels{
@@ -1551,12 +1553,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
 
 			for _, qr := range q.queriers {
 				m := unwrapMockGenericQuerier(t, qr)
-
-				exp := []bool{true}
-				if len(q.queriers) == 1 {
-					exp[0] = false
-				}
-				require.Equal(t, exp, m.sortedSeriesRequested)
+				// mergeGenericQuerier forces all Selects to be sorted.
+				require.Equal(t, []bool{true}, m.sortedSeriesRequested)
 			}
 		})
 		t.Run("LabelNames", func(t *testing.T) {