From 53f8da58b3acb1b62715629c9e38a067e7c06ffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Thu, 18 Jul 2024 10:49:41 -0700 Subject: [PATCH 1/4] Storage: Implement limit in mergeGenericQuerier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 2 ++ storage/merge.go | 18 ++++++++++++- storage/merge_test.go | 59 ++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 75 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37cbea6ef..fcf23e3c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200 * [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706 +* [ENHANCEMENT] Storage: Implement limit in mergeGenericQuerier. #14489 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042 ## 2.54.1 / 2024-08-27 @@ -75,6 +76,7 @@ This release changes the default for GOGC, the Go runtime control for the trade- * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` metric to measure the time it takes to restore a rule group. #13974 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991 * [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620 +* [ENHANCEMENT] Storage: Implement limit in Querier. #14109 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics and at least one identifying label is defined. #13991 * [BUGFIX] Scrape: Do no try to ingest native histograms when the native histograms feature is turned off. This happened when protobuf scrape was enabled by for example the created time feature. #13987 * [BUGFIX] Scaleway SD: Use the instance's public IP if no private IP is available as the `__address__` meta label. #13941 diff --git a/storage/merge.go b/storage/merge.go index 2424b26ab..ec8661f54 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -216,7 +216,14 @@ func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers if err != nil { return nil, ws, err } - return mergeStrings(s1, s2), ws, nil + + s1 = truncateToLimit(s1, hints) + s2 = truncateToLimit(s2, hints) + + merged := mergeStrings(s1, s2) + merged = truncateToLimit(merged, hints) + + return merged, ws, nil } func mergeStrings(a, b []string) []string { @@ -261,6 +268,7 @@ func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints, if err != nil { return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err) } + names = truncateToLimit(names, hints) for _, name := range names { labelNamesMap[name] = struct{}{} } @@ -274,6 +282,7 @@ func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints, labelNames = append(labelNames, name) } slices.Sort(labelNames) + labelNames = truncateToLimit(labelNames, hints) return labelNames, warnings, nil } @@ -288,6 +297,13 @@ func (q *mergeGenericQuerier) Close() error { return errs.Err() } +func truncateToLimit(s []string, hints *LabelHints) []string { + if hints != nil && hints.Limit > 0 && len(s) > hints.Limit { + s = s[:hints.Limit] + } + return s +} + // VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together. // It has to handle time-overlapped series as well. type VerticalSeriesMergeFunc func(...Series) Series diff --git a/storage/merge_test.go b/storage/merge_test.go index b145743c8..5237275e5 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1390,6 +1390,34 @@ func BenchmarkMergeSeriesSet(b *testing.B) { } } +func BenchmarkMergeLabelValuesWithLimit(b *testing.B) { + var queriers []genericQuerier + + for i := 0; i < 5; i++ { + var lbls []string + for j := 0; j < 100000; j++ { + lbls = append(lbls, fmt.Sprintf("querier_%d_label_%d", i, j)) + } + q := &mockQuerier{resp: lbls} + queriers = append(queriers, newGenericQuerierFrom(q)) + } + + mergeQuerier := &mergeGenericQuerier{ + queriers: queriers, // Assume querying 5 blocks. + mergeFn: func(l ...Labels) Labels { + return l[0] + }, + } + + b.Run("benchmark", func(b *testing.B) { + ctx := context.Background() + hints := &LabelHints{ + Limit: 1000, + } + mergeQuerier.LabelValues(ctx, "name", hints) + }) +} + func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int { count := 0 switch x := qr.(type) { @@ -1428,6 +1456,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { name string primaries []Querier secondaries []Querier + limit int expectedSelectsSeries []labels.Labels expectedLabels []string @@ -1553,7 +1582,31 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { expectedLabels: []string{"a", "b"}, expectedWarnings: annotations.New().Add(warnStorage), }, + { + name: "successful queriers with limit", + primaries: []Querier{ + &mockQuerier{resp: []string{"a", "d"}, warnings: annotations.New().Add(warnStorage), err: nil}, + }, + secondaries: []Querier{ + &mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil}, + }, + limit: 1, + expectedSelectsSeries: []labels.Labels{ + labels.FromStrings("test", "a"), + labels.FromStrings("test", "b"), + labels.FromStrings("test", "c"), + labels.FromStrings("test", "d"), + }, + expectedLabels: []string{"a"}, + expectedWarnings: annotations.New().Add(warnStorage), + }, } { + var labelHints *LabelHints + if tcase.limit > 0 { + labelHints = &LabelHints{ + Limit: tcase.limit, + } + } t.Run(tcase.name, func(t *testing.T) { q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] }) @@ -1577,7 +1630,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n) }) t.Run("LabelNames", func(t *testing.T) { - res, w, err := q.LabelNames(ctx, nil) + res, w, err := q.LabelNames(ctx, labelHints) require.Subset(t, tcase.expectedWarnings, w) require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match") requireEqualSlice(t, tcase.expectedLabels, res) @@ -1590,7 +1643,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { }) }) t.Run("LabelValues", func(t *testing.T) { - res, w, err := q.LabelValues(ctx, "test", nil) + res, w, err := q.LabelValues(ctx, "test", labelHints) require.Subset(t, tcase.expectedWarnings, w) require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match") requireEqualSlice(t, tcase.expectedLabels, res) @@ -1604,7 +1657,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { }) t.Run("LabelValuesWithMatchers", func(t *testing.T) { matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue") - res, w, err := q.LabelValues(ctx, "test2", nil, matcher) + res, w, err := q.LabelValues(ctx, "test2", labelHints, matcher) require.Subset(t, tcase.expectedWarnings, w) require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match") requireEqualSlice(t, tcase.expectedLabels, res) From f743f7e6f2266758bfc35b6ccb901b12f6600847 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Sat, 24 Aug 2024 14:01:20 -0700 Subject: [PATCH 2/4] Implement merge sort for LabelNames() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- storage/merge.go | 49 ++++++++++++++---------------------------------- 1 file changed, 14 insertions(+), 35 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index ec8661f54..60b369e20 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "math" - "slices" "sync" "github.com/prometheus/prometheus/model/histogram" @@ -188,30 +187,32 @@ func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQ // If matchers are specified the returned result set is reduced // to label values of metrics matching the matchers. func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { - res, ws, err := q.lvals(ctx, q.queriers, name, hints, matchers...) + res, ws, err := q.mergeResults(q.queriers, hints, func(q LabelQuerier) ([]string, annotations.Annotations, error) { + return q.LabelValues(ctx, name, hints, matchers...) + }) if err != nil { return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err) } return res, ws, nil } -// lvals performs merge sort for LabelValues from multiple queriers. -func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers, n string, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { +// mergeResults performs merge sort for results from multiple queriers. +func (q *mergeGenericQuerier) mergeResults(lq labelGenericQueriers, hints *LabelHints, resultsFn func(q LabelQuerier) ([]string, annotations.Annotations, error)) ([]string, annotations.Annotations, error) { if lq.Len() == 0 { return nil, nil, nil } if lq.Len() == 1 { - return lq.Get(0).LabelValues(ctx, n, hints, matchers...) + return resultsFn(lq.Get(0)) } a, b := lq.SplitByHalf() var ws annotations.Annotations - s1, w, err := q.lvals(ctx, a, n, hints, matchers...) + s1, w, err := q.mergeResults(a, hints, resultsFn) ws.Merge(w) if err != nil { return nil, ws, err } - s2, ws, err := q.lvals(ctx, b, n, hints, matchers...) + s2, ws, err := q.mergeResults(b, hints, resultsFn) ws.Merge(w) if err != nil { return nil, ws, err @@ -255,35 +256,13 @@ func mergeStrings(a, b []string) []string { // LabelNames returns all the unique label names present in all queriers in sorted order. func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { - var ( - labelNamesMap = make(map[string]struct{}) - warnings annotations.Annotations - ) - for _, querier := range q.queriers { - names, wrn, err := querier.LabelNames(ctx, hints, matchers...) - if wrn != nil { - // TODO(bwplotka): We could potentially wrap warnings. - warnings.Merge(wrn) - } - if err != nil { - return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err) - } - names = truncateToLimit(names, hints) - for _, name := range names { - labelNamesMap[name] = struct{}{} - } + res, ws, err := q.mergeResults(q.queriers, hints, func(q LabelQuerier) ([]string, annotations.Annotations, error) { + return q.LabelNames(ctx, hints, matchers...) + }) + if err != nil { + return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err) } - if len(labelNamesMap) == 0 { - return nil, warnings, nil - } - - labelNames := make([]string, 0, len(labelNamesMap)) - for name := range labelNamesMap { - labelNames = append(labelNames, name) - } - slices.Sort(labelNames) - labelNames = truncateToLimit(labelNames, hints) - return labelNames, warnings, nil + return res, ws, nil } // Close releases the resources of the generic querier. From e6678e463708ab74c66931004cc8151ecad084c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Tue, 3 Sep 2024 10:31:29 -0700 Subject: [PATCH 3/4] Implement limit in select MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 2 -- cmd/promtool/tsdb.go | 2 +- storage/merge.go | 41 ++++++++++++++++++++++++++++------------- storage/merge_test.go | 15 +++++++++------ tsdb/compact.go | 2 +- tsdb/querier_test.go | 2 +- web/api/v1/api.go | 2 +- web/federate.go | 2 +- 8 files changed, 42 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fcf23e3c7..37cbea6ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,6 @@ * [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200 * [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706 -* [ENHANCEMENT] Storage: Implement limit in mergeGenericQuerier. #14489 * [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042 ## 2.54.1 / 2024-08-27 @@ -76,7 +75,6 @@ This release changes the default for GOGC, the Go runtime control for the trade- * [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` metric to measure the time it takes to restore a rule group. #13974 * [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991 * [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620 -* [ENHANCEMENT] Storage: Implement limit in Querier. #14109 * [BUGFIX] OTLP: Don't generate target_info unless there are metrics and at least one identifying label is defined. #13991 * [BUGFIX] Scrape: Do no try to ingest native histograms when the native histograms feature is turned off. This happened when protobuf scrape was enabled by for example the created time feature. #13987 * [BUGFIX] Scaleway SD: Use the instance's public IP if no private IP is available as the `__address__` meta label. #13941 diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 971ea8ab0..42dcb2ca8 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -733,7 +733,7 @@ func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt i for _, mset := range matcherSets { sets = append(sets, q.Select(ctx, true, nil, mset...)) } - ss = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) + ss = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge) } else { ss = q.Select(ctx, false, nil, matcherSets[0]...) } diff --git a/storage/merge.go b/storage/merge.go index 60b369e20..0f39dfb1d 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -135,13 +135,17 @@ func filterChunkQueriers(qs []ChunkQuerier) []ChunkQuerier { // Select returns a set of series that matches the given label matchers. func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { seriesSets := make([]genericSeriesSet, 0, len(q.queriers)) + var limit int + if hints != nil { + limit = hints.Limit + } if !q.concurrentSelect { for _, querier := range q.queriers { // We need to sort for merge to work. seriesSets = append(seriesSets, querier.Select(ctx, true, hints, matchers...)) } return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) { - s := newGenericMergeSeriesSet(seriesSets, q.mergeFn) + s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn) return s, s.Next() }} } @@ -169,7 +173,7 @@ func (q *mergeGenericQuerier) Select(ctx context.Context, sortSeries bool, hints seriesSets = append(seriesSets, r) } return &lazyGenericSeriesSet{init: func() (genericSeriesSet, bool) { - s := newGenericMergeSeriesSet(seriesSets, q.mergeFn) + s := newGenericMergeSeriesSet(seriesSets, limit, q.mergeFn) return s, s.Next() }} } @@ -288,12 +292,12 @@ func truncateToLimit(s []string, hints *LabelHints) []string { type VerticalSeriesMergeFunc func(...Series) Series // NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together. -func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) SeriesSet { +func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet { genericSets := make([]genericSeriesSet, 0, len(sets)) for _, s := range sets { genericSets = append(genericSets, &genericSeriesSetAdapter{s}) } - return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)} + return &seriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFunc}).Merge)} } // VerticalChunkSeriesMergeFunc returns merged chunk series implementation that merges potentially time-overlapping @@ -303,12 +307,12 @@ func NewMergeSeriesSet(sets []SeriesSet, mergeFunc VerticalSeriesMergeFunc) Seri type VerticalChunkSeriesMergeFunc func(...ChunkSeries) ChunkSeries // NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges many SeriesSet together. -func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet { +func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, limit int, mergeFunc VerticalChunkSeriesMergeFunc) ChunkSeriesSet { genericSets := make([]genericSeriesSet, 0, len(sets)) for _, s := range sets { genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s}) } - return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)} + return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, limit, (&chunkSeriesMergerAdapter{VerticalChunkSeriesMergeFunc: mergeFunc}).Merge)} } // genericMergeSeriesSet implements genericSeriesSet. @@ -316,9 +320,11 @@ type genericMergeSeriesSet struct { currentLabels labels.Labels mergeFunc genericSeriesMergeFunc - heap genericSeriesSetHeap - sets []genericSeriesSet - currentSets []genericSeriesSet + heap genericSeriesSetHeap + sets []genericSeriesSet + currentSets []genericSeriesSet + seriesLimit int + mergedSeries int // tracks the total number of series merged and returned. } // newGenericMergeSeriesSet returns a new genericSeriesSet that merges (and deduplicates) @@ -326,7 +332,8 @@ type genericMergeSeriesSet struct { // Each series set must return its series in labels order, otherwise // merged series set will be incorrect. // Overlapped situations are merged using provided mergeFunc. -func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMergeFunc) genericSeriesSet { +// If seriesLimit is set, only limited series are returned. +func newGenericMergeSeriesSet(sets []genericSeriesSet, seriesLimit int, mergeFunc genericSeriesMergeFunc) genericSeriesSet { if len(sets) == 1 { return sets[0] } @@ -346,13 +353,19 @@ func newGenericMergeSeriesSet(sets []genericSeriesSet, mergeFunc genericSeriesMe } } return &genericMergeSeriesSet{ - mergeFunc: mergeFunc, - sets: sets, - heap: h, + mergeFunc: mergeFunc, + sets: sets, + heap: h, + seriesLimit: seriesLimit, } } func (c *genericMergeSeriesSet) Next() bool { + if c.seriesLimit > 0 && c.mergedSeries >= c.seriesLimit { + // Exit early if seriesLimit is set. + return false + } + // Run in a loop because the "next" series sets may not be valid anymore. // If, for the current label set, all the next series sets come from // failed remote storage sources, we want to keep trying with the next label set. @@ -388,12 +401,14 @@ func (c *genericMergeSeriesSet) Next() bool { func (c *genericMergeSeriesSet) At() Labels { if len(c.currentSets) == 1 { + c.mergedSeries++ return c.currentSets[0].At() } series := make([]Labels, 0, len(c.currentSets)) for _, seriesSet := range c.currentSets { series = append(series, seriesSet.At()) } + c.mergedSeries++ return c.mergeFunc(series...) } diff --git a/storage/merge_test.go b/storage/merge_test.go index 5237275e5..04d4e9207 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1345,7 +1345,7 @@ func makeMergeSeriesSet(serieses [][]Series) SeriesSet { for i, s := range serieses { seriesSets[i] = &genericSeriesSetAdapter{NewMockSeriesSet(s...)} } - return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)} + return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, 0, (&seriesMergerAdapter{VerticalSeriesMergeFunc: ChainedSeriesMerge}).Merge)} } func benchmarkDrain(b *testing.B, makeSeriesSet func() SeriesSet) { @@ -1590,28 +1590,31 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) { secondaries: []Querier{ &mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil}, }, - limit: 1, + limit: 2, expectedSelectsSeries: []labels.Labels{ labels.FromStrings("test", "a"), labels.FromStrings("test", "b"), - labels.FromStrings("test", "c"), - labels.FromStrings("test", "d"), }, - expectedLabels: []string{"a"}, + expectedLabels: []string{"a", "b"}, expectedWarnings: annotations.New().Add(warnStorage), }, } { var labelHints *LabelHints + var selectHints *SelectHints if tcase.limit > 0 { labelHints = &LabelHints{ Limit: tcase.limit, } + selectHints = &SelectHints{ + Limit: tcase.limit, + } } + t.Run(tcase.name, func(t *testing.T) { q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] }) t.Run("Select", func(t *testing.T) { - res := q.Select(context.Background(), false, nil) + res := q.Select(context.Background(), false, selectHints) var lbls []labels.Labels for res.Next() { lbls = append(lbls, res.At().Labels()) diff --git a/tsdb/compact.go b/tsdb/compact.go index 9ef42b339..8248138e5 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -831,7 +831,7 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa if len(sets) > 1 { // Merge series using specified chunk series merger. // The default one is the compacting series merger. - set = storage.NewMergeChunkSeriesSet(sets, mergeFunc) + set = storage.NewMergeChunkSeriesSet(sets, 0, mergeFunc) } // Iterate over all sorted chunk series. diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 50525f65f..d12117814 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2030,7 +2030,7 @@ func TestPopulateWithDelSeriesIterator_NextWithMinTime(t *testing.T) { // TODO(bwplotka): Merge with storage merged series set benchmark. func BenchmarkMergedSeriesSet(b *testing.B) { sel := func(sets []storage.SeriesSet) storage.SeriesSet { - return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) + return storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge) } for _, k := range []int{ diff --git a/web/api/v1/api.go b/web/api/v1/api.go index d58be211f..bdb40bba2 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -884,7 +884,7 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { s := q.Select(ctx, true, hints, mset...) sets = append(sets, s) } - set = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) + set = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge) } else { // At this point at least one match exists. set = q.Select(ctx, false, hints, matcherSets[0]...) diff --git a/web/federate.go b/web/federate.go index 8176eba36..8215f820b 100644 --- a/web/federate.go +++ b/web/federate.go @@ -101,7 +101,7 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { sets = append(sets, s) } - set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) + set := storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge) it := storage.NewBuffer(int64(h.lookbackDelta / 1e6)) var chkIter chunkenc.Iterator Loop: From 1949baffe3dc32bcad2de12b0c072d09be6bf6a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Fri, 6 Sep 2024 08:26:43 -0700 Subject: [PATCH 4/4] Update comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- storage/merge.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index 0f39dfb1d..7b938a308 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -200,7 +200,7 @@ func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, hint return res, ws, nil } -// mergeResults performs merge sort for results from multiple queriers. +// mergeResults performs merge sort on the results of invoking the resultsFn against multiple queriers. func (q *mergeGenericQuerier) mergeResults(lq labelGenericQueriers, hints *LabelHints, resultsFn func(q LabelQuerier) ([]string, annotations.Annotations, error)) ([]string, annotations.Annotations, error) { if lq.Len() == 0 { return nil, nil, nil @@ -216,7 +216,7 @@ func (q *mergeGenericQuerier) mergeResults(lq labelGenericQueriers, hints *Label if err != nil { return nil, ws, err } - s2, ws, err := q.mergeResults(b, hints, resultsFn) + s2, w, err := q.mergeResults(b, hints, resultsFn) ws.Merge(w) if err != nil { return nil, ws, err @@ -292,6 +292,7 @@ func truncateToLimit(s []string, hints *LabelHints) []string { type VerticalSeriesMergeFunc func(...Series) Series // NewMergeSeriesSet returns a new SeriesSet that merges many SeriesSets together. +// If limit is set, the SeriesSet will be limited up-to the limit. 0 means disabled. func NewMergeSeriesSet(sets []SeriesSet, limit int, mergeFunc VerticalSeriesMergeFunc) SeriesSet { genericSets := make([]genericSeriesSet, 0, len(sets)) for _, s := range sets {