mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Add context argument to LabelQuerier.LabelValues (#12665)
Add context argument to LabelQuerier.LabelValues and LabelQuerier.SortedLabelValues. Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
e21ff116d8
commit
156222cc50
|
@ -512,7 +512,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
|
|||
|
||||
postingInfos = postingInfos[:0]
|
||||
for _, n := range allLabelNames {
|
||||
values, err := ir.SortedLabelValues(n)
|
||||
values, err := ir.SortedLabelValues(ctx, n)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -528,7 +528,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
|
|||
|
||||
postingInfos = postingInfos[:0]
|
||||
for _, n := range allLabelNames {
|
||||
lv, err := ir.SortedLabelValues(n)
|
||||
lv, err := ir.SortedLabelValues(ctx, n)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -538,7 +538,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
|
|||
printInfo(postingInfos)
|
||||
|
||||
postingInfos = postingInfos[:0]
|
||||
lv, err := ir.SortedLabelValues("__name__")
|
||||
lv, err := ir.SortedLabelValues(ctx, "__name__")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -198,7 +198,7 @@ func (q *errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*lab
|
|||
return errSeriesSet{err: q.err}
|
||||
}
|
||||
|
||||
func (*errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
func (*errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -233,7 +233,7 @@ func (errQuerier) Select(context.Context, bool, *storage.SelectHints, ...*labels
|
|||
return storage.ErrSeriesSet(errSelect)
|
||||
}
|
||||
|
||||
func (errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
func (errQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
return nil, nil, errors.New("label values error")
|
||||
}
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ type MockQuerier struct {
|
|||
SelectMockFunction func(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
|
||||
}
|
||||
|
||||
func (q *MockQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
func (q *MockQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ type LabelQuerier interface {
|
|||
// It is not safe to use the strings beyond the lifetime of the querier.
|
||||
// If matchers are specified the returned result set is reduced
|
||||
// to label values of metrics matching the matchers.
|
||||
LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error)
|
||||
LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, Warnings, error)
|
||||
|
||||
// LabelNames returns all the unique label names present in the block in sorted order.
|
||||
// If matchers are specified the returned result set is reduced
|
||||
|
|
|
@ -158,8 +158,8 @@ func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQ
|
|||
// LabelValues returns all potential values for a label name.
|
||||
// If matchers are specified the returned result set is reduced
|
||||
// to label values of metrics matching the matchers.
|
||||
func (q *mergeGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
res, ws, err := q.lvals(q.queriers, name, matchers...)
|
||||
func (q *mergeGenericQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
res, ws, err := q.lvals(ctx, q.queriers, name, matchers...)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("LabelValues() from merge generic querier for label %s: %w", name, err)
|
||||
}
|
||||
|
@ -167,22 +167,22 @@ func (q *mergeGenericQuerier) LabelValues(name string, matchers ...*labels.Match
|
|||
}
|
||||
|
||||
// lvals performs merge sort for LabelValues from multiple queriers.
|
||||
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
if lq.Len() == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
if lq.Len() == 1 {
|
||||
return lq.Get(0).LabelValues(n, matchers...)
|
||||
return lq.Get(0).LabelValues(ctx, n, matchers...)
|
||||
}
|
||||
a, b := lq.SplitByHalf()
|
||||
|
||||
var ws Warnings
|
||||
s1, w, err := q.lvals(a, n, matchers...)
|
||||
s1, w, err := q.lvals(ctx, a, n, matchers...)
|
||||
ws = append(ws, w...)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
}
|
||||
s2, ws, err := q.lvals(b, n, matchers...)
|
||||
s2, ws, err := q.lvals(ctx, b, n, matchers...)
|
||||
ws = append(ws, w...)
|
||||
if err != nil {
|
||||
return nil, ws, err
|
||||
|
|
|
@ -990,7 +990,7 @@ func (m *mockGenericQuerier) Select(_ context.Context, b bool, _ *SelectHints, _
|
|||
return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err}
|
||||
}
|
||||
|
||||
func (m *mockGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
func (m *mockGenericQuerier) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
m.mtx.Lock()
|
||||
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
|
||||
name: name,
|
||||
|
@ -1215,7 +1215,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
}
|
||||
})
|
||||
t.Run("LabelValues", func(t *testing.T) {
|
||||
res, w, err := q.LabelValues("test")
|
||||
res, w, err := q.LabelValues(ctx, "test")
|
||||
require.Equal(t, tcase.expectedWarnings[2], w)
|
||||
require.True(t, errors.Is(err, tcase.expectedErrs[2]), "expected error doesn't match")
|
||||
require.Equal(t, tcase.expectedLabels, res)
|
||||
|
@ -1231,7 +1231,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
|
|||
})
|
||||
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
|
||||
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
|
||||
res, w, err := q.LabelValues("test2", matcher)
|
||||
res, w, err := q.LabelValues(ctx, "test2", matcher)
|
||||
require.Equal(t, tcase.expectedWarnings[3], w)
|
||||
require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match")
|
||||
require.Equal(t, tcase.expectedLabels, res)
|
||||
|
|
|
@ -30,7 +30,7 @@ func (noopQuerier) Select(context.Context, bool, *SelectHints, ...*labels.Matche
|
|||
return NoopSeriesSet()
|
||||
}
|
||||
|
||||
func (noopQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
func (noopQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ func (noopChunkQuerier) Select(context.Context, bool, *SelectHints, ...*labels.M
|
|||
return NoopChunkedSeriesSet()
|
||||
}
|
||||
|
||||
func (noopChunkQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
func (noopChunkQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -209,7 +209,7 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, []s
|
|||
}
|
||||
|
||||
// LabelValues implements storage.Querier and is a noop.
|
||||
func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
func (q *querier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
|
||||
return nil, nil, errors.New("not implemented")
|
||||
}
|
||||
|
|
|
@ -48,8 +48,8 @@ func newSecondaryQuerierFromChunk(cq ChunkQuerier) genericQuerier {
|
|||
return &secondaryQuerier{genericQuerier: newGenericQuerierFromChunk(cq)}
|
||||
}
|
||||
|
||||
func (s *secondaryQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
vals, w, err := s.genericQuerier.LabelValues(name, matchers...)
|
||||
func (s *secondaryQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
|
||||
vals, w, err := s.genericQuerier.LabelValues(ctx, name, matchers...)
|
||||
if err != nil {
|
||||
return nil, append([]error{err}, w...), nil
|
||||
}
|
||||
|
|
|
@ -65,10 +65,10 @@ type IndexReader interface {
|
|||
Symbols() index.StringIter
|
||||
|
||||
// SortedLabelValues returns sorted possible label values.
|
||||
SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)
|
||||
SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
|
||||
|
||||
// LabelValues returns possible label values which may not be sorted.
|
||||
LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)
|
||||
LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
|
||||
|
||||
// Postings returns the postings list iterator for the label pairs.
|
||||
// The Postings here contain the offsets to the series inside the index.
|
||||
|
@ -91,7 +91,7 @@ type IndexReader interface {
|
|||
// LabelValueFor returns label value for the given label name in the series referred to by ID.
|
||||
// If the series couldn't be found or the series doesn't have the requested label a
|
||||
// storage.ErrNotFound is returned as error.
|
||||
LabelValueFor(id storage.SeriesRef, label string) (string, error)
|
||||
LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error)
|
||||
|
||||
// LabelNamesFor returns all the label names for the series referred to by IDs.
|
||||
// The names returned are sorted.
|
||||
|
@ -455,14 +455,14 @@ func (r blockIndexReader) Symbols() index.StringIter {
|
|||
return r.ir.Symbols()
|
||||
}
|
||||
|
||||
func (r blockIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (r blockIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
var st []string
|
||||
var err error
|
||||
|
||||
if len(matchers) == 0 {
|
||||
st, err = r.ir.SortedLabelValues(name)
|
||||
st, err = r.ir.SortedLabelValues(ctx, name)
|
||||
} else {
|
||||
st, err = r.LabelValues(name, matchers...)
|
||||
st, err = r.LabelValues(ctx, name, matchers...)
|
||||
if err == nil {
|
||||
slices.Sort(st)
|
||||
}
|
||||
|
@ -471,13 +471,13 @@ func (r blockIndexReader) SortedLabelValues(name string, matchers ...*labels.Mat
|
|||
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (r blockIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
if len(matchers) == 0 {
|
||||
st, err := r.ir.LabelValues(name)
|
||||
st, err := r.ir.LabelValues(ctx, name)
|
||||
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
|
||||
}
|
||||
|
||||
return labelValuesWithMatchers(r.ir, name, matchers...)
|
||||
return labelValuesWithMatchers(ctx, r.ir, name, matchers...)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) {
|
||||
|
@ -513,8 +513,8 @@ func (r blockIndexReader) Close() error {
|
|||
}
|
||||
|
||||
// LabelValueFor returns label value for the given label name in the series referred to by ID.
|
||||
func (r blockIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
|
||||
return r.ir.LabelValueFor(id, label)
|
||||
func (r blockIndexReader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error) {
|
||||
return r.ir.LabelValueFor(ctx, id, label)
|
||||
}
|
||||
|
||||
// LabelNamesFor returns all the label names for the series referred to by IDs.
|
||||
|
|
|
@ -211,6 +211,7 @@ func TestCorruptedChunk(t *testing.T) {
|
|||
|
||||
func TestLabelValuesWithMatchers(t *testing.T) {
|
||||
tmpdir := t.TempDir()
|
||||
ctx := context.Background()
|
||||
|
||||
var seriesEntries []storage.Series
|
||||
for i := 0; i < 100; i++ {
|
||||
|
@ -265,11 +266,11 @@ func TestLabelValuesWithMatchers(t *testing.T) {
|
|||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
actualValues, err := indexReader.SortedLabelValues(tt.labelName, tt.matchers...)
|
||||
actualValues, err := indexReader.SortedLabelValues(ctx, tt.labelName, tt.matchers...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expectedValues, actualValues)
|
||||
|
||||
actualValues, err = indexReader.LabelValues(tt.labelName, tt.matchers...)
|
||||
actualValues, err = indexReader.LabelValues(ctx, tt.labelName, tt.matchers...)
|
||||
sort.Strings(actualValues)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expectedValues, actualValues)
|
||||
|
@ -365,6 +366,7 @@ func TestReadIndexFormatV1(t *testing.T) {
|
|||
|
||||
func BenchmarkLabelValuesWithMatchers(b *testing.B) {
|
||||
tmpdir := b.TempDir()
|
||||
ctx := context.Background()
|
||||
|
||||
var seriesEntries []storage.Series
|
||||
metricCount := 1000000
|
||||
|
@ -398,7 +400,7 @@ func BenchmarkLabelValuesWithMatchers(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
|
||||
for benchIdx := 0; benchIdx < b.N; benchIdx++ {
|
||||
actualValues, err := indexReader.LabelValues("b_tens", matchers...)
|
||||
actualValues, err := indexReader.LabelValues(ctx, "b_tens", matchers...)
|
||||
require.NoError(b, err)
|
||||
require.Equal(b, 9, len(actualValues))
|
||||
}
|
||||
|
|
|
@ -1000,7 +1000,7 @@ func TestWALFlushedOnDBClose(t *testing.T) {
|
|||
q, err := db.Querier(0, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
values, ws, err := q.LabelValues("labelname")
|
||||
values, ws, err := q.LabelValues(ctx, "labelname")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(ws))
|
||||
require.Equal(t, []string{"labelvalue"}, values)
|
||||
|
@ -1930,7 +1930,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
|
|||
defer q.Close()
|
||||
|
||||
// The requested interval covers 2 blocks, so the querier's label values for blockID should give us 2 values, one from each block.
|
||||
b, ws, err := q.LabelValues("blockID")
|
||||
b, ws, err := q.LabelValues(ctx, "blockID")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, storage.Warnings(nil), ws)
|
||||
require.Equal(t, []string{"1", "2"}, b)
|
||||
|
|
|
@ -62,8 +62,8 @@ func (h *headIndexReader) Symbols() index.StringIter {
|
|||
// specific label name that are within the time range mint to maxt.
|
||||
// If matchers are specified the returned result set is reduced
|
||||
// to label values of metrics matching the matchers.
|
||||
func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
values, err := h.LabelValues(name, matchers...)
|
||||
func (h *headIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
values, err := h.LabelValues(ctx, name, matchers...)
|
||||
if err == nil {
|
||||
slices.Sort(values)
|
||||
}
|
||||
|
@ -74,16 +74,16 @@ func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Mat
|
|||
// specific label name that are within the time range mint to maxt.
|
||||
// If matchers are specified the returned result set is reduced
|
||||
// to label values of metrics matching the matchers.
|
||||
func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (h *headIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
if len(matchers) == 0 {
|
||||
return h.head.postings.LabelValues(name), nil
|
||||
return h.head.postings.LabelValues(ctx, name), nil
|
||||
}
|
||||
|
||||
return labelValuesWithMatchers(h, name, matchers...)
|
||||
return labelValuesWithMatchers(ctx, h, name, matchers...)
|
||||
}
|
||||
|
||||
// LabelNames returns all the unique label names present in the head
|
||||
|
@ -216,7 +216,7 @@ func (s *memSeries) oooHeadChunkID(pos int) chunks.HeadChunkID {
|
|||
}
|
||||
|
||||
// LabelValueFor returns label value for the given label name in the series referred to by ID.
|
||||
func (h *headIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
|
||||
func (h *headIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error) {
|
||||
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
|
||||
if memSeries == nil {
|
||||
return "", storage.ErrNotFound
|
||||
|
|
|
@ -806,6 +806,8 @@ func TestHead_Truncate(t *testing.T) {
|
|||
|
||||
h.initTime(0)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
|
||||
s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
|
||||
s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
|
||||
|
@ -874,7 +876,7 @@ func TestHead_Truncate(t *testing.T) {
|
|||
ss = map[string]struct{}{}
|
||||
values[name] = ss
|
||||
}
|
||||
for _, value := range h.postings.LabelValues(name) {
|
||||
for _, value := range h.postings.LabelValues(ctx, name) {
|
||||
ss[value] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
@ -2640,7 +2642,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
|||
ctx = context.Background()
|
||||
)
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
app := head.Appender(ctx)
|
||||
for i, name := range expectedLabelNames {
|
||||
_, err := app.Append(0, labels.FromStrings(name, expectedLabelValues[i]), seriesTimestamps[i], 0)
|
||||
require.NoError(t, err)
|
||||
|
@ -2670,7 +2672,7 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
|
|||
require.Equal(t, tt.expectedNames, actualLabelNames)
|
||||
if len(tt.expectedValues) > 0 {
|
||||
for i, name := range expectedLabelNames {
|
||||
actualLabelValue, err := headIdxReader.SortedLabelValues(name)
|
||||
actualLabelValue, err := headIdxReader.SortedLabelValues(ctx, name)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []string{tt.expectedValues[i]}, actualLabelValue)
|
||||
}
|
||||
|
@ -2683,6 +2685,8 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
|
|||
head, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
|
||||
t.Cleanup(func() { require.NoError(t, head.Close()) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := app.Append(0, labels.FromStrings(
|
||||
|
@ -2726,11 +2730,11 @@ func TestHeadLabelValuesWithMatchers(t *testing.T) {
|
|||
t.Run(tt.name, func(t *testing.T) {
|
||||
headIdxReader := head.indexRange(0, 200)
|
||||
|
||||
actualValues, err := headIdxReader.SortedLabelValues(tt.labelName, tt.matchers...)
|
||||
actualValues, err := headIdxReader.SortedLabelValues(ctx, tt.labelName, tt.matchers...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expectedValues, actualValues)
|
||||
|
||||
actualValues, err = headIdxReader.LabelValues(tt.labelName, tt.matchers...)
|
||||
actualValues, err = headIdxReader.LabelValues(ctx, tt.labelName, tt.matchers...)
|
||||
sort.Strings(actualValues)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expectedValues, actualValues)
|
||||
|
@ -2902,6 +2906,8 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
|
|||
head, _ := newTestHead(b, chunkRange, wlog.CompressionNone, false)
|
||||
b.Cleanup(func() { require.NoError(b, head.Close()) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
|
||||
metricCount := 1000000
|
||||
|
@ -2922,7 +2928,7 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
|
|||
b.ReportAllocs()
|
||||
|
||||
for benchIdx := 0; benchIdx < b.N; benchIdx++ {
|
||||
actualValues, err := headIdxReader.LabelValues("b_tens", matchers...)
|
||||
actualValues, err := headIdxReader.LabelValues(ctx, "b_tens", matchers...)
|
||||
require.NoError(b, err)
|
||||
require.Equal(b, 9, len(actualValues))
|
||||
}
|
||||
|
|
|
@ -1457,8 +1457,8 @@ func (r *Reader) SymbolTableSize() uint64 {
|
|||
// SortedLabelValues returns value tuples that exist for the given label name.
|
||||
// It is not safe to use the return value beyond the lifetime of the byte slice
|
||||
// passed into the Reader.
|
||||
func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
values, err := r.LabelValues(name, matchers...)
|
||||
func (r *Reader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
values, err := r.LabelValues(ctx, name, matchers...)
|
||||
if err == nil && r.version == FormatV1 {
|
||||
slices.Sort(values)
|
||||
}
|
||||
|
@ -1469,7 +1469,7 @@ func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]
|
|||
// It is not safe to use the return value beyond the lifetime of the byte slice
|
||||
// passed into the Reader.
|
||||
// TODO(replay): Support filtering by matchers
|
||||
func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (r *Reader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
if len(matchers) > 0 {
|
||||
return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
|
||||
}
|
||||
|
@ -1500,7 +1500,7 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string
|
|||
lastVal := e[len(e)-1].value
|
||||
|
||||
skip := 0
|
||||
for d.Err() == nil {
|
||||
for d.Err() == nil && ctx.Err() == nil {
|
||||
if skip == 0 {
|
||||
// These are always the same number of bytes,
|
||||
// and it's faster to skip than parse.
|
||||
|
@ -1521,7 +1521,8 @@ func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string
|
|||
if d.Err() != nil {
|
||||
return nil, errors.Wrap(d.Err(), "get postings offset entry")
|
||||
}
|
||||
return values, nil
|
||||
|
||||
return values, ctx.Err()
|
||||
}
|
||||
|
||||
// LabelNamesFor returns all the label names for the series referred to by IDs.
|
||||
|
@ -1572,7 +1573,7 @@ func (r *Reader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([
|
|||
}
|
||||
|
||||
// LabelValueFor returns label value for the given label name in the series referred to by ID.
|
||||
func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
|
||||
func (r *Reader) LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error) {
|
||||
offset := id
|
||||
// In version 2 series IDs are no longer exact references but series are 16-byte padded
|
||||
// and the ID is the multiple of 16 of the actual position.
|
||||
|
@ -1585,7 +1586,7 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro
|
|||
return "", errors.Wrap(d.Err(), "label values for")
|
||||
}
|
||||
|
||||
value, err := r.dec.LabelValueFor(buf, label)
|
||||
value, err := r.dec.LabelValueFor(ctx, buf, label)
|
||||
if err != nil {
|
||||
return "", storage.ErrNotFound
|
||||
}
|
||||
|
@ -1810,7 +1811,7 @@ func (dec *Decoder) LabelNamesOffsetsFor(b []byte) ([]uint32, error) {
|
|||
}
|
||||
|
||||
// LabelValueFor decodes a label for a given series.
|
||||
func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
|
||||
func (dec *Decoder) LabelValueFor(ctx context.Context, b []byte, label string) (string, error) {
|
||||
d := encoding.Decbuf{B: b}
|
||||
k := d.Uvarint()
|
||||
|
||||
|
@ -1822,13 +1823,13 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
|
|||
return "", errors.Wrap(d.Err(), "read series label offsets")
|
||||
}
|
||||
|
||||
ln, err := dec.LookupSymbol(context.TODO(), lno)
|
||||
ln, err := dec.LookupSymbol(ctx, lno)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "lookup label name")
|
||||
}
|
||||
|
||||
if ln == label {
|
||||
lv, err := dec.LookupSymbol(context.TODO(), lvo)
|
||||
lv, err := dec.LookupSymbol(ctx, lvo)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "lookup label value")
|
||||
}
|
||||
|
|
|
@ -93,7 +93,7 @@ func (m mockIndex) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m mockIndex) LabelValues(name string) ([]string, error) {
|
||||
func (m mockIndex) LabelValues(_ context.Context, name string) ([]string, error) {
|
||||
values := []string{}
|
||||
for l := range m.postings {
|
||||
if l.Name == name {
|
||||
|
@ -449,7 +449,7 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
for k, v := range labelPairs {
|
||||
sort.Strings(v)
|
||||
|
||||
res, err := ir.SortedLabelValues(k)
|
||||
res, err := ir.SortedLabelValues(ctx, k)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(v), len(res))
|
||||
|
|
|
@ -136,7 +136,7 @@ func (p *MemPostings) LabelNames() []string {
|
|||
}
|
||||
|
||||
// LabelValues returns label values for the given name.
|
||||
func (p *MemPostings) LabelValues(name string) []string {
|
||||
func (p *MemPostings) LabelValues(_ context.Context, name string) []string {
|
||||
p.mtx.RLock()
|
||||
defer p.mtx.RUnlock()
|
||||
|
||||
|
|
|
@ -158,16 +158,16 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
|
|||
// LabelValues needs to be overridden from the headIndexReader implementation due
|
||||
// to the check that happens at the beginning where we make sure that the query
|
||||
// interval overlaps with the head minooot and maxooot.
|
||||
func (oh *OOOHeadIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
if oh.maxt < oh.head.MinOOOTime() || oh.mint > oh.head.MaxOOOTime() {
|
||||
return []string{}, nil
|
||||
}
|
||||
|
||||
if len(matchers) == 0 {
|
||||
return oh.head.postings.LabelValues(name), nil
|
||||
return oh.head.postings.LabelValues(ctx, name), nil
|
||||
}
|
||||
|
||||
return labelValuesWithMatchers(oh, name, matchers...)
|
||||
return labelValuesWithMatchers(ctx, oh, name, matchers...)
|
||||
}
|
||||
|
||||
type chunkMetaAndChunkDiskMapperRef struct {
|
||||
|
@ -414,15 +414,15 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l
|
|||
return ir.ch.oooIR.series(ref, builder, chks, ir.ch.lastMmapRef)
|
||||
}
|
||||
|
||||
func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (ir *OOOCompactionHeadIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
|
||||
func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
|
@ -430,7 +430,7 @@ func (ir *OOOCompactionHeadIndexReader) LabelNames(context.Context, ...*labels.M
|
|||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (ir *OOOCompactionHeadIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
|
||||
func (ir *OOOCompactionHeadIndexReader) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) {
|
||||
return "", errors.New("not implemented")
|
||||
}
|
||||
|
||||
|
|
|
@ -378,6 +378,8 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
|
|||
head, _ := newTestHead(t, chunkRange, wlog.CompressionNone, true)
|
||||
t.Cleanup(func() { require.NoError(t, head.Close()) })
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
app := head.Appender(context.Background())
|
||||
|
||||
// Add in-order samples
|
||||
|
@ -437,24 +439,24 @@ func TestOOOHeadChunkReader_LabelValues(t *testing.T) {
|
|||
// We first want to test using a head index reader that covers the biggest query interval
|
||||
oh := NewOOOHeadIndexReader(head, tc.queryMinT, tc.queryMaxT)
|
||||
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")}
|
||||
values, err := oh.LabelValues("foo", matchers...)
|
||||
values, err := oh.LabelValues(ctx, "foo", matchers...)
|
||||
sort.Strings(values)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expValues1, values)
|
||||
|
||||
matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^bar.")}
|
||||
values, err = oh.LabelValues("foo", matchers...)
|
||||
values, err = oh.LabelValues(ctx, "foo", matchers...)
|
||||
sort.Strings(values)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expValues2, values)
|
||||
|
||||
matchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.")}
|
||||
values, err = oh.LabelValues("foo", matchers...)
|
||||
values, err = oh.LabelValues(ctx, "foo", matchers...)
|
||||
sort.Strings(values)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expValues3, values)
|
||||
|
||||
values, err = oh.LabelValues("foo")
|
||||
values, err = oh.LabelValues(ctx, "foo")
|
||||
sort.Strings(values)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expValues4, values)
|
||||
|
|
|
@ -89,8 +89,8 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (q *blockBaseQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
res, err := q.index.SortedLabelValues(name, matchers...)
|
||||
func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
res, err := q.index.SortedLabelValues(ctx, name, matchers...)
|
||||
return res, nil, err
|
||||
}
|
||||
|
||||
|
@ -374,7 +374,7 @@ func postingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Postings, erro
|
|||
}
|
||||
}
|
||||
|
||||
vals, err := ix.LabelValues(m.Name)
|
||||
vals, err := ix.LabelValues(context.TODO(), m.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -411,7 +411,7 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
|
|||
return ix.Postings(context.TODO(), m.Name, m.Value)
|
||||
}
|
||||
|
||||
vals, err := ix.LabelValues(m.Name)
|
||||
vals, err := ix.LabelValues(context.TODO(), m.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -431,13 +431,13 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
|
|||
return ix.Postings(context.TODO(), m.Name, res...)
|
||||
}
|
||||
|
||||
func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
p, err := PostingsForMatchers(r, matchers...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "fetching postings for matchers")
|
||||
}
|
||||
|
||||
allValues, err := r.LabelValues(name)
|
||||
allValues, err := r.LabelValues(ctx, name)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "fetching values of label %s", name)
|
||||
}
|
||||
|
@ -463,7 +463,7 @@ func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Mat
|
|||
|
||||
valuesPostings := make([]index.Postings, len(allValues))
|
||||
for i, value := range allValues {
|
||||
valuesPostings[i], err = r.Postings(context.TODO(), name, value)
|
||||
valuesPostings[i], err = r.Postings(ctx, name, value)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "fetching postings for %s=%q", name, value)
|
||||
}
|
||||
|
|
|
@ -183,6 +183,8 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) {
|
|||
nX := labels.MustNewMatcher(labels.MatchNotEqual, "n", "X"+postingsBenchSuffix)
|
||||
nPlus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^.+$")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
labelName string
|
||||
|
@ -205,7 +207,7 @@ func benchmarkLabelValuesWithMatchers(b *testing.B, ir IndexReader) {
|
|||
for _, c := range cases {
|
||||
b.Run(c.name, func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := labelValuesWithMatchers(ir, c.labelName, c.matchers...)
|
||||
_, err := labelValuesWithMatchers(ctx, ir, c.labelName, c.matchers...)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -1453,13 +1453,13 @@ func (m mockIndex) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m mockIndex) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
values, _ := m.LabelValues(name, matchers...)
|
||||
func (m mockIndex) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
values, _ := m.LabelValues(ctx, name, matchers...)
|
||||
sort.Strings(values)
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func (m mockIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (m mockIndex) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
var values []string
|
||||
|
||||
if len(matchers) == 0 {
|
||||
|
@ -1483,7 +1483,7 @@ func (m mockIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]stri
|
|||
return values, nil
|
||||
}
|
||||
|
||||
func (m mockIndex) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
|
||||
func (m mockIndex) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error) {
|
||||
return m.series[id].l.Get(label), nil
|
||||
}
|
||||
|
||||
|
@ -2238,7 +2238,7 @@ func TestQuerierIndexQueriesRace(t *testing.T) {
|
|||
q, err := db.Querier(math.MinInt64, math.MaxInt64)
|
||||
require.NoError(t, err)
|
||||
|
||||
values, _, err := q.LabelValues("seq", c.matchers...)
|
||||
values, _, err := q.LabelValues(ctx, "seq", c.matchers...)
|
||||
require.NoError(t, err)
|
||||
require.Emptyf(t, values, `label values for label "seq" should be empty`)
|
||||
}
|
||||
|
@ -2436,16 +2436,16 @@ func (m mockMatcherIndex) Symbols() index.StringIter { return nil }
|
|||
func (m mockMatcherIndex) Close() error { return nil }
|
||||
|
||||
// SortedLabelValues will return error if it is called.
|
||||
func (m mockMatcherIndex) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (m mockMatcherIndex) SortedLabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
|
||||
return []string{}, errors.New("sorted label values called")
|
||||
}
|
||||
|
||||
// LabelValues will return error if it is called.
|
||||
func (m mockMatcherIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
func (m mockMatcherIndex) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
|
||||
return []string{}, errors.New("label values called")
|
||||
}
|
||||
|
||||
func (m mockMatcherIndex) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
|
||||
func (m mockMatcherIndex) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) {
|
||||
return "", errors.New("label value for called")
|
||||
}
|
||||
|
||||
|
|
|
@ -749,7 +749,7 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) {
|
|||
var callWarnings storage.Warnings
|
||||
labelValuesSet := make(map[string]struct{})
|
||||
for _, matchers := range matcherSets {
|
||||
vals, callWarnings, err = q.LabelValues(name, matchers...)
|
||||
vals, callWarnings, err = q.LabelValues(ctx, name, matchers...)
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer}
|
||||
}
|
||||
|
@ -764,7 +764,7 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) {
|
|||
vals = append(vals, val)
|
||||
}
|
||||
} else {
|
||||
vals, warnings, err = q.LabelValues(name)
|
||||
vals, warnings, err = q.LabelValues(ctx, name)
|
||||
if err != nil {
|
||||
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer}
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ type errorTestQuerier struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (t errorTestQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
func (t errorTestQuerier) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
|
||||
return nil, nil, t.err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue