diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 5ed41f7698..779567229f 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -778,6 +778,56 @@ func (it *ListPostings) Err() error { return nil } +// LimitedPostings wraps Postings and limits its iteration. +type LimitedPostings struct { + p Postings + limit int + i int +} + +// NewLimitedPostings returns Postings that can only be iterated to the limit. 0 means limit is disabled. +func NewLimitedPostings(p Postings, limit int) Postings { + if limit <= 0 { + return p + } + return newLimitedPostings(p, limit) +} + +func newLimitedPostings(p Postings, l int) *LimitedPostings { + return &LimitedPostings{p: p, limit: l} +} + +func (it *LimitedPostings) At() storage.SeriesRef { + return it.p.At() +} + +func (it *LimitedPostings) Next() bool { + if it.i >= it.limit { + return false + } + it.i++ + return it.p.Next() +} + +func (it *LimitedPostings) Seek(x storage.SeriesRef) bool { + // If the current value satisfies, then return. + if it.At() >= x { + return true + } + + for it.Next() { + if it.At() >= x { + return true + } + } + + return false +} + +func (it *LimitedPostings) Err() error { + return nil +} + // bigEndianPostings implements the Postings interface over a byte stream of // big endian numbers. type bigEndianPostings struct { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 96c9ed124b..754433d1a0 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1363,6 +1363,134 @@ func TestListPostings(t *testing.T) { }) } +func TestLimitPostings(t *testing.T) { + t.Run("empty postings", func(t *testing.T) { + p := NewListPostings(nil) + p = NewLimitedPostings(p, 10) + require.False(t, p.Next()) + require.False(t, p.Seek(10)) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + + t.Run("one posting", func(t *testing.T) { + t.Run("next", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + p = NewLimitedPostings(p, 10) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek less", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + p = NewLimitedPostings(p, 1) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek equal", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + p = NewLimitedPostings(p, 1) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek more", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + p = NewLimitedPostings(p, 1) + require.False(t, p.Seek(15)) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek after next", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10}) + p = NewLimitedPostings(p, 1) + require.True(t, p.Next()) + require.False(t, p.Seek(15)) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + }) + + t.Run("multiple postings", func(t *testing.T) { + t.Run("next no limit", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20}) + p = NewLimitedPostings(p, 0) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("next with limit", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20}) + p = NewLimitedPostings(p, 1) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20}) + p = NewLimitedPostings(p, 2) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(5)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Next()) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.True(t, p.Seek(20)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.False(t, p.Next()) + require.NoError(t, p.Err()) + }) + t.Run("seek less than last", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + p = NewLimitedPostings(p, 5) + require.True(t, p.Seek(45)) + require.Equal(t, storage.SeriesRef(50), p.At()) + require.False(t, p.Next()) + }) + t.Run("seek exactly last", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + p = NewLimitedPostings(p, 5) + require.True(t, p.Seek(50)) + require.Equal(t, storage.SeriesRef(50), p.At()) + require.False(t, p.Next()) + }) + t.Run("seek more than last", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + p = NewLimitedPostings(p, 6) + require.False(t, p.Seek(60)) + require.False(t, p.Next()) + }) + t.Run("seek with limit", func(t *testing.T) { + p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50}) + p = NewLimitedPostings(p, 3) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(10), p.At()) + require.True(t, p.Seek(20)) + require.Equal(t, storage.SeriesRef(20), p.At()) + require.True(t, p.Seek(30)) + require.Equal(t, storage.SeriesRef(30), p.At()) + require.True(t, p.Seek(10)) + require.Equal(t, storage.SeriesRef(30), p.At()) + require.False(t, p.Seek(40)) + require.False(t, p.Next()) + }) + }) +} + // BenchmarkListPostings benchmarks ListPostings by iterating Next/At sequentially. // See also BenchmarkIntersect as it performs more `At` calls than `Next` calls when intersecting. func BenchmarkListPostings(b *testing.B) { diff --git a/tsdb/querier.go b/tsdb/querier.go index 1083cbba0e..db3c216939 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { res, err := q.index.SortedLabelValues(ctx, name, matchers...) + res = truncateToLimit(res, hints) return res, nil, err } func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { res, err := q.index.LabelNames(ctx, matchers...) + res = truncateToLimit(res, hints) return res, nil, err } @@ -101,6 +103,13 @@ func (q *blockBaseQuerier) Close() error { return errs.Err() } +func truncateToLimit(s []string, hints *storage.LabelHints) []string { + if hints != nil && hints.Limit > 0 && len(s) > hints.Limit { + s = s[:hints.Limit] + } + return s +} + type blockQuerier struct { *blockBaseQuerier } @@ -119,33 +128,34 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *stora } func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, - index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, + ir IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64, ) storage.SeriesSet { disableTrimming := false sharded := hints != nil && hints.ShardCount > 0 - p, err := PostingsForMatchers(ctx, index, ms...) + p, err := PostingsForMatchers(ctx, ir, ms...) if err != nil { return storage.ErrSeriesSet(err) } if sharded { - p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + p = ir.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) } if sortSeries { - p = index.SortedPostings(p) + p = ir.SortedPostings(p) } if hints != nil { mint = hints.Start maxt = hints.End disableTrimming = hints.DisableTrimming + p = index.NewLimitedPostings(p, hints.Limit) if hints.Func == "series" { // When you're only looking up metadata (for example series API), you don't need to load any chunks. - return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(ir, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming) } } - return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming) + return newBlockSeriesSet(ir, chunks, tombstones, p, mint, maxt, disableTrimming) } // blockChunkQuerier provides chunk querying access to a single block database. diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 77772937a7..2fd5f7c30f 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -684,6 +684,96 @@ func TestBlockQuerierDelete(t *testing.T) { } } +func TestBlockQuerierLimit(t *testing.T) { + tmpdir := t.TempDir() + ctx := context.Background() + var ( + allValues []string + allNames = []string{"__name__"} + seriesEntries []storage.Series + ) + + for i := 0; i < 5; i++ { + value := fmt.Sprintf("value%d", i) + name := fmt.Sprintf("labelName%d", i) + allValues = append(allValues, value) + allNames = append(allNames, name) + + seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings( + "__name__", value, name, value, + ), []chunks.Sample{sample{100, 0, nil, nil}})) + } + + blockDir := createBlock(t, tmpdir, seriesEntries) + + // Check open err. + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, block.Close()) }) + + q, err := NewBlockQuerier(block, 0, 100) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, q.Close()) }) + + type testCase struct { + limit int + expectedSeries []storage.Series + expectedLabelValues []string + expectedLabelNames []string + } + + testCases := map[string]testCase{ + "without limit": { + expectedSeries: seriesEntries, + expectedLabelValues: allValues, + expectedLabelNames: allNames, + }, + "with limit": { + limit: 2, + expectedSeries: seriesEntries[:2], + expectedLabelValues: allValues[:2], + expectedLabelNames: allNames[:2], + }, + } + + for tName, tc := range testCases { + t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) { + values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{ + Limit: tc.limit, + }) + require.NoError(t, err) + require.Equal(t, tc.expectedLabelValues, values) + }) + + t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) { + names, _, err := q.LabelNames(ctx, &storage.LabelHints{ + Limit: tc.limit, + }) + require.NoError(t, err) + require.Equal(t, tc.expectedLabelNames, names) + }) + + t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) { + matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*") + set := q.Select(ctx, true, &storage.SelectHints{ + Start: 0, + End: 100, + Limit: tc.limit, + }, matcher) + + var s []storage.Series + for set.Next() { + s = append(s, set.At()) + } + require.NoError(t, err) + require.Equal(t, len(tc.expectedSeries), len(s)) + for i, exp := range tc.expectedSeries { + require.True(t, labels.Equal(exp.Labels(), s[i].Labels())) + } + }) + } +} + type fakeChunksReader struct { ChunkReader chks map[chunks.ChunkRef]chunkenc.Chunk