tsdb: Implement limit in block querier

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
Author: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
Date:   2024-09-13 12:09:51 -07:00
parent b6015e7c73
commit 186fdab732
2 changed files with 119 additions and 0 deletions

tsdb/querier.go

@@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er
 func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.SortedLabelValues(ctx, name, matchers...)
+	res = truncateToLimit(res, hints)
 	return res, nil, err
 }
 
 func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.LabelNames(ctx, matchers...)
+	res = truncateToLimit(res, hints)
 	return res, nil, err
 }
@@ -101,6 +103,13 @@ func (q *blockBaseQuerier) Close() error {
 	return errs.Err()
 }
 
+func truncateToLimit(s []string, hints *storage.LabelHints) []string {
+	if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
+		s = s[:hints.Limit]
+	}
+	return s
+}
+
 type blockQuerier struct {
 	*blockBaseQuerier
 }
@@ -139,6 +148,7 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.Select
 		mint = hints.Start
 		maxt = hints.End
 		disableTrimming = hints.DisableTrimming
+		p = truncatePostingsToLimit(hints.Limit, p)
 		if hints.Func == "series" {
 			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
 			return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
@@ -148,6 +158,25 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.Select
 	return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
 }
 
+// truncatePostingsToLimit truncates the postings returned by p to the limit.
+func truncatePostingsToLimit(limit int, p index.Postings) index.Postings {
+	if limit <= 0 {
+		return p
+	}
+	out := make([]storage.SeriesRef, 0, 128)
+	for p.Next() {
+		id := p.At()
+		out = append(out, id)
+		if len(out) >= limit {
+			break
+		}
+	}
+	return index.NewListPostings(out)
+}
+
 // blockChunkQuerier provides chunk querying access to a single block database.
 type blockChunkQuerier struct {
 	*blockBaseQuerier
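
For context, a minimal sketch of how a caller might exercise the new limit from the storage.Querier side. The helper names, the package name, and the error handling below are illustrative assumptions, not part of this commit; the API calls themselves mirror the signatures shown in the diff. A non-positive Limit leaves results untruncated, matching truncateToLimit and truncatePostingsToLimit above.

package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// labelValuesWithLimit asks the querier for at most limit values of the given
// label name. With this commit, a block querier truncates the sorted result
// before returning it; limit <= 0 means "no limit".
func labelValuesWithLimit(ctx context.Context, q storage.Querier, name string, limit int) ([]string, error) {
	values, _, err := q.LabelValues(ctx, name, &storage.LabelHints{Limit: limit})
	return values, err
}

// selectWithLimit returns at most limit series matching the given matchers in
// [mint, maxt]. The block querier stops expanding postings once the limit is hit.
func selectWithLimit(ctx context.Context, q storage.Querier, mint, maxt int64, limit int, matchers ...*labels.Matcher) ([]storage.Series, error) {
	set := q.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt, Limit: limit}, matchers...)
	var out []storage.Series
	for set.Next() {
		out = append(out, set.At())
	}
	return out, set.Err()
}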

tsdb/querier_test.go

@@ -684,6 +684,96 @@ func TestBlockQuerierDelete(t *testing.T) {
 	}
 }
 
+func TestBlockQuerierLimit(t *testing.T) {
+	tmpdir := t.TempDir()
+	ctx := context.Background()
+
+	var (
+		allValues     []string
+		allNames      = []string{"__name__"}
+		seriesEntries []storage.Series
+	)
+	for i := 0; i < 5; i++ {
+		value := fmt.Sprintf("value%d", i)
+		name := fmt.Sprintf("labelName%d", i)
+		allValues = append(allValues, value)
+		allNames = append(allNames, name)
+		seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
+			"__name__", value, name, value,
+		), []chunks.Sample{sample{100, 0, nil, nil}}))
+	}
+
+	blockDir := createBlock(t, tmpdir, seriesEntries)
+
+	// Check open err.
+	block, err := OpenBlock(nil, blockDir, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, block.Close()) })
+
+	q, err := NewBlockQuerier(block, 0, 100)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, q.Close()) })
+
+	type testCase struct {
+		limit               int
+		expectedSeries      []storage.Series
+		expectedLabelValues []string
+		expectedLabelNames  []string
+	}
+
+	testCases := map[string]testCase{
+		"without limit": {
+			expectedSeries:      seriesEntries,
+			expectedLabelValues: allValues,
+			expectedLabelNames:  allNames,
+		},
+		"with limit": {
+			limit:               2,
+			expectedSeries:      seriesEntries[:2],
+			expectedLabelValues: allValues[:2],
+			expectedLabelNames:  allNames[:2],
+		},
+	}
+
+	for tName, tc := range testCases {
+		t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) {
+			values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{
+				Limit: tc.limit,
+			})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabelValues, values)
+		})
+
+		t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) {
+			names, _, err := q.LabelNames(ctx, &storage.LabelHints{
+				Limit: tc.limit,
+			})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabelNames, names)
+		})
+
+		t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) {
+			matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
+			set := q.Select(ctx, true, &storage.SelectHints{
+				Start: 0,
+				End:   100,
+				Limit: tc.limit,
+			}, matcher)
+
+			var s []storage.Series
+			for set.Next() {
+				s = append(s, set.At())
+			}
+			require.NoError(t, set.Err())
+			require.Equal(t, len(tc.expectedSeries), len(s))
+			for i, exp := range tc.expectedSeries {
+				require.True(t, labels.Equal(exp.Labels(), s[i].Labels()))
+			}
+		})
+	}
+}
+
 type fakeChunksReader struct {
 	ChunkReader
 	chks map[chunks.ChunkRef]chunkenc.Chunk