mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
WIP: per-block querier, mixedUTF8BlockQuerier and escapedUTF8BlockQuerier
This commit is contained in:
parent
3a7f044cfc
commit
eb5bb97bed
|
@ -2091,7 +2091,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
|
|||
blockQueriers = append(blockQueriers, q)
|
||||
}
|
||||
|
||||
return NewUTF8MixedQuerier(storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), db.opts.UTF8MigrationEscapingScheme), nil
|
||||
return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil
|
||||
}
|
||||
|
||||
// blockChunkQuerierForRange returns individual block chunk queriers from the persistent blocks, in-order head block, and the
|
||||
|
|
308
tsdb/querier.go
308
tsdb/querier.go
|
@ -19,7 +19,6 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/oklog/ulid"
|
||||
|
||||
|
@ -103,118 +102,6 @@ func (q *blockBaseQuerier) Close() error {
|
|||
return errs.Err()
|
||||
}
|
||||
|
||||
type utf8MixedSeries struct {
|
||||
s storage.Series
|
||||
mappings map[string]string
|
||||
}
|
||||
|
||||
func (u utf8MixedSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
return u.s.Iterator(it)
|
||||
}
|
||||
|
||||
func (u utf8MixedSeries) Labels() labels.Labels {
|
||||
lbls := labels.ScratchBuilder{}
|
||||
u.s.Labels().Range(func(l labels.Label) {
|
||||
var n, v string
|
||||
if l.Name == "__name__" {
|
||||
n = l.Name
|
||||
v = u.mappings[l.Value]
|
||||
if v == "" {
|
||||
v = l.Value
|
||||
}
|
||||
} else {
|
||||
n = strings.ReplaceAll(l.Name, "_", ".")
|
||||
v = l.Value
|
||||
}
|
||||
lbls.Add(n, v)
|
||||
})
|
||||
return lbls.Labels()
|
||||
}
|
||||
|
||||
type utf8MixedSeriesSet struct {
|
||||
ss storage.SeriesSet
|
||||
mappings map[string]string
|
||||
}
|
||||
|
||||
func (u *utf8MixedSeriesSet) At() storage.Series {
|
||||
return utf8MixedSeries{s: u.ss.At(), mappings: u.mappings}
|
||||
}
|
||||
|
||||
func (u *utf8MixedSeriesSet) Err() error {
|
||||
return u.ss.Err()
|
||||
}
|
||||
|
||||
func (u *utf8MixedSeriesSet) Next() bool {
|
||||
return u.ss.Next()
|
||||
}
|
||||
|
||||
func (u *utf8MixedSeriesSet) Warnings() annotations.Annotations {
|
||||
return u.ss.Warnings()
|
||||
}
|
||||
|
||||
func NewUTF8MixedSeriesSet(ss storage.SeriesSet, mappings map[string]string) storage.SeriesSet {
|
||||
return &utf8MixedSeriesSet{ss: ss, mappings: mappings}
|
||||
}
|
||||
|
||||
type utf8MixedQuerier struct {
|
||||
q storage.Querier
|
||||
es model.EscapingScheme
|
||||
}
|
||||
|
||||
func (u *utf8MixedQuerier) Close() error {
|
||||
return u.q.Close()
|
||||
}
|
||||
|
||||
func (u *utf8MixedQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return u.q.LabelNames(ctx, hints, matchers...)
|
||||
}
|
||||
|
||||
func (u *utf8MixedQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
return u.q.LabelValues(ctx, name, hints, matchers...)
|
||||
}
|
||||
|
||||
func (u *utf8MixedQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
ms2 := make([]*labels.Matcher, 0, len(matchers))
|
||||
change := false
|
||||
mappings := map[string]string{}
|
||||
for i, m := range matchers {
|
||||
ms2 = append(ms2, &labels.Matcher{})
|
||||
if m.Type == labels.MatchEqual {
|
||||
ms2[i].Name = model.EscapeName(m.Name, u.es)
|
||||
if ms2[i].Name == model.MetricNameLabel {
|
||||
ms2[i].Value = model.EscapeName(m.Value, u.es)
|
||||
} else {
|
||||
ms2[i].Value = m.Value
|
||||
}
|
||||
if m.Name != ms2[i].Name {
|
||||
change = true
|
||||
mappings[ms2[i].Name] = m.Name
|
||||
}
|
||||
if m.Value != ms2[i].Value {
|
||||
change = true
|
||||
mappings[ms2[i].Value] = m.Value
|
||||
}
|
||||
} else {
|
||||
ms2[i] = m
|
||||
}
|
||||
ms2[i].Type = m.Type
|
||||
}
|
||||
sets := []storage.SeriesSet{
|
||||
// We need to sort for merge to work.
|
||||
// TODO: maybe only pass true if we indeed have to merge
|
||||
u.q.Select(ctx, true, hints, matchers...),
|
||||
}
|
||||
if change {
|
||||
// TODO: maybe utf8MixedSeriesSet should always sort afterwards, so there's no need to sort the underlying query?
|
||||
sets = append(sets, NewUTF8MixedSeriesSet(u.q.Select(ctx, true, hints, ms2...), mappings))
|
||||
}
|
||||
return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
|
||||
}
|
||||
|
||||
func NewUTF8MixedQuerier(q storage.Querier, es model.EscapingScheme) storage.Querier {
|
||||
return &utf8MixedQuerier{q: q, es: es}
|
||||
}
|
||||
|
||||
type blockQuerier struct {
|
||||
*blockBaseQuerier
|
||||
}
|
||||
|
@ -1369,3 +1256,198 @@ func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.
|
|||
}
|
||||
|
||||
func (cr nopChunkReader) Close() error { return nil }
|
||||
|
||||
type mixedUTF8BlockQuerier struct {
|
||||
*blockQuerier
|
||||
es model.EscapingScheme
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
newMatchers, _, _, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if ok {
|
||||
matchers = newMatchers
|
||||
}
|
||||
return q.blockQuerier.LabelNames(ctx, hints, matchers...)
|
||||
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
newMatchers, escaped, original, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
|
||||
if !ok {
|
||||
return q.blockQuerier.Select(ctx, sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
// We need to sort for merge to work.
|
||||
return storage.NewMergeSeriesSet([]storage.SeriesSet{
|
||||
q.blockQuerier.Select(ctx, true, hints, matchers...),
|
||||
&metricRenameSeriesSet{SeriesSet: q.blockQuerier.Select(ctx, true, hints, newMatchers...), from: escaped, to: original},
|
||||
}, storage.ChainedSeriesMerge)
|
||||
}
|
||||
|
||||
type mixedUTF8BlockChunkQuerier struct {
|
||||
*blockChunkQuerier
|
||||
es model.EscapingScheme
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
newMatchers, _, _, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if ok {
|
||||
matchers = newMatchers
|
||||
}
|
||||
return q.blockChunkQuerier.LabelNames(ctx, hints, matchers...)
|
||||
}
|
||||
|
||||
func (q *mixedUTF8BlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
newMatchers, escaped, original, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
|
||||
if !ok {
|
||||
return q.blockChunkQuerier.Select(ctx, sortSeries, hints, matchers...)
|
||||
}
|
||||
|
||||
// We need to sort for merge to work.
|
||||
return storage.NewMergeChunkSeriesSet([]storage.ChunkSeriesSet{
|
||||
q.blockChunkQuerier.Select(ctx, true, hints, matchers...),
|
||||
&metricRenameChunkSeriesSet{ChunkSeriesSet: q.blockChunkQuerier.Select(ctx, true, hints, newMatchers...), from: escaped, to: original},
|
||||
}, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge))
|
||||
}
|
||||
|
||||
type escapedUTF8BlockQuerier struct {
|
||||
*blockQuerier
|
||||
es model.EscapingScheme
|
||||
}
|
||||
|
||||
func (q *escapedUTF8BlockQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (q *escapedUTF8BlockQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (q *escapedUTF8BlockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
newMatchers, escaped, original, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if !ok {
|
||||
return q.blockQuerier.Select(ctx, sortSeries, hints, matchers...)
|
||||
}
|
||||
return &metricRenameSeriesSet{SeriesSet: q.blockQuerier.Select(ctx, sortSeries, hints, newMatchers...), from: escaped, to: original}
|
||||
}
|
||||
|
||||
func escapeUTF8NameMatcher(matchers []*labels.Matcher, es model.EscapingScheme) (newMatchers []*labels.Matcher, escaped, original string, ok bool) {
|
||||
// TODO: avoid allocation if there is nothing to escape?
|
||||
newMatchers = make([]*labels.Matcher, len(matchers))
|
||||
|
||||
for i, m := range matchers {
|
||||
m2 := *m
|
||||
if m.Type == labels.MatchEqual && m.Name == model.MetricNameLabel && !model.IsValidLegacyMetricName(m.Value) {
|
||||
// TODO: what if we get multiple and different __name__ matchers?
|
||||
// Leaning towards ignoring everything and querying the underlying querier as is. Results will and should be empty.
|
||||
original = m.Value
|
||||
m2.Value = model.EscapeName(m.Value, es)
|
||||
escaped = m2.Value
|
||||
ok = true
|
||||
}
|
||||
newMatchers[i] = &m2
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type escapedUTF8BlockChunkQuerier struct {
|
||||
*blockChunkQuerier
|
||||
es model.EscapingScheme
|
||||
}
|
||||
|
||||
func (q *escapedUTF8BlockChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (q *escapedUTF8BlockChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
|
||||
newMatchers, _, _, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if ok {
|
||||
matchers = newMatchers
|
||||
}
|
||||
return q.blockChunkQuerier.LabelNames(ctx, hints, matchers...)
|
||||
}
|
||||
|
||||
func (q *escapedUTF8BlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
|
||||
newMatchers, escaped, original, ok := escapeUTF8NameMatcher(matchers, q.es)
|
||||
if !ok {
|
||||
return q.blockChunkQuerier.Select(ctx, sortSeries, hints, matchers...)
|
||||
}
|
||||
return &metricRenameChunkSeriesSet{ChunkSeriesSet: q.blockChunkQuerier.Select(ctx, sortSeries, hints,
|
||||
newMatchers...), from: escaped, to: original}
|
||||
}
|
||||
|
||||
type metricRenameSeriesSet struct {
|
||||
storage.SeriesSet
|
||||
from, to string
|
||||
}
|
||||
|
||||
func (u *metricRenameSeriesSet) At() storage.Series {
|
||||
lbls := labels.NewScratchBuilder(u.SeriesSet.At().Labels().Len())
|
||||
u.SeriesSet.At().Labels().Range(func(l labels.Label) {
|
||||
// TODO: what if we don't find the label we need to map? That would be
|
||||
// an important bug, because that would break our assumptions that keep
|
||||
// the series sorted. Panic? Return Next=false and Err= not nil?
|
||||
if l.Name == model.MetricNameLabel && l.Value == u.from {
|
||||
lbls.Add(l.Name, u.to)
|
||||
} else {
|
||||
lbls.Add(l.Name, l.Value)
|
||||
}
|
||||
})
|
||||
|
||||
return &storage.SeriesEntry{
|
||||
Lset: lbls.Labels(),
|
||||
SampleIteratorFn: func(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
return u.SeriesSet.At().Iterator(it)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (u *metricRenameSeriesSet) Warnings() annotations.Annotations {
|
||||
// Warnings are for the whole set, so no sorting needed. However:
|
||||
// TODO: can a warning be referencing a metric name? Would that be a problem? I think not, but would be confusing.
|
||||
// TODO: should we add a warning about the renaming?
|
||||
return u.SeriesSet.Warnings()
|
||||
}
|
||||
|
||||
type metricRenameChunkSeriesSet struct {
|
||||
storage.ChunkSeriesSet
|
||||
from, to string
|
||||
}
|
||||
|
||||
func (u *metricRenameChunkSeriesSet) At() storage.ChunkSeries {
|
||||
lbls := labels.NewScratchBuilder(u.ChunkSeriesSet.At().Labels().Len())
|
||||
u.ChunkSeriesSet.At().Labels().Range(func(l labels.Label) {
|
||||
// TODO: what if we don't find the label we need to map? That would be
|
||||
// an important bug, because that would break our assumptions that keep
|
||||
// the series sorted. Panic? Return Next=false and Err= not nil?
|
||||
if l.Name == model.MetricNameLabel && l.Value == u.from {
|
||||
lbls.Add(l.Name, u.to)
|
||||
} else {
|
||||
lbls.Add(l.Name, l.Value)
|
||||
}
|
||||
})
|
||||
|
||||
return &storage.ChunkSeriesEntry{
|
||||
Lset: lbls.Labels(),
|
||||
ChunkIteratorFn: func(it chunks.Iterator) chunks.Iterator {
|
||||
return u.ChunkSeriesSet.At().Iterator(it)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (u *metricRenameChunkSeriesSet) Warnings() annotations.Annotations {
|
||||
// Warnings are for the whole set, so no sorting needed. However:
|
||||
// TODO: can a warning be referencing a metric name? Would that be a problem? I think not, but would be confusing.
|
||||
// TODO: should we add a warning about the renaming?
|
||||
return u.ChunkSeriesSet.Warnings()
|
||||
}
|
||||
|
|
|
@ -29,6 +29,8 @@ import (
|
|||
"github.com/oklog/ulid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
|
@ -185,6 +187,7 @@ func createIdxChkReaders(t *testing.T, tc []seriesSamples) (IndexReader, ChunkRe
|
|||
return mi, chkReader, blockMint, blockMaxt
|
||||
}
|
||||
|
||||
// TODO: rename to querierTestCase
|
||||
type blockQuerierTestCase struct {
|
||||
mint, maxt int64
|
||||
ms []*labels.Matcher
|
||||
|
@ -194,8 +197,8 @@ type blockQuerierTestCase struct {
|
|||
}
|
||||
|
||||
func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr ChunkReader, stones *tombstones.MemTombstones) {
|
||||
t.Run("sample", func(t *testing.T) {
|
||||
q := blockQuerier{
|
||||
testQueriers(t, c,
|
||||
&blockQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
|
@ -204,8 +207,21 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
|||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
}
|
||||
},
|
||||
&blockChunkQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: stones,
|
||||
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func testQueriers(t *testing.T, c blockQuerierTestCase, q storage.Querier, cq storage.ChunkQuerier) {
|
||||
t.Run("sample", func(t *testing.T) {
|
||||
res := q.Select(context.Background(), false, c.hints, c.ms...)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
|
||||
|
@ -229,20 +245,9 @@ func testBlockQuerier(t *testing.T, c blockQuerierTestCase, ir IndexReader, cr C
|
|||
}
|
||||
require.NoError(t, res.Err())
|
||||
})
|
||||
|
||||
t.Run("chunk", func(t *testing.T) {
|
||||
q := blockChunkQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: stones,
|
||||
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
}
|
||||
res := q.Select(context.Background(), false, c.hints, c.ms...)
|
||||
defer func() { require.NoError(t, q.Close()) }()
|
||||
res := cq.Select(context.Background(), false, c.hints, c.ms...)
|
||||
defer func() { require.NoError(t, cq.Close()) }()
|
||||
|
||||
for {
|
||||
eok, rok := c.expChks.Next(), res.Next()
|
||||
|
@ -416,6 +421,192 @@ func TestBlockQuerier(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
var utf8Data = []seriesSamples{
|
||||
{
|
||||
lset: map[string]string{"a": "a", "__name__": "foo.bar"},
|
||||
chunks: [][]sample{
|
||||
{{1, 2, nil, nil}, {2, 3, nil, nil}, {3, 4, nil, nil}},
|
||||
{{5, 2, nil, nil}, {6, 3, nil, nil}, {7, 4, nil, nil}},
|
||||
},
|
||||
},
|
||||
{
|
||||
lset: map[string]string{"a": "b", "__name__": "foo.bar"},
|
||||
chunks: [][]sample{
|
||||
{{1, 1, nil, nil}, {2, 2, nil, nil}, {3, 3, nil, nil}},
|
||||
{{5, 3, nil, nil}, {6, 6, nil, nil}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var underscoreEscapedUTF8Data = []seriesSamples{
|
||||
{
|
||||
lset: map[string]string{"a": "c", "__name__": "foo_bar"},
|
||||
chunks: [][]sample{
|
||||
{{1, 3, nil, nil}, {2, 2, nil, nil}, {3, 6, nil, nil}},
|
||||
{{5, 1, nil, nil}, {6, 7, nil, nil}, {7, 2, nil, nil}},
|
||||
},
|
||||
},
|
||||
{
|
||||
lset: map[string]string{"__name__": "another_metric"},
|
||||
chunks: [][]sample{
|
||||
{{1, 41, nil, nil}, {2, 42, nil, nil}, {3, 43, nil, nil}},
|
||||
{{5, 45, nil, nil}, {6, 46, nil, nil}, {7, 47, nil, nil}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var mixedUTF8Data = append(utf8Data, underscoreEscapedUTF8Data...)
|
||||
|
||||
var s1 = storage.NewListSeries(labels.FromStrings("a", "a", "__name__", "foo.bar"),
|
||||
[]chunks.Sample{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 4, nil, nil}, sample{5, 2, nil, nil}, sample{6, 3, nil, nil}, sample{7, 4, nil, nil}},
|
||||
)
|
||||
var c1 = storage.NewListChunkSeriesFromSamples(labels.FromStrings("a", "a", "__name__", "foo.bar"),
|
||||
[]chunks.Sample{sample{1, 2, nil, nil}, sample{2, 3, nil, nil}, sample{3, 4, nil, nil}}, []chunks.Sample{sample{5, 2, nil, nil}, sample{6, 3, nil, nil}, sample{7, 4, nil, nil}},
|
||||
)
|
||||
var s2 = storage.NewListSeries(labels.FromStrings("a", "b", "__name__", "foo.bar"),
|
||||
[]chunks.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}, sample{5, 3, nil, nil}, sample{6, 6, nil, nil}},
|
||||
)
|
||||
var c2 = storage.NewListChunkSeriesFromSamples(labels.FromStrings("a", "b", "__name__", "foo.bar"),
|
||||
[]chunks.Sample{sample{1, 1, nil, nil}, sample{2, 2, nil, nil}, sample{3, 3, nil, nil}}, []chunks.Sample{sample{5, 3, nil, nil}, sample{6, 6, nil, nil}},
|
||||
)
|
||||
|
||||
var s3 = storage.NewListSeries(labels.FromStrings("a", "c", "__name__", "foo_bar"),
|
||||
[]chunks.Sample{sample{1, 3, nil, nil}, sample{2, 2, nil, nil}, sample{3, 6, nil, nil}, sample{5, 1, nil, nil}, sample{6, 7, nil, nil}, sample{7, 2, nil, nil}},
|
||||
)
|
||||
var c3 = storage.NewListChunkSeriesFromSamples(labels.FromStrings("a", "c", "__name__", "foo_bar"),
|
||||
[]chunks.Sample{sample{1, 3, nil, nil}, sample{2, 2, nil, nil}, sample{3, 6, nil, nil}}, []chunks.Sample{sample{5, 1, nil, nil}, sample{6, 7, nil, nil}, sample{7, 2, nil, nil}},
|
||||
)
|
||||
var s3Unescaped = storage.NewListSeries(labels.FromStrings("a", "c", "__name__", "foo.bar"),
|
||||
[]chunks.Sample{sample{1, 3, nil, nil}, sample{2, 2, nil, nil}, sample{3, 6, nil, nil}, sample{5, 1, nil, nil}, sample{6, 7, nil, nil}, sample{7, 2, nil, nil}},
|
||||
)
|
||||
var c3Unescaped = storage.NewListChunkSeriesFromSamples(labels.FromStrings("a", "c", "__name__", "foo.bar"),
|
||||
[]chunks.Sample{sample{1, 3, nil, nil}, sample{2, 2, nil, nil}, sample{3, 6, nil, nil}}, []chunks.Sample{sample{5, 1, nil, nil}, sample{6, 7, nil, nil}, sample{7, 2, nil, nil}},
|
||||
)
|
||||
|
||||
func TestMixedUTF8BlockQuerier(t *testing.T) {
|
||||
// TODO(npazosmendez): test cases
|
||||
// * same label set is combines and samples are returned in order
|
||||
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
{
|
||||
ms: []*labels.Matcher{},
|
||||
exp: newMockSeriesSet([]storage.Series{}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{}),
|
||||
},
|
||||
{
|
||||
// No __name__= matcher, no-op
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "a", ".+")},
|
||||
exp: newMockSeriesSet([]storage.Series{s1, s2, s3}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{c1, c2, c3}),
|
||||
},
|
||||
{
|
||||
// __name__= matcher, explode query and relabel
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "foo.bar")},
|
||||
exp: newMockSeriesSet([]storage.Series{s1, s2, s3Unescaped}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{c1, c2, c3Unescaped}),
|
||||
},
|
||||
{
|
||||
// __name__= matcher plus other labels
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "foo.bar"), labels.MustNewMatcher(labels.MatchNotEqual, "a", "b")},
|
||||
exp: newMockSeriesSet([]storage.Series{s1, s3Unescaped}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{c1, c3Unescaped}),
|
||||
},
|
||||
{
|
||||
// No need to escape matcher, no-op
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "foo_bar")},
|
||||
exp: newMockSeriesSet([]storage.Series{s3}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{c3}),
|
||||
},
|
||||
} {
|
||||
ir, cr, _, _ := createIdxChkReaders(t, mixedUTF8Data)
|
||||
q := &blockQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: tombstones.NewMemTombstones(),
|
||||
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
}
|
||||
|
||||
mixedQ := &mixedUTF8BlockQuerier{
|
||||
blockQuerier: q,
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
|
||||
mixedChunkQ := &mixedUTF8BlockChunkQuerier{
|
||||
blockChunkQuerier: &blockChunkQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: tombstones.NewMemTombstones(),
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
},
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
testQueriers(t, c, mixedQ, mixedChunkQ)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEscapedUTF8BlockQuerier(t *testing.T) {
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
{
|
||||
ms: []*labels.Matcher{},
|
||||
exp: newMockSeriesSet([]storage.Series{}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{}),
|
||||
},
|
||||
{
|
||||
mint: math.MinInt64,
|
||||
maxt: math.MaxInt64,
|
||||
ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "a", ".+")},
|
||||
exp: newMockSeriesSet([]storage.Series{s3}),
|
||||
expChks: newMockChunkSeriesSet([]storage.ChunkSeries{c3}),
|
||||
},
|
||||
} {
|
||||
ir, cr, _, _ := createIdxChkReaders(t, underscoreEscapedUTF8Data)
|
||||
q := &blockQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: tombstones.NewMemTombstones(),
|
||||
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
}
|
||||
|
||||
escapedQ := &escapedUTF8BlockQuerier{
|
||||
blockQuerier: q,
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
|
||||
escapedChunkQ := &escapedUTF8BlockChunkQuerier{
|
||||
blockChunkQuerier: &blockChunkQuerier{
|
||||
blockBaseQuerier: &blockBaseQuerier{
|
||||
index: ir,
|
||||
chunks: cr,
|
||||
tombstones: tombstones.NewMemTombstones(),
|
||||
mint: c.mint,
|
||||
maxt: c.maxt,
|
||||
},
|
||||
},
|
||||
es: model.UnderscoreEscaping,
|
||||
}
|
||||
testQueriers(t, c, escapedQ, escapedChunkQ)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) {
|
||||
for _, c := range []blockQuerierTestCase{
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue