tsdb: Implement limit in block querier

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
Author: 🌲 Harry 🌊 John 🏔
Date:   2024-09-13 12:09:51 -07:00
parent 5d8f0ef0c2
commit cf9110eaa0

4 changed files with 284 additions and 6 deletions

tsdb/index/postings.go

@@ -778,6 +778,56 @@ func (it *ListPostings) Err() error {
 	return nil
 }
 
+// LimitedPostings wraps Postings and limits its iteration.
+type LimitedPostings struct {
+	p     Postings
+	limit int
+	i     int
+}
+
+// NewLimitedPostings returns Postings that can be iterated at most limit times.
+// A limit of 0 or less means the limit is disabled.
+func NewLimitedPostings(p Postings, limit int) Postings {
+	if limit <= 0 {
+		return p
+	}
+	return newLimitedPostings(p, limit)
+}
+
+func newLimitedPostings(p Postings, l int) *LimitedPostings {
+	return &LimitedPostings{p: p, limit: l}
+}
+
+func (it *LimitedPostings) At() storage.SeriesRef {
+	return it.p.At()
+}
+
+func (it *LimitedPostings) Next() bool {
+	if it.i >= it.limit {
+		return false
+	}
+	it.i++
+	return it.p.Next()
+}
+
+func (it *LimitedPostings) Seek(x storage.SeriesRef) bool {
+	// If the current value satisfies, then return.
+	if it.At() >= x {
+		return true
+	}
+	for it.Next() {
+		if it.At() >= x {
+			return true
+		}
+	}
+	return false
+}
+
+func (it *LimitedPostings) Err() error {
+	// Propagate errors from the wrapped Postings (e.g. bigEndianPostings can fail).
+	return it.p.Err()
+}
+
 // bigEndianPostings implements the Postings interface over a byte stream of
 // big endian numbers.
 type bigEndianPostings struct {
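For orientation, here is a minimal sketch of how the new wrapper composes with the constructors already exported from tsdb/index; the surrounding scaffolding (package main, printing) is illustrative and not part of the commit:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// Wrap a three-element postings list so that at most two refs are yielded.
	p := index.NewListPostings([]storage.SeriesRef{10, 20, 30})
	p = index.NewLimitedPostings(p, 2)
	for p.Next() {
		fmt.Println(p.At()) // prints 10, then 20; 30 is cut off by the limit
	}
	// With limit <= 0, NewLimitedPostings returns the input Postings unchanged.
}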

tsdb/index/postings_test.go

@@ -1363,6 +1363,134 @@ func TestListPostings(t *testing.T) {
 	})
 }
 
+func TestLimitPostings(t *testing.T) {
+	t.Run("empty postings", func(t *testing.T) {
+		p := NewListPostings(nil)
+		p = NewLimitedPostings(p, 10)
+		require.False(t, p.Next())
+		require.False(t, p.Seek(10))
+		require.False(t, p.Next())
+		require.NoError(t, p.Err())
+	})
+
+	t.Run("one posting", func(t *testing.T) {
+		t.Run("next", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10})
+			p = NewLimitedPostings(p, 10)
+			require.True(t, p.Next())
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("seek less", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10})
+			p = NewLimitedPostings(p, 1)
+			require.True(t, p.Seek(5))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.True(t, p.Seek(5))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("seek equal", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10})
+			p = NewLimitedPostings(p, 1)
+			require.True(t, p.Seek(10))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("seek more", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10})
+			p = NewLimitedPostings(p, 1)
+			require.False(t, p.Seek(15))
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("seek after next", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10})
+			p = NewLimitedPostings(p, 1)
+			require.True(t, p.Next())
+			require.False(t, p.Seek(15))
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+	})
+
+	t.Run("multiple postings", func(t *testing.T) {
+		t.Run("next no limit", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20})
+			p = NewLimitedPostings(p, 0)
+			require.True(t, p.Next())
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.True(t, p.Next())
+			require.Equal(t, storage.SeriesRef(20), p.At())
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("next with limit", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20})
+			p = NewLimitedPostings(p, 1)
+			require.True(t, p.Next())
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("seek", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20})
+			p = NewLimitedPostings(p, 2)
+			require.True(t, p.Seek(5))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.True(t, p.Seek(5))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.True(t, p.Seek(10))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.True(t, p.Next())
+			require.Equal(t, storage.SeriesRef(20), p.At())
+			require.True(t, p.Seek(10))
+			require.Equal(t, storage.SeriesRef(20), p.At())
+			require.True(t, p.Seek(20))
+			require.Equal(t, storage.SeriesRef(20), p.At())
+			require.False(t, p.Next())
+			require.NoError(t, p.Err())
+		})
+		t.Run("seek less than last", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
+			p = NewLimitedPostings(p, 5)
+			require.True(t, p.Seek(45))
+			require.Equal(t, storage.SeriesRef(50), p.At())
+			require.False(t, p.Next())
+		})
+		t.Run("seek exactly last", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
+			p = NewLimitedPostings(p, 5)
+			require.True(t, p.Seek(50))
+			require.Equal(t, storage.SeriesRef(50), p.At())
+			require.False(t, p.Next())
+		})
+		t.Run("seek more than last", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
+			p = NewLimitedPostings(p, 6)
+			require.False(t, p.Seek(60))
+			require.False(t, p.Next())
+		})
+		t.Run("seek with limit", func(t *testing.T) {
+			p := NewListPostings([]storage.SeriesRef{10, 20, 30, 40, 50})
+			p = NewLimitedPostings(p, 3)
+			require.True(t, p.Seek(10))
+			require.Equal(t, storage.SeriesRef(10), p.At())
+			require.True(t, p.Seek(20))
+			require.Equal(t, storage.SeriesRef(20), p.At())
+			require.True(t, p.Seek(30))
+			require.Equal(t, storage.SeriesRef(30), p.At())
+			require.True(t, p.Seek(10))
+			require.Equal(t, storage.SeriesRef(30), p.At())
+			require.False(t, p.Seek(40))
+			require.False(t, p.Next())
+		})
+	})
+}
+
 // BenchmarkListPostings benchmarks ListPostings by iterating Next/At sequentially.
 // See also BenchmarkIntersect as it performs more `At` calls than `Next` calls when intersecting.
 func BenchmarkListPostings(b *testing.B) {

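The new subtests can be run in isolation with Go's standard test filter, e.g. from the repository root:

	go test ./tsdb/index/ -run TestLimitPostings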
tsdb/querier.go

@@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
 
 func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.SortedLabelValues(ctx, name, matchers...)
+	res = truncateToLimit(res, hints)
 	return res, nil, err
 }
 
 func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
 	res, err := q.index.LabelNames(ctx, matchers...)
+	res = truncateToLimit(res, hints)
 	return res, nil, err
 }

@@ -101,6 +103,13 @@ func (q *blockBaseQuerier) Close() error {
 	return errs.Err()
 }
 
+func truncateToLimit(s []string, hints *storage.LabelHints) []string {
+	if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
+		s = s[:hints.Limit]
+	}
+	return s
+}
+
 type blockQuerier struct {
 	*blockBaseQuerier
 }

@@ -119,33 +128,34 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
 }
 
 func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
-	index IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
+	ir IndexReader, chunks ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
 ) storage.SeriesSet {
 	disableTrimming := false
 	sharded := hints != nil && hints.ShardCount > 0
 
-	p, err := PostingsForMatchers(ctx, index, ms...)
+	p, err := PostingsForMatchers(ctx, ir, ms...)
 	if err != nil {
 		return storage.ErrSeriesSet(err)
 	}
 	if sharded {
-		p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
+		p = ir.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
 	}
 	if sortSeries {
-		p = index.SortedPostings(p)
+		p = ir.SortedPostings(p)
 	}
 
 	if hints != nil {
 		mint = hints.Start
 		maxt = hints.End
 		disableTrimming = hints.DisableTrimming
+		p = index.NewLimitedPostings(p, hints.Limit)
 		if hints.Func == "series" {
 			// When you're only looking up metadata (for example series API), you don't need to load any chunks.
-			return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
+			return newBlockSeriesSet(ir, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
 		}
 	}
 
-	return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
+	return newBlockSeriesSet(ir, chunks, tombstones, p, mint, maxt, disableTrimming)
 }
 
 // blockChunkQuerier provides chunk querying access to a single block database.
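Note the selectSeriesSet parameter rename from index to ir: it frees the identifier index to refer to the tsdb/index package in the new index.NewLimitedPostings call. From a caller's perspective the limit rides on the existing hints structs, and nil hints keep the old behavior. A hedged sketch (assuming the usual context, fmt, labels, and storage imports, and a storage.Querier q such as the one built in the test below):

// Illustrative only: q is a storage.Querier, e.g. from NewBlockQuerier.
func queryWithLimit(ctx context.Context, q storage.Querier) error {
	// Select: Limit is carried on SelectHints and applied to the postings via
	// index.NewLimitedPostings before any series are materialized.
	m := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
	set := q.Select(ctx, true, &storage.SelectHints{Start: 0, End: 100, Limit: 2}, m)
	for set.Next() {
		fmt.Println(set.At().Labels())
	}
	if err := set.Err(); err != nil {
		return err
	}

	// Label lookups: Limit is carried on LabelHints; truncateToLimit trims the
	// sorted result, so at most Limit values are returned.
	values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{Limit: 2})
	if err != nil {
		return err
	}
	fmt.Println(values)
	return nil
}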

tsdb/querier_test.go

@@ -684,6 +684,96 @@ func TestBlockQuerierDelete(t *testing.T) {
 	}
 }
 
+func TestBlockQuerierLimit(t *testing.T) {
+	tmpdir := t.TempDir()
+	ctx := context.Background()
+
+	var (
+		allValues     []string
+		allNames      = []string{"__name__"}
+		seriesEntries []storage.Series
+	)
+	for i := 0; i < 5; i++ {
+		value := fmt.Sprintf("value%d", i)
+		name := fmt.Sprintf("labelName%d", i)
+		allValues = append(allValues, value)
+		allNames = append(allNames, name)
+		seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
+			"__name__", value, name, value,
+		), []chunks.Sample{sample{100, 0, nil, nil}}))
+	}
+
+	blockDir := createBlock(t, tmpdir, seriesEntries)
+
+	// Check open err.
+	block, err := OpenBlock(nil, blockDir, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, block.Close()) })
+
+	q, err := NewBlockQuerier(block, 0, 100)
+	require.NoError(t, err)
+	t.Cleanup(func() { require.NoError(t, q.Close()) })
+
+	type testCase struct {
+		limit               int
+		expectedSeries      []storage.Series
+		expectedLabelValues []string
+		expectedLabelNames  []string
+	}
+
+	testCases := map[string]testCase{
+		"without limit": {
+			expectedSeries:      seriesEntries,
+			expectedLabelValues: allValues,
+			expectedLabelNames:  allNames,
+		},
+		"with limit": {
+			limit:               2,
+			expectedSeries:      seriesEntries[:2],
+			expectedLabelValues: allValues[:2],
+			expectedLabelNames:  allNames[:2],
+		},
+	}
+
+	for tName, tc := range testCases {
+		t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) {
+			values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{
+				Limit: tc.limit,
+			})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabelValues, values)
+		})
+		t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) {
+			names, _, err := q.LabelNames(ctx, &storage.LabelHints{
+				Limit: tc.limit,
+			})
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedLabelNames, names)
+		})
+		t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) {
+			matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
+			set := q.Select(ctx, true, &storage.SelectHints{
+				Start: 0,
+				End:   100,
+				Limit: tc.limit,
+			}, matcher)
+
+			var s []storage.Series
+			for set.Next() {
+				s = append(s, set.At())
+			}
+			require.NoError(t, set.Err())
+			require.Len(t, s, len(tc.expectedSeries))
+			for i, exp := range tc.expectedSeries {
+				require.True(t, labels.Equal(exp.Labels(), s[i].Labels()))
+			}
+		})
+	}
+}
+
 type fakeChunksReader struct {
 	ChunkReader
 	chks map[chunks.ChunkRef]chunkenc.Chunk