diff --git a/storage/merge.go b/storage/merge.go index 38897449b5..8f2dcb82ef 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -46,9 +46,15 @@ type mergeGenericQuerier struct { // // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier { - if len(primaries)+len(secondaries) == 0 { - return NoopQuerier() + switch { + case len(primaries)+len(secondaries) == 0: + return noopQuerier{} + case len(primaries) == 1 && len(secondaries) == 0: + return primaries[0] + case len(primaries) == 0 && len(secondaries) == 1: + return secondaries[0] } + queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { if _, ok := q.(noopQuerier); !ok && q != nil { @@ -78,6 +84,15 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier { + switch { + case len(primaries) == 0 && len(secondaries) == 0: + return noopChunkQuerier{} + case len(primaries) == 1 && len(secondaries) == 0: + return primaries[0] + case len(primaries) == 0 && len(secondaries) == 1: + return secondaries[0] + } + queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { if _, ok := q.(noopChunkQuerier); !ok && q != nil { @@ -103,13 +118,6 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica // Select returns a set of series that matches the given label matchers. func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { - if len(q.queriers) == 0 { - return noopGenericSeriesSet{} - } - if len(q.queriers) == 1 { - return q.queriers[0].Select(ctx, sortSeries, hints, matchers...) - } - seriesSets := make([]genericSeriesSet, 0, len(q.queriers)) if !q.concurrentSelect { for _, querier := range q.queriers { diff --git a/storage/merge_test.go b/storage/merge_test.go index 05e1c75278..f42869d8ea 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -180,9 +180,9 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - var p Querier + var p []Querier if tc.primaryQuerierSeries != nil { - p = &mockQuerier{toReturn: tc.primaryQuerierSeries} + p = append(p, &mockQuerier{toReturn: tc.primaryQuerierSeries}) } var qs []Querier for _, in := range tc.querierSeries { @@ -190,7 +190,7 @@ func TestMergeQuerierWithChainMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - mergedQuerier := NewMergeQuerier([]Querier{p}, qs, ChainedSeriesMerge).Select(context.Background(), false, nil) + mergedQuerier := NewMergeQuerier(p, qs, ChainedSeriesMerge).Select(context.Background(), false, nil) // Get all merged series upfront to make sure there are no incorrectly retained shared // buffers causing bugs. @@ -355,9 +355,9 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - var p ChunkQuerier + var p []ChunkQuerier if tc.primaryChkQuerierSeries != nil { - p = &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries} + p = append(p, &mockChunkQurier{toReturn: tc.primaryChkQuerierSeries}) } var qs []ChunkQuerier @@ -366,7 +366,7 @@ func TestMergeChunkQuerierWithNoVerticalChunkSeriesMerger(t *testing.T) { } qs = append(qs, tc.extraQueriers...) - merged := NewMergeChunkQuerier([]ChunkQuerier{p}, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil) + merged := NewMergeChunkQuerier(p, qs, NewCompactingChunkSeriesMerger(nil)).Select(context.Background(), false, nil) for merged.Next() { require.True(t, tc.expected.Next(), "Expected Next() to be true") actualSeries := merged.At() @@ -1444,6 +1444,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedErrs [4]error }{ { + // NewMergeQuerier will not create a mergeGenericQuerier + // with just one querier inside, but we can test it anyway. name: "one successful primary querier", queriers: []genericQuerier{&mockGenericQuerier{resp: []string{"a", "b"}, warnings: nil, err: nil}}, expectedSelectsSeries: []labels.Labels{ @@ -1552,12 +1554,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) { for _, qr := range q.queriers { m := unwrapMockGenericQuerier(t, qr) - - exp := []bool{true} - if len(q.queriers) == 1 { - exp[0] = false - } - require.Equal(t, exp, m.sortedSeriesRequested) + // mergeGenericQuerier forces all Selects to be sorted. + require.Equal(t, []bool{true}, m.sortedSeriesRequested) } }) t.Run("LabelNames", func(t *testing.T) {