Implement limit in select

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
🌲 Harry 🌊 John 🏔 2024-09-03 10:31:29 -07:00
parent f743f7e6f2
commit e6678e4637
8 changed files with 42 additions and 26 deletions

CHANGELOG.md

@@ -4,7 +4,6 @@
 * [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
 * [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706
-* [ENHANCEMENT] Storage: Implement limit in mergeGenericQuerier. #14489
 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042

 ## 2.54.1 / 2024-08-27
@@ -76,7 +75,6 @@ This release changes the default for GOGC, the Go runtime control for the trade-
 * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` metric to measure the time it takes to restore a rule group. #13974
 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
 * [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
-* [ENHANCEMENT] Storage: Implement limit in Querier. #14109
 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics and at least one identifying label is defined. #13991
 * [BUGFIX] Scrape: Do not try to ingest native histograms when the native histograms feature is turned off. This happened when protobuf scrape was enabled, for example by the created time feature. #13987
 * [BUGFIX] Scaleway SD: Use the instance's public IP if no private IP is available as the `__address__` meta label. #13941

cmd/promtool/tsdb.go

@@ -733,7 +733,7 @@ func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt i
 		for _, mset := range matcherSets {
 			sets = append(sets, q.Select(ctx, true, nil, mset...))
 		}
-		ss = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
+		ss = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
 	} else {
 		ss = q.Select(ctx, false, nil, matcherSets[0]...)
 	}
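
Every call site touched by this commit follows the same convention: a non-positive limit means "return everything", which preserves the old behavior at existing callers such as dumpSamples above. A minimal sketch of that convention as a hypothetical helper (mergeAll is not part of this commit; it assumes the storage import and the signature from this branch):

    // mergeAll merges sorted series sets without truncation.
    // Assumes: import "github.com/prometheus/prometheus/storage"
    func mergeAll(sets []storage.SeriesSet) storage.SeriesSet {
        const noLimit = 0 // 0 (or any value <= 0) disables the series limit
        return storage.NewMergeSeriesSet(sets, noLimit, storage.ChainedSeriesMerge)
    }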

storage/merge.go

@@ -135,13 +135,17 @@ func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier {
 // Select returns a set of series that matches the given label matchers.
 func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
 	seriesSets := make([]genericSeriesSet, 0, len(q.queriers))
+	var limit int
+	if hints != nil {
+		limit = hints.Limit
+	}
 	if !q.concurrentSelect {
 		for _, querier := range q.queriers {
 			// We need to sort for merge to work.
 			seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...))
 		}
 		return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
-			s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
+			s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
 			return s, s.Next()
 		}}
 	}
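
The limit rides on the existing SelectHints, so the Querier interface itself is unchanged and callers opt in per query. A sketch of a caller under that assumption (q, ctx, mint, and maxt are placeholders, not names from this commit):

    // Ask for at most 100 merged series in [mint, maxt].
    hints := &storage.SelectHints{
        Start: mint,
        End:   maxt,
        Limit: 100,
    }
    set := q.Select(ctx, true, hints, labels.MustNewMatcher(labels.MatchEqual, "job", "api"))
    for set.Next() {
        _ = set.At() // yields at most 100 series
    }

Note that Select also forwards the same hints to every child querier, so a backend that understands hints.Limit can truncate before the merge layer ever sees its series.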
@@ -169,7 +173,7 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints
 		seriesSets = append(seriesSets, r)
 	}
 	return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) {
-		s := newGenericMergeSeriesSet(seriesSets, q.mergeFn)
+		s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn)
 		return s, s.Next()
 	}}
 }
@@ -288,12 +292,12 @@ func truncateToLimit(s []string, hints *LabelHints) []string {
 type VerticalSeriesMergeFunc func(...Series) Series

 // NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together.
-func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
+func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet {
 	genericSets := make([]genericSeriesSet, 0, len(sets))
 	for _, s := range sets {
 		genericSets = append(genericSets, &genericSeriesSetAdapter{s})
 	}
-	return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
+	return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)}
 }

 // VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping
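
A self-contained sketch of the new signature's behavior. It only compiles against this branch, since the extra int parameter does not exist before this commit, and listSeriesSet is a hypothetical test double rather than a storage type:

    package main

    import (
        "fmt"

        "github.com/prometheus/prometheus/model/labels"
        "github.com/prometheus/prometheus/storage"
        "github.com/prometheus/prometheus/util/annotations"
    )

    // listSeriesSet is a hypothetical in-memory SeriesSet used only for this
    // sketch; the storage package keeps its own mocks in test files.
    type listSeriesSet struct {
        series []storage.Series
        idx    int
    }

    func (s *listSeriesSet) Next() bool                        { s.idx++; return s.idx <= len(s.series) }
    func (s *listSeriesSet) At() storage.Series                { return s.series[s.idx-1] }
    func (s *listSeriesSet) Err() error                        { return nil }
    func (s *listSeriesSet) Warnings() annotations.Annotations { return nil }

    func main() {
        // Each set must be sorted by labels, as the merge contract requires.
        a := &listSeriesSet{series: []storage.Series{
            storage.NewListSeries(labels.FromStrings("test", "a"), nil),
            storage.NewListSeries(labels.FromStrings("test", "c"), nil),
        }}
        b := &listSeriesSet{series: []storage.Series{
            storage.NewListSeries(labels.FromStrings("test", "b"), nil),
            storage.NewListSeries(labels.FromStrings("test", "d"), nil),
        }}
        // limit=2: iteration stops after {test="a"} and {test="b"}.
        merged := storage.NewMergeSeriesSet([]storage.SeriesSet{a, b}, 2, storage.ChainedSeriesMerge)
        for merged.Next() {
            fmt.Println(merged.At().Labels())
        }
    }

Swapping the 2 for 0 restores the old unbounded behavior and prints all four series.
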
@@ -303,12 +307,12 @@ func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) Seri
 type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries

 // NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together.
-func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
+func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, limit int, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet {
 	genericSets := make([]genericSeriesSet, 0, len(sets))
 	for _, s := range sets {
 		genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
 	}
-	return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
+	return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)}
 }

 // genericMergeSeriesSet implements genericSeriesSet.
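
The chunk variant threads the limit the same way. A hedged fragment, assuming two sorted ChunkSeriesSets setA and setB and the compacting merger the storage package already exports:

    merged := storage.NewMergeChunkSeriesSet(
        []storage.ChunkSeriesSet{setA, setB},
        0, // unlimited; see the compactor call site below
        storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge),
    )
    for merged.Next() {
        _ = merged.At() // a storage.ChunkSeries
    }
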
@@ -316,9 +320,11 @@ type genericMergeSeriesSet struct {
 	currentLabels labels.Labels
 	mergeFunc     genericSeriesMergeFunc

-	heap        genericSeriesSetHeap
-	sets        []genericSeriesSet
-	currentSets []genericSeriesSet
+	heap         genericSeriesSetHeap
+	sets         []genericSeriesSet
+	currentSets  []genericSeriesSet
+	seriesLimit  int
+	mergedSeries int // tracks the total number of series merged and returned.
 }

 // newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates)
@@ -326,7 +332,8 @@ type genericMergeSeriesSet struct {
 // Each series set must return its series in labels order, otherwise
 // merged series set will be incorrect.
 // Overlapped situations are merged using provided mergeFunc.
-func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
+// If seriesLimit is set, only limited series are returned.
+func newGenericMergeSeriesSet(sets []genericSeriesSet, seriesLimit int, mergeFunc genericSeriesMergeFunc) genericSeriesSet {
 	if len(sets) == 1 {
 		return sets[0]
 	}
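
One subtlety in the hunk above: the pre-existing single-set fast path returns sets[0] before the struct carrying seriesLimit is built, so with exactly one underlying set the merge layer does not enforce the limit itself; it relies on the child querier honoring the hints.Limit that Select forwards. Illustrated with the package-internal names from the hunk (the values are hypothetical):

    s := newGenericMergeSeriesSet(sets, 5, mergeFn)
    // If len(sets) == 1, s is sets[0] unwrapped and the 5 is never consulted.
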
@@ -346,13 +353,19 @@
 		}
 	}
 	return &genericMergeSeriesSet{
-		mergeFunc: mergeFunc,
-		sets:      sets,
-		heap:      h,
+		mergeFunc:   mergeFunc,
+		sets:        sets,
+		heap:        h,
+		seriesLimit: seriesLimit,
 	}
 }

 func (c *genericMergeSeriesSet) Next() bool {
+	if c.seriesLimit > 0 && c.mergedSeries >= c.seriesLimit {
+		// Exit early if seriesLimit is set.
+		return false
+	}
+
 	// Run in a loop because the "next" series sets may not be valid anymore.
 	// If, for the current label set, all the next series sets come from
 	// failed remote storage sources, we want to keep trying with the next label set.
@@ -388,12 +401,14 @@

 func (c *genericMergeSeriesSet) At() Labels {
 	if len(c.currentSets) == 1 {
+		c.mergedSeries++
 		return c.currentSets[0].At()
 	}
 	series := make([]Labels, 0, len(c.currentSets))
 	for _, seriesSet := range c.currentSets {
 		series = append(series, seriesSet.At())
 	}
+	c.mergedSeries++
 	return c.mergeFunc(series...)
 }
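
Because mergedSeries is incremented in At rather than Next, the counter tracks series actually handed to the caller, and the early exit in Next fires only after the previous series has been retrieved. Continuing the earlier sketch (sets a and b as defined there):

    merged := storage.NewMergeSeriesSet([]storage.SeriesSet{a, b}, 1, storage.ChainedSeriesMerge)
    fmt.Println(merged.Next())        // true: the first series is available
    fmt.Println(merged.At().Labels()) // {test="a"}; mergedSeries is now 1
    fmt.Println(merged.Next())        // false: the limit of 1 is reached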

storage/merge_test.go

@@ -1345,7 +1345,7 @@ func makeMergeSeriesSet(serieses [][]Series) SeriesSet {
 	for i, s := range serieses {
 		seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)}
 	}
-	return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
+	return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, 0, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)}
 }

 func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) {
@@ -1590,28 +1590,31 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
 			secondaries: []Querier{
 				&mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil},
 			},
-			limit: 1,
+			limit: 2,
 			expectedSelectsSeries: []labels.Labels{
 				labels.FromStrings("test", "a"),
 				labels.FromStrings("test", "b"),
-				labels.FromStrings("test", "c"),
-				labels.FromStrings("test", "d"),
 			},
-			expectedLabels:   []string{"a"},
+			expectedLabels:   []string{"a", "b"},
 			expectedWarnings: annotations.New().Add(warnStorage),
 		},
 	} {
 		var labelHints *LabelHints
+		var selectHints *SelectHints
 		if tcase.limit > 0 {
 			labelHints = &LabelHints{
 				Limit: tcase.limit,
 			}
+			selectHints = &SelectHints{
+				Limit: tcase.limit,
+			}
 		}
 		t.Run(tcase.name, func(t *testing.T) {
 			q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })
 			t.Run("Select", func(t *testing.T) {
-				res := q.Select(context.Background(), false, nil)
+				res := q.Select(context.Background(), false, selectHints)
 				var lbls []labels.Labels
 				for res.Next() {
 					lbls = append(lbls, res.At().Labels())

tsdb/compact.go

@@ -831,7 +831,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
 	if len(sets) > 1 {
 		// Merge series using specified chunk series merger.
 		// The default one is the compacting series merger.
-		set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
+		set = storage.NewMergeChunkSeriesSet(sets, 0, mergeFunc)
 	}

 	// Iterate over all sorted chunk series.
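
Compaction is the one caller where truncation would be unsafe: capping the merged set here would silently drop series from the compacted block, so the 0 is deliberate. An annotated restatement of the call above, with the rationale spelled out:

    // 0 = merge every input series; any positive value would cap how many
    // series PopulateBlock writes into the new block, which compaction must never do.
    set = storage.NewMergeChunkSeriesSet(sets, 0, mergeFunc)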

tsdb/querier_test.go

@@ -2030,7 +2030,7 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) {
 // TODO(bwplotka): Merge with storage merged series set benchmark.
 func BenchmarkMergedSeriesSet(b *testing.B) {
 	sel := func(sets []storage.SeriesSet) storage.SeriesSet {
-		return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
+		return storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
 	}

 	for _, k := range []int{

web/api/v1/api.go

@@ -884,7 +884,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) {
 			s := q.Select(ctx, true, hints, mset...)
 			sets = append(sets, s)
 		}
-		set = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
+		set = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
 	} else {
 		// At this point at least one match exists.
 		set = q.Select(ctx, false, hints, matcherSets[0]...)

web/federate.go

@@ -101,7 +101,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
 		sets = append(sets, s)
 	}

-	set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
+	set := storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
 	it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
 	var chkIter chunkenc.Iterator
 Loop: