Add matchers to LabelValues() call (#8400)

* Accept matchers in querier LabelValues()

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* create matcher to only select metrics which have searched label

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* test case for merge querier with matchers

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* test LabelValues with matchers on head

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* add test for LabelValues on block

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* formatting fix

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Add comments

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* add missing lock release

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* remove unused parameter

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Benchmarks for LabelValues() methods on block/head

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Better comment

Co-authored-by: Julien Pivotto <roidelapluie@gmail.com>
Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* update comment

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* minor refactor make code cleaner

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* better comments

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* fix expected errors in test

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Deleting parameter which can only be empty

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* fix comments

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* remove unnecessary lock

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* only lookup label value if label name was looked up

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Return error when there is one

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Call .Get() on decoder before checking errors

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* only lock head.symMtx when necessary

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* remove unnecessary delete()

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* re-use code instead of duplicating it

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Consistently return error from LabelValueFor()

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* move helper func from util.go to querier.go

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* Fix test expectation

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* ensure result de-duplication and sorting works

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

* return named error from LabelValueFor()

Signed-off-by: Mauro Stettler <mauro.stettler@gmail.com>

Co-authored-by: Julien Pivotto <roidelapluie@gmail.com>
Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
This commit is contained in:
Mauro Stettler 2021-02-09 14:38:35 -03:00 committed by GitHub
parent 86c71856e8
commit 7715fe3219
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 496 additions and 81 deletions

View file

@ -181,9 +181,11 @@ type errQuerier struct {
func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage.SeriesSet {
return errSeriesSet{err: q.err}
}
func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }
func (*errQuerier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }
// errSeriesSet implements storage.SeriesSet which always returns error.
type errSeriesSet struct {

View file

@ -230,7 +230,7 @@ func (errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) storage
return storage.ErrSeriesSet(errSelect)
}
func (errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
func (errQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, errors.New("label values error")
}

View file

@ -95,8 +95,9 @@ type ChunkQuerier interface {
type LabelQuerier interface {
// LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifefime of the querier.
// TODO(yeya24): support matchers or hints.
LabelValues(name string) ([]string, Warnings, error)
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error)
// LabelNames returns all the unique label names present in the block in sorted order.
// TODO(yeya24): support matchers or hints.

View file

@ -155,8 +155,10 @@ func (l labelGenericQueriers) SplitByHalf() (labelGenericQueriers, labelGenericQ
}
// LabelValues returns all potential values for a label name.
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
res, ws, err := q.lvals(q.queriers, name)
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (q *mergeGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
res, ws, err := q.lvals(q.queriers, name, matchers...)
if err != nil {
return nil, nil, errors.Wrapf(err, "LabelValues() from merge generic querier for label %s", name)
}
@ -164,22 +166,22 @@ func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, erro
}
// lvals performs merge sort for LabelValues from multiple queriers.
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string) ([]string, Warnings, error) {
func (q *mergeGenericQuerier) lvals(lq labelGenericQueriers, n string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
if lq.Len() == 0 {
return nil, nil, nil
}
if lq.Len() == 1 {
return lq.Get(0).LabelValues(n)
return lq.Get(0).LabelValues(n, matchers...)
}
a, b := lq.SplitByHalf()
var ws Warnings
s1, w, err := q.lvals(a, n)
s1, w, err := q.lvals(a, n, matchers...)
ws = append(ws, w...)
if err != nil {
return nil, ws, err
}
s2, ws, err := q.lvals(b, n)
s2, ws, err := q.lvals(b, n, matchers...)
ws = append(ws, w...)
if err != nil {
return nil, ws, err

View file

@ -719,7 +719,7 @@ type mockGenericQuerier struct {
closed bool
labelNamesCalls int
labelNamesRequested []string
labelNamesRequested []labelNameRequest
sortedSeriesRequested []bool
resp []string
@ -727,6 +727,11 @@ type mockGenericQuerier struct {
err error
}
type labelNameRequest struct {
name string
matchers []*labels.Matcher
}
func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher) genericSeriesSet {
m.mtx.Lock()
m.sortedSeriesRequested = append(m.sortedSeriesRequested, b)
@ -734,9 +739,12 @@ func (m *mockGenericQuerier) Select(b bool, _ *SelectHints, _ ...*labels.Matcher
return &mockGenericSeriesSet{resp: m.resp, warnings: m.warnings, err: m.err}
}
func (m *mockGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
func (m *mockGenericQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
m.mtx.Lock()
m.labelNamesRequested = append(m.labelNamesRequested, name)
m.labelNamesRequested = append(m.labelNamesRequested, labelNameRequest{
name: name,
matchers: matchers,
})
m.mtx.Unlock()
return m.resp, m.warnings, m.err
}
@ -808,8 +816,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
expectedSelectsSeries []labels.Labels
expectedLabels []string
expectedWarnings [3]Warnings
expectedErrs [3]error
expectedWarnings [4]Warnings
expectedErrs [4]error
}{
{},
{
@ -837,7 +845,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
{
name: "one failed primary querier",
queriers: []genericQuerier{&mockGenericQuerier{warnings: nil, err: errStorage}},
expectedErrs: [3]error{errStorage, errStorage, errStorage},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with successful secondaries",
@ -873,7 +881,7 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"b"}, warnings: nil, err: nil}},
&secondaryQuerier{genericQuerier: &mockGenericQuerier{resp: []string{"c"}, warnings: nil, err: nil}},
},
expectedErrs: [3]error{errStorage, errStorage, errStorage},
expectedErrs: [4]error{errStorage, errStorage, errStorage, errStorage},
},
{
name: "one successful primary querier with failed secondaries",
@ -886,7 +894,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
labels.FromStrings("test", "a"),
},
expectedLabels: []string{"a"},
expectedWarnings: [3]Warnings{
expectedWarnings: [4]Warnings{
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
[]error{errStorage, errStorage},
@ -903,7 +912,8 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
labels.FromStrings("test", "b"),
},
expectedLabels: []string{"a", "b"},
expectedWarnings: [3]Warnings{
expectedWarnings: [4]Warnings{
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
[]error{warnStorage, warnStorage},
@ -964,7 +974,26 @@ func TestMergeGenericQuerierWithSecondaries_ErrorHandling(t *testing.T) {
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
require.Equal(t, []string{"test"}, m.labelNamesRequested)
require.Equal(t, []labelNameRequest{{name: "test"}}, m.labelNamesRequested)
}
})
t.Run("LabelValuesWithMatchers", func(t *testing.T) {
matcher := labels.MustNewMatcher(labels.MatchEqual, "otherLabel", "someValue")
res, w, err := q.LabelValues("test2", matcher)
require.Equal(t, tcase.expectedWarnings[3], w)
require.True(t, errors.Is(err, tcase.expectedErrs[3]), "expected error doesn't match")
require.Equal(t, tcase.expectedLabels, res)
if err != nil {
return
}
for _, qr := range q.queriers {
m := unwrapMockGenericQuerier(t, qr)
require.Equal(t, []labelNameRequest{
{name: "test"},
{name: "test2", matchers: []*labels.Matcher{matcher}},
}, m.labelNamesRequested)
}
})
})

View file

@ -28,7 +28,7 @@ func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) SeriesSet {
return NoopSeriesSet()
}
func (noopQuerier) LabelValues(string) ([]string, Warnings, error) {
func (noopQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) {
return nil, nil, nil
}
@ -51,7 +51,7 @@ func (noopChunkQuerier) Select(bool, *SelectHints, ...*labels.Matcher) ChunkSeri
return NoopChunkedSeriesSet()
}
func (noopChunkQuerier) LabelValues(string) ([]string, Warnings, error) {
func (noopChunkQuerier) LabelValues(string, ...*labels.Matcher) ([]string, Warnings, error) {
return nil, nil, nil
}

View file

@ -206,7 +206,7 @@ func (q querier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, lab
}
// LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) {
func (q *querier) LabelValues(string, ...*labels.Matcher) ([]string, storage.Warnings, error) {
// TODO: Implement: https://github.com/prometheus/prometheus/issues/3351
return nil, nil, errors.New("not implemented")
}

View file

@ -47,8 +47,8 @@ func newSecondaryQuerierFromChunk(cq ChunkQuerier) genericQuerier {
return &secondaryQuerier{genericQuerier: newGenericQuerierFromChunk(cq)}
}
func (s *secondaryQuerier) LabelValues(name string) ([]string, Warnings, error) {
vals, w, err := s.genericQuerier.LabelValues(name)
func (s *secondaryQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, Warnings, error) {
vals, w, err := s.genericQuerier.LabelValues(name, matchers...)
if err != nil {
return nil, append([]error{err}, w...), nil
}

View file

@ -20,6 +20,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
"github.com/go-kit/kit/log"
@ -63,10 +64,10 @@ type IndexReader interface {
Symbols() index.StringIter
// SortedLabelValues returns sorted possible label values.
SortedLabelValues(name string) ([]string, error)
SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)
// LabelValues returns possible label values which may not be sorted.
LabelValues(name string) ([]string, error)
LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)
// Postings returns the postings list iterator for the label pairs.
// The Postings here contain the offsets to the series inside the index.
@ -86,6 +87,11 @@ type IndexReader interface {
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames() ([]string, error)
// LabelValueFor returns label value for the given label name in the series referred to by ID.
// If the series couldn't be found or the series doesn't have the requested label a
// storage.ErrNotFound is returned as error.
LabelValueFor(id uint64, label string) (string, error)
// Close releases the underlying resources of the reader.
Close() error
}
@ -415,14 +421,29 @@ func (r blockIndexReader) Symbols() index.StringIter {
return r.ir.Symbols()
}
func (r blockIndexReader) SortedLabelValues(name string) ([]string, error) {
st, err := r.ir.SortedLabelValues(name)
func (r blockIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
var st []string
var err error
if len(matchers) == 0 {
st, err = r.ir.SortedLabelValues(name)
} else {
st, err = r.LabelValues(name, matchers...)
if err == nil {
sort.Strings(st)
}
}
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
func (r blockIndexReader) LabelValues(name string) ([]string, error) {
st, err := r.ir.LabelValues(name)
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
func (r blockIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) == 0 {
st, err := r.ir.LabelValues(name)
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}
return labelValuesWithMatchers(r, name, matchers...)
}
func (r blockIndexReader) Postings(name string, values ...string) (index.Postings, error) {
@ -453,6 +474,11 @@ func (r blockIndexReader) Close() error {
return nil
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (r blockIndexReader) LabelValueFor(id uint64, label string) (string, error) {
return r.ir.LabelValueFor(id, label)
}
type blockTombstoneReader struct {
tombstones.Reader
b *Block

View file

@ -17,11 +17,13 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
"testing"
@ -212,6 +214,78 @@ func TestCorruptedChunk(t *testing.T) {
}
}
func TestLabelValuesWithMatchers(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_block_label_values_with_matchers")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tmpdir))
}()
var seriesEntries []storage.Series
for i := 0; i < 100; i++ {
seriesEntries = append(seriesEntries, storage.NewListSeries(labels.Labels{
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
{Name: "tens", Value: fmt.Sprintf("value%d", i/10)},
}, []tsdbutil.Sample{sample{100, 0}}))
}
blockDir := createBlock(t, tmpdir, seriesEntries)
files, err := sequenceFiles(chunkDir(blockDir))
require.NoError(t, err)
require.Greater(t, len(files), 0, "No chunk created.")
// Check open err.
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
defer func() { require.NoError(t, block.Close()) }()
indexReader, err := block.Index()
require.NoError(t, err)
defer func() { require.NoError(t, indexReader.Close()) }()
testCases := []struct {
name string
labelName string
matchers []*labels.Matcher
expectedValues []string
}{
{
name: "get tens based on unique id",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")},
expectedValues: []string{"value3"},
}, {
name: "get unique ids based on a ten",
labelName: "unique",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")},
expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"},
}, {
name: "get tens by pattern matching on unique id",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")},
expectedValues: []string{"value5", "value6", "value7"},
}, {
name: "get tens by matching for absence of unique label",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")},
expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
actualValues, err := indexReader.SortedLabelValues(tt.labelName, tt.matchers...)
require.NoError(t, err)
require.Equal(t, tt.expectedValues, actualValues)
actualValues, err = indexReader.LabelValues(tt.labelName, tt.matchers...)
sort.Strings(actualValues)
require.NoError(t, err)
require.Equal(t, tt.expectedValues, actualValues)
})
}
}
// TestBlockSize ensures that the block size is calculated correctly.
func TestBlockSize(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "test_blockSize")
@ -301,6 +375,49 @@ func TestReadIndexFormatV1(t *testing.T) {
})
}
func BenchmarkLabelValuesWithMatchers(b *testing.B) {
tmpdir, err := ioutil.TempDir("", "bench_block_label_values_with_matchers")
require.NoError(b, err)
defer func() {
require.NoError(b, os.RemoveAll(tmpdir))
}()
var seriesEntries []storage.Series
metricCount := 1000000
for i := 0; i < metricCount; i++ {
seriesEntries = append(seriesEntries, storage.NewListSeries(labels.Labels{
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
{Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))},
{Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1"
}, []tsdbutil.Sample{sample{100, 0}}))
}
blockDir := createBlock(b, tmpdir, seriesEntries)
files, err := sequenceFiles(chunkDir(blockDir))
require.NoError(b, err)
require.Greater(b, len(files), 0, "No chunk created.")
// Check open err.
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(b, err)
defer func() { require.NoError(b, block.Close()) }()
indexReader, err := block.Index()
require.NoError(b, err)
defer func() { require.NoError(b, indexReader.Close()) }()
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "ninety", "value0")}
b.ResetTimer()
b.ReportAllocs()
for benchIdx := 0; benchIdx < b.N; benchIdx++ {
actualValues, err := indexReader.LabelValues("tens", matchers...)
require.NoError(b, err)
require.Equal(b, 9, len(actualValues))
}
}
// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []storage.Series) string {
blockDir, err := CreateBlock(series, dir, 0, log.NewNopLogger())

View file

@ -1635,8 +1635,10 @@ func (h *headIndexReader) Symbols() index.StringIter {
// SortedLabelValues returns label values present in the head for the
// specific label name that are within the time range mint to maxt.
func (h *headIndexReader) SortedLabelValues(name string) ([]string, error) {
values, err := h.LabelValues(name)
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
values, err := h.LabelValues(name, matchers...)
if err == nil {
sort.Strings(values)
}
@ -1645,15 +1647,20 @@ func (h *headIndexReader) SortedLabelValues(name string) ([]string, error) {
// LabelValues returns label values present in the head for the
// specific label name that are within the time range mint to maxt.
func (h *headIndexReader) LabelValues(name string) ([]string, error) {
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
return []string{}, nil
}
values := h.head.postings.LabelValues(name)
return values, nil
if len(matchers) == 0 {
h.head.symMtx.RLock()
defer h.head.symMtx.RUnlock()
return h.head.postings.LabelValues(name), nil
}
return labelValuesWithMatchers(h, name, matchers...)
}
// LabelNames returns all the unique label names present in the head
@ -1746,6 +1753,21 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
return nil
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (h *headIndexReader) LabelValueFor(id uint64, label string) (string, error) {
memSeries := h.head.series.getByID(id)
if memSeries == nil {
return "", storage.ErrNotFound
}
value := memSeries.lset.Get(label)
if value == "" {
return "", storage.ErrNotFound
}
return value, nil
}
func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
// Just using `getOrSet` below would be semantically sufficient, but we'd create
// a new series on every sample inserted via Add(), which causes allocations

View file

@ -1884,6 +1884,67 @@ func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) {
}
}
func TestHeadLabelValuesWithMatchers(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
defer func() {
require.NoError(t, head.Close())
}()
app := head.Appender(context.Background())
for i := 0; i < 100; i++ {
_, err := app.Add(labels.Labels{
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
{Name: "tens", Value: fmt.Sprintf("value%d", i/10)},
}, 100, 0)
require.NoError(t, err)
}
require.NoError(t, app.Commit())
var testCases = []struct {
name string
labelName string
matchers []*labels.Matcher
expectedValues []string
}{
{
name: "get tens based on unique id",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")},
expectedValues: []string{"value3"},
}, {
name: "get unique ids based on a ten",
labelName: "unique",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")},
expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"},
}, {
name: "get tens by pattern matching on unique id",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")},
expectedValues: []string{"value5", "value6", "value7"},
}, {
name: "get tens by matching for absence of unique label",
labelName: "tens",
matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")},
expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
headIdxReader := head.indexRange(0, 200)
actualValues, err := headIdxReader.SortedLabelValues(tt.labelName, tt.matchers...)
require.NoError(t, err)
require.Equal(t, tt.expectedValues, actualValues)
actualValues, err = headIdxReader.LabelValues(tt.labelName, tt.matchers...)
sort.Strings(actualValues)
require.NoError(t, err)
require.Equal(t, tt.expectedValues, actualValues)
})
}
}
func TestErrReuseAppender(t *testing.T) {
head, _ := newTestHead(t, 1000, false)
defer func() {
@ -1952,3 +2013,34 @@ func TestHeadMintAfterTruncation(t *testing.T) {
require.NoError(t, head.Close())
}
func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
chunkRange := int64(2000)
head, _ := newTestHead(b, chunkRange, false)
b.Cleanup(func() { require.NoError(b, head.Close()) })
app := head.Appender(context.Background())
metricCount := 1000000
for i := 0; i < metricCount; i++ {
_, err := app.Add(labels.Labels{
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
{Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))},
{Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1"
}, 100, 0)
require.NoError(b, err)
}
require.NoError(b, app.Commit())
headIdxReader := head.indexRange(0, 200)
matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "ninety", "value0")}
b.ResetTimer()
b.ReportAllocs()
for benchIdx := 0; benchIdx < b.N; benchIdx++ {
actualValues, err := headIdxReader.LabelValues("tens", matchers...)
require.NoError(b, err)
require.Equal(b, 9, len(actualValues))
}
}

View file

@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
@ -1443,8 +1444,8 @@ func (r *Reader) SymbolTableSize() uint64 {
// SortedLabelValues returns value tuples that exist for the given label name.
// It is not safe to use the return value beyond the lifetime of the byte slice
// passed into the Reader.
func (r *Reader) SortedLabelValues(name string) ([]string, error) {
values, err := r.LabelValues(name)
func (r *Reader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
values, err := r.LabelValues(name, matchers...)
if err == nil && r.version == FormatV1 {
sort.Strings(values)
}
@ -1454,7 +1455,12 @@ func (r *Reader) SortedLabelValues(name string) ([]string, error) {
// LabelValues returns value tuples that exist for the given label name.
// It is not safe to use the return value beyond the lifetime of the byte slice
// passed into the Reader.
func (r *Reader) LabelValues(name string) ([]string, error) {
// TODO(replay): Support filtering by matchers
func (r *Reader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
if len(matchers) > 0 {
return nil, errors.Errorf("matchers parameter is not implemented: %+v", matchers)
}
if r.version == FormatV1 {
e, ok := r.postingsV1[name]
if !ok {
@ -1505,6 +1511,32 @@ func (r *Reader) LabelValues(name string) ([]string, error) {
return values, nil
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (r *Reader) LabelValueFor(id uint64, label string) (string, error) {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
offset = id * 16
}
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
buf := d.Get()
if d.Err() != nil {
return "", errors.Wrap(d.Err(), "label values for")
}
value, err := r.dec.LabelValueFor(buf, label)
if err != nil {
return "", storage.ErrNotFound
}
if value == "" {
return "", storage.ErrNotFound
}
return value, nil
}
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
offset := id
@ -1683,6 +1715,37 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
return n, newBigEndianPostings(l), d.Err()
}
// LabelValueFor decodes a label for a given series.
func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
d := encoding.Decbuf{B: b}
k := d.Uvarint()
for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())
if d.Err() != nil {
return "", errors.Wrap(d.Err(), "read series label offsets")
}
ln, err := dec.LookupSymbol(lno)
if err != nil {
return "", errors.Wrap(err, "lookup label name")
}
if ln == label {
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return "", errors.Wrap(err, "lookup label value")
}
return lv, nil
}
}
return "", d.Err()
}
// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
*lbls = (*lbls)[:0]

View file

@ -83,8 +83,8 @@ func newBlockBaseQuerier(b BlockReader, mint, maxt int64) (*blockBaseQuerier, er
}, nil
}
func (q *blockBaseQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
res, err := q.index.SortedLabelValues(name)
func (q *blockBaseQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
res, err := q.index.SortedLabelValues(name, matchers...)
return res, nil, err
}
@ -369,6 +369,44 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting
return ix.Postings(m.Name, res...)
}
func labelValuesWithMatchers(r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
// We're only interested in metrics which have the label <name>.
requireLabel, err := labels.NewMatcher(labels.MatchNotEqual, name, "")
if err != nil {
return nil, errors.Wrapf(err, "Failed to instantiate label matcher")
}
var p index.Postings
p, err = PostingsForMatchers(r, append(matchers, requireLabel)...)
if err != nil {
return nil, err
}
dedupe := map[string]interface{}{}
for p.Next() {
v, err := r.LabelValueFor(p.At(), name)
if err != nil {
if err == storage.ErrNotFound {
continue
}
return nil, err
}
dedupe[v] = nil
}
if err = p.Err(); err != nil {
return nil, err
}
values := make([]string, 0, len(dedupe))
for value := range dedupe {
values = append(values, value)
}
return values, nil
}
// blockBaseSeriesSet allows to iterate over all series in the single block.
// Iterated series are trimmed with given min and max time as well as tombstones.
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.

View file

@ -1147,22 +1147,39 @@ func (m mockIndex) Close() error {
return nil
}
func (m mockIndex) SortedLabelValues(name string) ([]string, error) {
values, _ := m.LabelValues(name)
func (m mockIndex) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
values, _ := m.LabelValues(name, matchers...)
sort.Strings(values)
return values, nil
}
func (m mockIndex) LabelValues(name string) ([]string, error) {
func (m mockIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
values := []string{}
for l := range m.postings {
if l.Name == name {
values = append(values, l.Value)
if len(matchers) == 0 {
for l := range m.postings {
if l.Name == name {
values = append(values, l.Value)
}
}
return values, nil
}
for _, series := range m.series {
for _, matcher := range matchers {
if matcher.Matches(series.l.Get(matcher.Name)) {
values = append(values, series.l.Get(name))
}
}
}
return values, nil
}
func (m mockIndex) LabelValueFor(id uint64, label string) (string, error) {
return m.series[id].l.Get(label), nil
}
func (m mockIndex) Postings(name string, values ...string) (index.Postings, error) {
res := make([]index.Postings, 0, len(values))
for _, value := range values {
@ -1970,15 +1987,19 @@ func (m mockMatcherIndex) Symbols() index.StringIter { return nil }
func (m mockMatcherIndex) Close() error { return nil }
// SortedLabelValues will return error if it is called.
func (m mockMatcherIndex) SortedLabelValues(name string) ([]string, error) {
func (m mockMatcherIndex) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
return []string{}, errors.New("sorted label values called")
}
// LabelValues will return error if it is called.
func (m mockMatcherIndex) LabelValues(name string) ([]string, error) {
func (m mockMatcherIndex) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
return []string{}, errors.New("label values called")
}
func (m mockMatcherIndex) LabelValueFor(id uint64, label string) (string, error) {
return "", errors.New("label value for called")
}
func (m mockMatcherIndex) Postings(name string, values ...string) (index.Postings, error) {
return index.EmptyPostings(), nil
}

View file

@ -608,47 +608,35 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) {
warnings storage.Warnings
)
if len(matcherSets) > 0 {
hints := &storage.SelectHints{
Start: timestamp.FromTime(start),
End: timestamp.FromTime(end),
Func: "series", // There is no series function, this token is used for lookups that don't need samples.
}
var callWarnings storage.Warnings
labelValuesSet := make(map[string]struct{})
// Get all series which match matchers.
for _, mset := range matcherSets {
s := q.Select(false, hints, mset...)
for s.Next() {
series := s.At()
labelValue := series.Labels().Get(name)
// Filter out empty value.
if labelValue == "" {
continue
}
labelValuesSet[labelValue] = struct{}{}
for _, matchers := range matcherSets {
vals, callWarnings, err = q.LabelValues(name, matchers...)
if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer}
}
warnings = append(warnings, s.Warnings()...)
if err := s.Err(); err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil}
warnings = append(warnings, callWarnings...)
for _, val := range vals {
labelValuesSet[val] = struct{}{}
}
}
// Convert the map to an array.
vals = make([]string, 0, len(labelValuesSet))
for key := range labelValuesSet {
vals = append(vals, key)
for val := range labelValuesSet {
vals = append(vals, val)
}
sort.Strings(vals)
} else {
vals, warnings, err = q.LabelValues(name)
if err != nil {
return apiFuncResult{nil, &apiError{errorExec, err}, warnings, closer}
}
if vals == nil {
vals = []string{}
}
}
if vals == nil {
vals = []string{}
}
sort.Strings(vals)
return apiFuncResult{vals, nil, warnings, closer}
}

View file

@ -1699,6 +1699,20 @@ func testEndpoints(t *testing.T, api *API, tr *testTargetRetriever, testLabelAPI
"boo",
},
},
// Try to overlap the selected series set as much as possible to test that the value de-duplication works.
{
endpoint: api.labelValues,
params: map[string]string{
"name": "foo",
},
query: url.Values{
"match[]": []string{`test_metric4{dup=~"^1"}`, `test_metric4{foo=~".+o$"}`},
},
response: []string{
"bar",
"boo",
},
},
// Label names.
{
endpoint: api.labelNames,