mirror of
https://github.com/prometheus/prometheus.git
synced 2024-09-19 23:37:31 -07:00
Compare commits
2 commits
98f007e442
...
443c9b8158
Author | SHA1 | Date | |
---|---|---|---|
443c9b8158 | |||
186fdab732 |
|
@ -79,11 +79,13 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er
|
|||
|
||||
func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
res, err := q.index.SortedLabelValues(ctx, name, matchers...)
|
||||
res = truncateToLimit(res, hints)
|
||||
return res, nil, err
|
||||
}
|
||||
|
||||
func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
res, err := q.index.LabelNames(ctx, matchers...)
|
||||
res = truncateToLimit(res, hints)
|
||||
return res, nil, err
|
||||
}
|
||||
|
||||
|
@ -101,6 +103,13 @@ func (q *blockBaseQuerier) Close() error {
|
|||
return errs.Err()
|
||||
}
|
||||
|
||||
func truncateToLimit(s []string, hints *storage.LabelHints) []string {
|
||||
if hints != nil && hints.Limit > 0 && len(s) > hints.Limit {
|
||||
s = s[:hints.Limit]
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// blockQuerier provides querying access to a single block, delegating to the
// embedded blockBaseQuerier for index, chunk and tombstone readers.
type blockQuerier struct {
	*blockBaseQuerier
}
|
||||
|
@ -139,6 +148,7 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.Select
|
|||
mint = hints.Start
|
||||
maxt = hints.End
|
||||
disableTrimming = hints.DisableTrimming
|
||||
p = truncatePostingsToLimit(hints.Limit, p)
|
||||
if hints.Func == "series" {
|
||||
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
|
||||
return newBlockSeriesSet(index, newNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
|
||||
|
@ -148,6 +158,25 @@ func selectSeriesSet(ctx context.Context, sortSeries bool, hints *storage.Select
|
|||
return newBlockSeriesSet(index, chunks, tombstones, p, mint, maxt, disableTrimming)
|
||||
}
|
||||
|
||||
// truncatePostingsToLimit truncates the postings returned by p to the limit.
|
||||
func truncatePostingsToLimit(limit int, p index.Postings) index.Postings {
|
||||
if limit <= 0 {
|
||||
return p
|
||||
}
|
||||
|
||||
out := make([]storage.SeriesRef, 0, 128)
|
||||
|
||||
for p.Next() {
|
||||
id := p.At()
|
||||
out = append(out, id)
|
||||
if len(out) >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return index.NewListPostings(out)
|
||||
}
|
||||
|
||||
// blockChunkQuerier provides chunk querying access to a single block database.
|
||||
type blockChunkQuerier struct {
|
||||
*blockBaseQuerier
|
||||
|
|
|
@ -684,6 +684,96 @@ func TestBlockQuerierDelete(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestBlockQuerierLimit(t *testing.T) {
|
||||
tmpdir := t.TempDir()
|
||||
ctx := context.Background()
|
||||
var (
|
||||
allValues []string
|
||||
allNames = []string{"__name__"}
|
||||
seriesEntries []storage.Series
|
||||
)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
value := fmt.Sprintf("value%d", i)
|
||||
name := fmt.Sprintf("labelName%d", i)
|
||||
allValues = append(allValues, value)
|
||||
allNames = append(allNames, name)
|
||||
|
||||
seriesEntries = append(seriesEntries, storage.NewListSeries(labels.FromStrings(
|
||||
"__name__", value, name, value,
|
||||
), []chunks.Sample{sample{100, 0, nil, nil}}))
|
||||
}
|
||||
|
||||
blockDir := createBlock(t, tmpdir, seriesEntries)
|
||||
|
||||
// Check open err.
|
||||
block, err := OpenBlock(nil, blockDir, nil)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, block.Close()) })
|
||||
|
||||
q, err := NewBlockQuerier(block, 0, 100)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, q.Close()) })
|
||||
|
||||
type testCase struct {
|
||||
limit int
|
||||
expectedSeries []storage.Series
|
||||
expectedLabelValues []string
|
||||
expectedLabelNames []string
|
||||
}
|
||||
|
||||
testCases := map[string]testCase{
|
||||
"without limit": {
|
||||
expectedSeries: seriesEntries,
|
||||
expectedLabelValues: allValues,
|
||||
expectedLabelNames: allNames,
|
||||
},
|
||||
"with limit": {
|
||||
limit: 2,
|
||||
expectedSeries: seriesEntries[:2],
|
||||
expectedLabelValues: allValues[:2],
|
||||
expectedLabelNames: allNames[:2],
|
||||
},
|
||||
}
|
||||
|
||||
for tName, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("label values %s", tName), func(t *testing.T) {
|
||||
values, _, err := q.LabelValues(ctx, "__name__", &storage.LabelHints{
|
||||
Limit: tc.limit,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedLabelValues, values)
|
||||
})
|
||||
|
||||
t.Run(fmt.Sprintf("label names %s", tName), func(t *testing.T) {
|
||||
names, _, err := q.LabelNames(ctx, &storage.LabelHints{
|
||||
Limit: tc.limit,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedLabelNames, names)
|
||||
})
|
||||
|
||||
t.Run(fmt.Sprintf("select %s", tName), func(t *testing.T) {
|
||||
matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", "value.*")
|
||||
set := q.Select(ctx, true, &storage.SelectHints{
|
||||
Start: 0,
|
||||
End: 100,
|
||||
Limit: tc.limit,
|
||||
}, matcher)
|
||||
|
||||
var s []storage.Series
|
||||
for set.Next() {
|
||||
s = append(s, set.At())
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(tc.expectedSeries), len(s))
|
||||
for i, exp := range tc.expectedSeries {
|
||||
require.True(t, labels.Equal(exp.Labels(), s[i].Labels()))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeChunksReader struct {
|
||||
ChunkReader
|
||||
chks map[chunks.ChunkRef]chunkenc.Chunk
|
||||
|
|
Loading…
Reference in a new issue