mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
add LabelNames and LabelValues methods for mixed querier
This commit is contained in:
parent
7ecd729b37
commit
f2a58c2bb2
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
type mergeGenericQuerier struct {
|
||||
|
@ -216,34 +217,7 @@ func (q *mergeGenericQuerier) lvals(ctx context.Context, lq labelGenericQueriers
|
|||
if err != nil {
|
||||
return nil, ws, err
|
||||
}
|
||||
return mergeStrings(s1, s2), ws, nil
|
||||
}
|
||||
|
||||
func mergeStrings(a, b []string) []string {
|
||||
maxl := len(a)
|
||||
if len(b) > len(a) {
|
||||
maxl = len(b)
|
||||
}
|
||||
res := make([]string, 0, maxl*10/9)
|
||||
|
||||
for len(a) > 0 && len(b) > 0 {
|
||||
switch {
|
||||
case a[0] == b[0]:
|
||||
res = append(res, a[0])
|
||||
a, b = a[1:], b[1:]
|
||||
case a[0] < b[0]:
|
||||
res = append(res, a[0])
|
||||
a = a[1:]
|
||||
default:
|
||||
res = append(res, b[0])
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// Append all remaining elements.
|
||||
res = append(res, a...)
|
||||
res = append(res, b...)
|
||||
return res
|
||||
return strutil.MergeStrings(s1, s2), ws, nil
|
||||
}
|
||||
|
||||
// LabelNames returns all the unique label names present in all queriers in sorted order.
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/prometheus/prometheus/tsdb/index"
|
||||
"github.com/prometheus/prometheus/tsdb/tombstones"
|
||||
"github.com/prometheus/prometheus/util/annotations"
|
||||
"github.com/prometheus/prometheus/util/strutil"
|
||||
)
|
||||
|
||||
// checkContextEveryNIterations is used in some tight loops to check if the context is done.
|
||||
|
@ -1262,16 +1263,42 @@ type mixedUTF8BlockQuerier struct {
|
|||
}
|
||||
|
||||
func (q *mixedUTF8BlockQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
vals, an, err := q.blockQuerier.LabelValues(ctx, name, hints, matchers...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
newMatchers, escaped, original, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
var vals2 []string
|
||||
if ok {
|
||||
vals2, _, err = q.blockQuerier.LabelValues(ctx, name, hints, newMatchers...)
|
||||
if err == nil && name == model.MetricNameLabel {
|
||||
for i := range vals2 {
|
||||
if vals2[i] == escaped {
|
||||
vals2[i] = original
|
||||
}
|
||||
}
|
||||
}
|
||||
vals = strutil.MergeStrings(vals, vals2)
|
||||
}
|
||||
if ix := slices.Index(vals, ""); ix != -1 && len(vals) > 1 {
|
||||
vals = append(vals[:ix], vals[ix+1:]...)
|
||||
}
|
||||
return vals, an, err
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
names, an, err := q.blockQuerier.LabelNames(ctx, hints, matchers...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
newMatchers, _, _, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if ok {
|
||||
matchers = newMatchers
|
||||
names2, _, err := q.blockQuerier.LabelNames(ctx, hints, newMatchers...)
|
||||
if err == nil {
|
||||
names = strutil.MergeStrings(names, names2)
|
||||
}
|
||||
}
|
||||
return q.blockQuerier.LabelNames(ctx, hints, matchers...)
|
||||
|
||||
return names, an, err
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
|
@ -1294,15 +1321,42 @@ type mixedUTF8BlockChunkQuerier struct {
|
|||
}
|
||||
|
||||
func (q *mixedUTF8BlockChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
vals, an, err := q.blockChunkQuerier.LabelValues(ctx, name, hints, matchers...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
newMatchers, escaped, original, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
var vals2 []string
|
||||
if ok {
|
||||
vals2, _, err = q.blockChunkQuerier.LabelValues(ctx, name, hints, newMatchers...)
|
||||
if err == nil && name == model.MetricNameLabel {
|
||||
for i := range vals2 {
|
||||
if vals2[i] == escaped {
|
||||
vals2[i] = original
|
||||
}
|
||||
}
|
||||
}
|
||||
vals = strutil.MergeStrings(vals, vals2)
|
||||
}
|
||||
if ix := slices.Index(vals, ""); ix != -1 && len(vals) > 1 {
|
||||
vals = append(vals[:ix], vals[ix+1:]...)
|
||||
}
|
||||
return vals, an, err
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
names, an, err := q.blockChunkQuerier.LabelNames(ctx, hints, matchers...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
newMatchers, _, _, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if ok {
|
||||
matchers = newMatchers
|
||||
names2, _, err := q.blockChunkQuerier.LabelNames(ctx, hints, newMatchers...)
|
||||
if err == nil {
|
||||
names = strutil.MergeStrings(names, names2)
|
||||
}
|
||||
}
|
||||
return q.blockChunkQuerier.LabelNames(ctx, hints, matchers...)
|
||||
return names, an, err
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"math"
|
||||
"math/rand"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -187,8 +188,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
|
|||
return mi, chkReader, blockMint, blockMaxt
|
||||
}
|
||||
|
||||
// TODO: rename to querierTestCase
|
||||
type blockQuerierTestCase struct {
|
||||
type querierSelectTestCase struct {
|
||||
mint, maxt int64
|
||||
ms []*labels.Matcher
|
||||
hints *storage.SelectHints
|
||||
|
@ -196,8 +196,8 @@ type blockQuerierTestCase struct {
|
|||
expChks storage.ChunkSeriesSet
|
||||
}
|
||||
|
||||
func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr ChunkReader, stones *tombstones.MemTombstones) {
|
||||
testQueriers(t, c,
|
||||
func testBlockQueriersSelect(t *testing.T, c querierSelectTestCase, ir IndexReader, cr ChunkReader, stones *tombstones.MemTombstones) {
|
||||
testQueriersSelect(t, c,
|
||||
&blockQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
|
@ -220,7 +220,7 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
|||
})
|
||||
}
|
||||
|
||||
func testQueriers(t *testing.T, c blockQuerierTestCase, q storage.Querier, cq storage.ChunkQuerier) {
|
||||
func testQueriersSelect(t *testing.T, c querierSelectTestCase, q storage.Querier, cq storage.ChunkQuerier) {
|
||||
t.Run("sample", func(t *testing.T) {
|
||||
res := q.Select(context.Background(), false, c.hints, c.ms...)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
|
@ -286,7 +286,7 @@ func testQueriers(t *testing.T, c blockQuerierTestCase, q storage.Querier, cq st
|
|||
}
|
||||
|
||||
func TestBlockQuerier(t *testing.T) {
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
for _, c := range []querierSelectTestCase{
|
||||
{
|
||||
mint: 0,
|
||||
maxt: 0,
|
||||
|
@ -416,7 +416,7 @@ func TestBlockQuerier(t *testing.T) {
|
|||
} {
|
||||
t.Run("", func(t *testing.T) {
|
||||
ir, cr, _, _ := createIdxChkReaders(t, testData)
|
||||
testBlockQuerier(t, c, ir, cr, tombstones.NewMemTombstones())
|
||||
testBlockQueriersSelect(t, c, ir, cr, tombstones.NewMemTombstones())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -436,6 +436,13 @@ var utf8Data = []seriesSamples{
|
|||
{{5, 3, nil, nil}, {6, 6, nil, nil}},
|
||||
},
|
||||
},
|
||||
{
|
||||
lset: map[string]string{"c": "d", "__name__": "baz.qux"},
|
||||
chunks: [][]sample{
|
||||
{{1, 1, nil, nil}, {2, 2, nil, nil}, {3, 3, nil, nil}},
|
||||
{{5, 3, nil, nil}, {6, 6, nil, nil}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var underscoreEscapedUTF8Data = []seriesSamples{
|
||||
|
@ -446,6 +453,13 @@ var underscoreEscapedUTF8Data = []seriesSamples{
|
|||
{{5, 1, nil, nil}, {6, 7, nil, nil}, {7, 2, nil, nil}},
|
||||
},
|
||||
},
|
||||
{
|
||||
lset: map[string]string{"e": "f", "__name__": "baz_qux"},
|
||||
chunks: [][]sample{
|
||||
{{1, 3, nil, nil}, {2, 2, nil, nil}, {3, 6, nil, nil}},
|
||||
{{5, 1, nil, nil}, {6, 7, nil, nil}, {7, 2, nil, nil}},
|
||||
},
|
||||
},
|
||||
{
|
||||
lset: map[string]string{"__name__": "another_metric"},
|
||||
chunks: [][]sample{
|
||||
|
@ -483,11 +497,11 @@ var c3Unescaped = storage.NewListChunkSeriesFromSamples(labels.FromStrings("a",
|
|||
[]chunks.Sample{sample{1, 3, nil, nil}, sample{2, 2, nil, nil}, sample{3, 6, nil, nil}}, []chunks.Sample{sample{5, 1, nil, nil}, sample{6, 7, nil, nil}, sample{7, 2, nil, nil}},
|
||||
)
|
||||
|
||||
func TestMixedUTF8BlockQuerier(t *testing.T) {
|
||||
func TestMixedUTF8BlockQuerier_Select(t *testing.T) {
|
||||
// TODO(npazosmendez): test cases
|
||||
// * same label set is combines and samples are returned in order
|
||||
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
for _, c := range []querierSelectTestCase{
|
||||
{
|
||||
ms: []*labels.Matcher{},
|
||||
exp: newMockSeriesSet([]storage.Series{}),
|
||||
|
@ -555,12 +569,96 @@ func TestMixedUTF8BlockQuerier(t *testing.T) {
|
|||
},
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
testQueriers(t, c, mixedQ, mixedChunkQ)
|
||||
testQueriersSelect(t, c, mixedQ, mixedChunkQ)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMixedUTF8BlockQuerier_Labels(t *testing.T) {
|
||||
for _, c := range []struct {
|
||||
mint, maxt int64
|
||||
ms []*labels.Matcher
|
||||
labelName string
|
||||
expLabelValues []string
|
||||
expLabelNames []string
|
||||
}{
|
||||
{
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{},
|
||||
labelName: "__name__",
|
||||
expLabelValues: []string{"another_metric", "baz.qux", "baz_qux", "foo.bar", "foo_bar"},
|
||||
expLabelNames: []string{"", "__name__", "a", "c", "e"},
|
||||
},
|
||||
{
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "foo.bar")},
|
||||
labelName: "__name__",
|
||||
expLabelValues: []string{"foo.bar"},
|
||||
expLabelNames: []string{"__name__", "a"},
|
||||
},
|
||||
{
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "baz.qux")},
|
||||
labelName: "e",
|
||||
expLabelValues: []string{"f"},
|
||||
expLabelNames: []string{"__name__", "c", "e"},
|
||||
},
|
||||
} {
|
||||
ir, cr, _, _ := createIdxChkReaders(t, mixedUTF8Data)
|
||||
q := &blockQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: tombstones.NewMemTombstones(),
|
||||
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
}
|
||||
|
||||
mixedQ := &mixedUTF8BlockQuerier{
|
||||
blockQuerier: q,
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
|
||||
mixedChunkQ := &mixedUTF8BlockChunkQuerier{
|
||||
blockChunkQuerier: &blockChunkQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: tombstones.NewMemTombstones(),
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
},
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
t.Run("LabelValues", func(t *testing.T) {
|
||||
lv, _, err := mixedQ.LabelValues(context.Background(), c.labelName, nil, c.ms...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expLabelValues, lv)
|
||||
lv, _, err = mixedChunkQ.LabelValues(context.Background(), c.labelName, nil, c.ms...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expLabelValues, lv)
|
||||
})
|
||||
|
||||
t.Run("LabelNames", func(t *testing.T) {
|
||||
ln, _, err := mixedQ.LabelNames(context.Background(), nil, c.ms...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expLabelNames, ln)
|
||||
ln, _, err = mixedChunkQ.LabelNames(context.Background(), nil, c.ms...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expLabelNames, ln)
|
||||
})
|
||||
require.NoError(t, mixedQ.Close())
|
||||
require.NoError(t, mixedChunkQ.Close())
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapedUTF8BlockQuerier(t *testing.T) {
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
for _, c := range []querierSelectTestCase{
|
||||
{
|
||||
ms: []*labels.Matcher{},
|
||||
exp: newMockSeriesSet([]storage.Series{}),
|
||||
|
@ -603,12 +701,12 @@ func TestEscapedUTF8BlockQuerier(t *testing.T) {
|
|||
},
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
testQueriers(t, c, escapedQ, escapedChunkQ)
|
||||
testQueriersSelect(t, c, escapedQ, escapedChunkQ)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
for _, c := range []querierSelectTestCase{
|
||||
{
|
||||
mint: 0,
|
||||
maxt: 0,
|
||||
|
@ -713,14 +811,14 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer cr.Close()
|
||||
|
||||
testBlockQuerier(t, c, ir, cr, tombstones.NewMemTombstones())
|
||||
testBlockQueriersSelect(t, c, ir, cr, tombstones.NewMemTombstones())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
c := blockQuerierTestCase{
|
||||
c := querierSelectTestCase{
|
||||
mint: 2,
|
||||
maxt: 6,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "a", "a")},
|
||||
|
@ -750,7 +848,7 @@ func TestBlockQuerier_TrimmingDoesNotModifyOriginalTombstoneIntervals(t *testing
|
|||
for _, ref := range refs {
|
||||
stones.AddInterval(ref, tombstones.Interval{Mint: 1, Maxt: 2})
|
||||
}
|
||||
testBlockQuerier(t, c, ir, cr, stones)
|
||||
testBlockQueriersSelect(t, c, ir, cr, stones)
|
||||
for _, ref := range refs {
|
||||
intervals, err := stones.Get(ref)
|
||||
require.NoError(t, err)
|
||||
|
@ -790,7 +888,7 @@ func TestBlockQuerierDelete(t *testing.T) {
|
|||
{{Mint: 6, Maxt: 10}},
|
||||
})
|
||||
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
for _, c := range []querierSelectTestCase{
|
||||
{
|
||||
mint: 0,
|
||||
maxt: 0,
|
||||
|
@ -870,7 +968,7 @@ func TestBlockQuerierDelete(t *testing.T) {
|
|||
} {
|
||||
t.Run("", func(t *testing.T) {
|
||||
ir, cr, _, _ := createIdxChkReaders(t, testData)
|
||||
testBlockQuerier(t, c, ir, cr, stones)
|
||||
testBlockQueriersSelect(t, c, ir, cr, stones)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -2468,12 +2566,16 @@ func (m mockIndex) LabelValues(_ context.Context, name string, matchers ...*labe
|
|||
}
|
||||
|
||||
for _, series := range m.series {
|
||||
ok := true
|
||||
for _, matcher := range matchers {
|
||||
if matcher.Matches(series.l.Get(matcher.Name)) {
|
||||
// TODO(colega): shouldn't we check all the matchers before adding this to the values?
|
||||
values = append(values, series.l.Get(name))
|
||||
if !matcher.Matches(series.l.Get(matcher.Name)) {
|
||||
ok = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if ok && !slices.Contains(values, series.l.Get(name)) {
|
||||
values = append(values, series.l.Get(name))
|
||||
}
|
||||
}
|
||||
|
||||
return values, nil
|
||||
|
@ -2573,7 +2675,7 @@ func (m mockIndex) LabelNames(_ context.Context, matchers ...*labels.Matcher) ([
|
|||
for _, series := range m.series {
|
||||
matches := true
|
||||
for _, matcher := range matchers {
|
||||
matches = matches || matcher.Matches(series.l.Get(matcher.Name))
|
||||
matches = matches && matcher.Matches(series.l.Get(matcher.Name))
|
||||
if !matches {
|
||||
break
|
||||
}
|
||||
|
|
28
util/strutil/collection.go
Normal file
28
util/strutil/collection.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package strutil
|
||||
|
||||
func MergeStrings(a, b []string) []string {
|
||||
maxl := len(a)
|
||||
if len(b) > len(a) {
|
||||
maxl = len(b)
|
||||
}
|
||||
res := make([]string, 0, maxl*10/9)
|
||||
|
||||
for len(a) > 0 && len(b) > 0 {
|
||||
switch {
|
||||
case a[0] == b[0]:
|
||||
res = append(res, a[0])
|
||||
a, b = a[1:], b[1:]
|
||||
case a[0] < b[0]:
|
||||
res = append(res, a[0])
|
||||
a = a[1:]
|
||||
default:
|
||||
res = append(res, b[0])
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// Append all remaining elements.
|
||||
res = append(res, a...)
|
||||
res = append(res, b...)
|
||||
return res
|
||||
}
|
Loading…
Reference in a new issue