diff --git a/storage/merge.go b/storage/merge.go index 66c4c3ed3..2424b26ab 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -45,8 +45,11 @@ type mergeGenericQuerier struct { // // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier { + primaries = filterQueriers(primaries) + secondaries = filterQueriers(secondaries) + switch { - case len(primaries)+len(secondaries) == 0: + case len(primaries) == 0 && len(secondaries) == 0: return noopQuerier{} case len(primaries) == 1 && len(secondaries) == 0: return primaries[0] @@ -56,14 +59,10 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { - if _, ok := q.(noopQuerier); !ok && q != nil { - queriers = append(queriers, newGenericQuerierFrom(q)) - } + queriers = append(queriers, newGenericQuerierFrom(q)) } for _, q := range secondaries { - if _, ok := q.(noopQuerier); !ok && q != nil { - queriers = append(queriers, newSecondaryQuerierFrom(q)) - } + queriers = append(queriers, newSecondaryQuerierFrom(q)) } concurrentSelect := false @@ -77,12 +76,25 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer }} } +func filterQueriers(qs []Querier) []Querier { + ret := make([]Querier, 0, len(qs)) + for _, q := range qs { + if _, ok := q.(noopQuerier); !ok && q != nil { + ret = append(ret, q) + } + } + return ret +} + // NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers. // See NewFanout commentary to learn more about primary vs secondary differences. // // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier { + primaries = filterChunkQueriers(primaries) + secondaries = filterChunkQueriers(secondaries) + switch { case len(primaries) == 0 && len(secondaries) == 0: return noopChunkQuerier{} @@ -94,14 +106,10 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) for _, q := range primaries { - if _, ok := q.(noopChunkQuerier); !ok && q != nil { - queriers = append(queriers, newGenericQuerierFromChunk(q)) - } + queriers = append(queriers, newGenericQuerierFromChunk(q)) } - for _, querier := range secondaries { - if _, ok := querier.(noopChunkQuerier); !ok && querier != nil { - queriers = append(queriers, newSecondaryQuerierFromChunk(querier)) - } + for _, q := range secondaries { + queriers = append(queriers, newSecondaryQuerierFromChunk(q)) } concurrentSelect := false @@ -115,6 +123,16 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica }} } +func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier { + ret := make([]ChunkQuerier, 0, len(qs)) + for _, q := range qs { + if _, ok := q.(noopChunkQuerier); !ok && q != nil { + ret = append(ret, q) + } + } + return ret +} + // Select returns a set of series that matches the given label matchers. func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { seriesSets := make([]genericSeriesSet, 0, len(q.queriers))