Storage: Implement limit in mergeGenericQuerier

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
This commit is contained in:
🌲 Harry 🌊 John 🏔 2024-07-18 10:49:41 -07:00
parent 536d9f9ce9
commit 53f8da58b3
3 changed files with 75 additions and 4 deletions

View file

@ -4,6 +4,7 @@
* [FEATURE] OTLP receiver: Add new option `otlp.promote_resource_attributes`, for any OTel resource attributes that should be promoted to metric labels. #14200
* [ENHANCEMENT] OTLP receiver: Warn when encountering exponential histograms with zero count and non-zero sum. #14706
* [ENHANCEMENT] Storage: Implement limit in mergeGenericQuerier. #14489
* [BUGFIX] tsdb/wlog.Watcher.readSegmentForGC: Only count unknown record types against record_decode_failures_total metric. #14042
## 2.54.1 / 2024-08-27
@ -75,6 +76,7 @@ This release changes the default for GOGC, the Go runtime control for the trade-
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` metric to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991
* [ENHANCEMENT] TSDB: Optimize querying with regexp matchers. #13620
* [ENHANCEMENT] Storage: Implement limit in Querier. #14109
* [BUGFIX] OTLP: Don't generate target_info unless there are metrics and at least one identifying label is defined. #13991
* [BUGFIX] Scrape: Do no try to ingest native histograms when the native histograms feature is turned off. This happened when protobuf scrape was enabled by for example the created time feature. #13987
* [BUGFIX] Scaleway SD: Use the instance's public IP if no private IP is available as the `__address__` meta label. #13941

View file

@ -216,7 +216,14 @@ func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers
if err != nil {
return nil, ws, err
}
return mergeStrings(s1, s2), ws, nil
s1 = truncateToLimit(s1, hints)
s2 = truncateToLimit(s2, hints)
merged := mergeStrings(s1, s2)
merged = truncateToLimit(merged, hints)
return merged, ws, nil
}
func mergeStrings(a, b []string) []string {
@ -261,6 +268,7 @@ func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints,
if err != nil {
return nil, nil, fmt.Errorf("LabelNames() from merge generic querier: %w", err)
}
names = truncateToLimit(names, hints)
for _, name := range names {
labelNamesMap[name] = struct{}{}
}
@ -274,6 +282,7 @@ func (q *mergeGenericQuerier) LabelNames(ctx context.Context, hints *LabelHints,
labelNames = append(labelNames, name)
}
slices.Sort(labelNames)
labelNames = truncateToLimit(labelNames, hints)
return labelNames, warnings, nil
}
@ -288,6 +297,13 @@ func (q *mergeGenericQuerier) Close() error {
return errs.Err()
}
func truncateToLimit(s []string, hints *LabelHints) []string {
if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
s = s[:hints.Limit]
}
return s
}
// VerticalSeriesMergeFunc returns merged series implementation that merges series with same labels together.
// It has to handle time-overlapped series as well.
type VerticalSeriesMergeFunc func(...Series) Series

View file

@ -1390,6 +1390,34 @@ func BenchmarkMergeSeriesSet(b *testing.B) {
}
}
func BenchmarkMergeLabelValuesWithLimit(b *testing.B) {
var queriers []genericQuerier
for i := 0; i < 5; i++ {
var lbls []string
for j := 0; j < 100000; j++ {
lbls = append(lbls, fmt.Sprintf("querier_%d_label_%d", i, j))
}
q := &mockQuerier{resp: lbls}
queriers = append(queriers, newGenericQuerierFrom(q))
}
mergeQuerier := &mergeGenericQuerier{
queriers: queriers, // Assume querying 5 blocks.
mergeFn: func(l ...Labels) Labels {
return l[0]
},
}
b.Run("benchmark", func(b *testing.B) {
ctx := context.Background()
hints := &LabelHints{
Limit: 1000,
}
mergeQuerier.LabelValues(ctx, "name", hints)
})
}
func visitMockQueriers(t *testing.T, qr Querier, f func(t *testing.T, q *mockQuerier)) int {
count := 0
switch x := qr.(type) {
@ -1428,6 +1456,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
name string
primaries []Querier
secondaries []Querier
limit int
expectedSelectsSeries []labels.Labels
expectedLabels []string
@ -1553,7 +1582,31 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedLabels: []string{"a", "b"},
expectedWarnings: annotations.New().Add(warnStorage),
},
{
name: "successful queriers with limit",
primaries: []Querier{
&mockQuerier{resp: []string{"a", "d"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
secondaries: []Querier{
&mockQuerier{resp: []string{"b", "c"}, warnings: annotations.New().Add(warnStorage), err: nil},
},
limit: 1,
expectedSelectsSeries: []labels.Labels{
labels.FromStrings("test", "a"),
labels.FromStrings("test", "b"),
labels.FromStrings("test", "c"),
labels.FromStrings("test", "d"),
},
expectedLabels: []string{"a"},
expectedWarnings: annotations.New().Add(warnStorage),
},
} {
var labelHints *LabelHints
if tcase.limit > 0 {
labelHints = &LabelHints{
Limit: tcase.limit,
}
}
t.Run(tcase.name, func(t *testing.T) {
q := NewMergeQuerier(tcase.primaries, tcase.secondaries, func(s ...Series) Series { return s[0] })
@ -1577,7 +1630,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
require.Equal(t, len(tcase.primaries)+len(tcase.secondaries), n)
})
t.Run("LabelNames", func(t *testing.T) {
res, w, err := q.LabelNames(ctx, nil)
res, w, err := q.LabelNames(ctx, labelHints)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[1], "expected error doesn't match")
requireEqualSlice(t, tcase.expectedLabels, res)
@ -1590,7 +1643,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
})
})
t.Run("LabelValues", func(t *testing.T) {
res, w, err := q.LabelValues(ctx, "test", nil)
res, w, err := q.LabelValues(ctx, "test", labelHints)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[2], "expected error doesn't match")
requireEqualSlice(t, tcase.expectedLabels, res)
@ -1604,7 +1657,7 @@ func TestMergeQuerierWithSecondaries_ErrorHandling(t *testing.T) {
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues(ctx, "test2", nil, matcher)
res, w, err := q.LabelValues(ctx, "test2", labelHints, matcher)
require.Subset(t, tcase.expectedWarnings, w)
require.ErrorIs(t, err, tcase.expectedErrs[3], "expected error doesn't match")
requireEqualSlice(t, tcase.expectedLabels, res)