[ENHANCEMENT] Storage: Short-circuit merge of single querier with no-op queriers

Filter before checking whether there is only one.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2024-07-22 15:35:09 +01:00
parent 90d793e8c5
commit 12bd92a25c

View file

@ -45,8 +45,11 @@ type mergeGenericQuerier struct {
// //
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier { func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
primaries = filterQueriers(primaries)
secondaries = filterQueriers(secondaries)
switch { switch {
case len(primaries)+len(secondaries) == 0: case len(primaries) == 0 && len(secondaries) == 0:
return noopQuerier{} return noopQuerier{}
case len(primaries) == 1 && len(secondaries) == 0: case len(primaries) == 1 && len(secondaries) == 0:
return primaries[0] return primaries[0]
@ -56,15 +59,11 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries { for _, q := range primaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFrom(q)) queriers = append(queriers, newGenericQuerierFrom(q))
} }
}
for _, q := range secondaries { for _, q := range secondaries {
if _, ok := q.(noopQuerier); !ok && q != nil {
queriers = append(queriers, newSecondaryQuerierFrom(q)) queriers = append(queriers, newSecondaryQuerierFrom(q))
} }
}
concurrentSelect := false concurrentSelect := false
if len(secondaries) > 0 { if len(secondaries) > 0 {
@ -77,12 +76,25 @@ func NewMergeQuerier(primaries, secondaries []Querier, mergeFn VerticalSeriesMer
}} }}
} }
func filterQueriers(qs []Querier) []Querier {
ret := make([]Querier, 0, len(qs))
for _, q := range qs {
if _, ok := q.(noopQuerier); !ok && q != nil {
ret = append(ret, q)
}
}
return ret
}
// NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers. // NewMergeChunkQuerier returns a new Chunk Querier that merges results of given primary and secondary chunk queriers.
// See NewFanout commentary to learn more about primary vs secondary differences. // See NewFanout commentary to learn more about primary vs secondary differences.
// //
// In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used. // In case of overlaps between the data given by primaries' and secondaries' Selects, merge function will be used.
// TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670 // TODO(bwplotka): Currently merge will compact overlapping chunks with bigger chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier { func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn VerticalChunkSeriesMergeFunc) ChunkQuerier {
primaries = filterChunkQueriers(primaries)
secondaries = filterChunkQueriers(secondaries)
switch { switch {
case len(primaries) == 0 && len(secondaries) == 0: case len(primaries) == 0 && len(secondaries) == 0:
return noopChunkQuerier{} return noopChunkQuerier{}
@ -94,14 +106,10 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries)) queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
for _, q := range primaries { for _, q := range primaries {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
queriers = append(queriers, newGenericQuerierFromChunk(q)) queriers = append(queriers, newGenericQuerierFromChunk(q))
} }
} for _, q := range secondaries {
for _, querier := range secondaries { queriers = append(queriers, newSecondaryQuerierFromChunk(q))
if _, ok := querier.(noopChunkQuerier); !ok && querier != nil {
queriers = append(queriers, newSecondaryQuerierFromChunk(querier))
}
} }
concurrentSelect := false concurrentSelect := false
@ -115,6 +123,16 @@ func NewMergeChunkQuerier(primaries, secondaries []ChunkQuerier, mergeFn Vertica
}} }}
} }
func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
ret := make([]ChunkQuerier, 0, len(qs))
for _, q := range qs {
if _, ok := q.(noopChunkQuerier); !ok && q != nil {
ret = append(ret, q)
}
}
return ret
}
// Select returns a set of series that matches the given label matchers. // Select returns a set of series that matches the given label matchers.
func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
seriesSets := make([]genericSeriesSet, 0, len(q.queriers)) seriesSets := make([]genericSeriesSet, 0, len(q.queriers))