From 186fdab7320ae0b18803bed63e191e8aab0ab46f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?=
Date: Fri, 13 Sep 2024 12:09:51 -0700
Subject: [PATCH] tsdb: Implement limit in block querier
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 🌲 Harry 🌊 John 🏔
---
 tsdb/querier.go      | 39 +++++++++++++++++++
 tsdb/querier_test.go | 93 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 132 insertions(+)

diff --git a/tsdb/querier.go b/tsdb/querier.go
index 912c95032..61316f4ae 100644
--- a/tsdb/querier.go
+++ b/tsdb/querier.go
@@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er
 
 func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.SortedLabelValues(ctx, name, matchers...)
+	res = truncateToLimit(res, hints)
 	return res, nil, err
 }
 
 func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.LabelNames(ctx, matchers...)
+	res = truncateToLimit(res, hints)
 	return res, nil, err
 }
 
@@ -101,6 +103,15 @@ func (q *blockBaseQuerier) Close() error {
 	return errs.Err()
 }
 
+// truncateToLimit returns at most hints.Limit entries of s. A nil hints or a
+// non-positive limit means no limit, in which case s is returned unchanged.
+func truncateToLimit(s []string, hints *storage.LabelHints) []string {
+	if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
+		s = s[:hints.Limit]
+	}
+	return s
+}
+
 type blockQuerier struct {
 	*blockBaseQuerier
 }
@@ -139,6 +150,7 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.Select
 		mint = hints.Start
 		maxt = hints.End
 		disableTrimming = hints.DisableTrimming
+		p = truncatePostingsToLimit(hints.Limit, p)
 		if hints.Func == "series" {
 			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
 			return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
@@ -148,6 +160,33 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.Select
 	return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
 }
 
+// truncatePostingsToLimit truncates the postings returned by p to the limit.
+// A non-positive limit means no limit and p is returned as is. Otherwise p is
+// consumed until limit refs have been collected, and an error reported by p
+// is propagated through the returned postings instead of being dropped.
+func truncatePostingsToLimit(limit int, p index.Postings) index.Postings {
+	if limit <= 0 {
+		return p
+	}
+
+	out := make([]storage.SeriesRef, 0, 128)
+
+	for p.Next() {
+		id := p.At()
+		out = append(out, id)
+		if len(out) >= limit {
+			break
+		}
+	}
+	// Next also returns false on failure; without this check a failed
+	// iteration would look like a short but complete result.
+	if err := p.Err(); err != nil {
+		return index.ErrPostings(err)
+	}
+
+	return index.NewListPostings(out)
+}
+
 // blockChunkQuerier provides chunk querying access to a single block database.
 type blockChunkQuerier struct {
 	*blockBaseQuerier
diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go
index 0821b2b37..14c6d3ec0 100644
--- a/tsdb/querier_test.go
+++ b/tsdb/querier_test.go
@@ -684,6 +684,99 @@ func TestBlockQuerierDelete(t *testing.T) {
 	}
 }
 
+// TestBlockQuerierLimit checks that the block querier applies the limit from
+// the hints to LabelValues, LabelNames and Select.
+func TestBlockQuerierLimit(t *testing.T) {
+	tmpdir := t.TempDir()
+	ctx := context.Background()
+	var (
+		allValues     []string
+		allNames      = []string{"__name__"}
+		seriesEntries []storage.Series
+	)
+
+	for i := 0; i < 5; i++ {
+		value := fmt.Sprintf("value%d", i)
+		name := fmt.Sprintf("labelName%d", i)
+		allValues = append(allValues, value)
+		allNames = append(allNames, name)
+
+		seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
+			"__name__", value, name, value,
+		), []chunks.Sample{sample{100, 0, nil, nil}}))
+	}
+
+	blockDir := createBlock(t, tmpdir, seriesEntries)
+
+	// Open the block and query it through a block querier.
+	block, err := OpenBlock(nil, blockDir, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, block.Close()) })
+
+	q, err := NewBlockQuerier(block, 0, 100)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, q.Close()) })
+
+	type testCase struct {
+		limit               int
+		expectedSeries      []storage.Series
+		expectedLabelValues []string
+		expectedLabelNames  []string
+	}
+
+	testCases := map[string]testCase{
+		"without limit": {
+			expectedSeries:      seriesEntries,
+			expectedLabelValues: allValues,
+			expectedLabelNames:  allNames,
+		},
+		"with limit": {
+			limit:               2,
+			expectedSeries:      seriesEntries[:2],
+			expectedLabelValues: allValues[:2],
+			expectedLabelNames:  allNames[:2],
+		},
+	}
+
+	for tName, tc := range testCases {
+		t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) {
+			values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{
+				Limit: tc.limit,
+			})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabelValues, values)
+		})
+
+		t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) {
+			names, _, err := q.LabelNames(ctx, &storage.LabelHints{
+				Limit: tc.limit,
+			})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabelNames, names)
+		})
+
+		t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) {
+			matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
+			set := q.Select(ctx, true, &storage.SelectHints{
+				Start: 0,
+				End:   100,
+				Limit: tc.limit,
+			}, matcher)
+
+			var s []storage.Series
+			for set.Next() {
+				s = append(s, set.At())
+			}
+			// Next returning false can also mean the iteration failed.
+			require.NoError(t, set.Err())
+			require.Len(t, s, len(tc.expectedSeries))
+			for i, exp := range tc.expectedSeries {
+				require.True(t, labels.Equal(exp.Labels(), s[i].Labels()))
+			}
+		})
+	}
+}
+
 type fakeChunksReader struct {
 	ChunkReader
 	chks map[chunks.ChunkRef]chunkenc.Chunk