From 5965843bcd2060c226fb788415c53cbb15b0807a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=A1s=20Pazos?= Date: Wed, 11 Sep 2024 12:27:40 -0300 Subject: [PATCH] WIP partial PoC --- tsdb/db.go | 5 ++- tsdb/db_test.go | 43 +++++++++++++++++++++ tsdb/querier.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+), 1 deletion(-) diff --git a/tsdb/db.go b/tsdb/db.go index a5b3a5e602..f499e30ed8 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -33,6 +33,7 @@ import ( "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -207,6 +208,8 @@ type Options struct { // BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader. BlockChunkQuerierFunc BlockChunkQuerierFunc + + UTF8MigrationEscapingScheme model.EscapingScheme } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -2088,7 +2091,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { blockQueriers = append(blockQueriers, q) } - return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil + return NewUTF8MixedQuerier(storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), db.opts.UTF8MigrationEscapingScheme), nil } // blockChunkQuerierForRange returns individual block chunk queriers from the persistent blocks, in-order head block, and the diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 4e3a077f6a..96ab3d2615 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -36,6 +36,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/goleak" @@ 
-7633,3 +7634,50 @@ func TestGenerateCompactionDelay(t *testing.T) {
 		assertDelay(db.generateCompactionDelay())
 	}
 }
+
+// TestUTF8 checks that, with UTF8MigrationEscapingScheme set to underscore
+// escaping, a query for a dotted metric name also returns samples stored under
+// the underscore form of that name.
+func TestUTF8(t *testing.T) {
+	tsdbCfg := DefaultOptions()
+	tsdbCfg.UTF8MigrationEscapingScheme = model.UnderscoreEscaping
+	db, err := Open(t.TempDir(), nil, nil, tsdbCfg, nil)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, db.Close())
+	})
+
+	utf8Name := labels.FromStrings("__name__", "with.dots")
+
+	// Store one sample under the dotted name, then compact the head range [0, 100].
+	app := db.Appender(context.Background())
+	_, err = app.Append(0, utf8Name, 100, 1.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+	require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 100)))
+
+	// Querying the dotted name returns the series as it was stored.
+	q, err := db.Querier(math.MinInt64, math.MaxInt64)
+	require.NoError(t, err)
+	require.Equal(t, map[string][]chunks.Sample{
+		utf8Name.String(): {sample{t: 100, f: 1.0}},
+	}, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "with.dots")))
+
+	// Querying the underscore name must not return the dotted series.
+	q, err = db.Querier(math.MinInt64, math.MaxInt64)
+	require.NoError(t, err)
+	require.Equal(t, map[string][]chunks.Sample{}, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "with_dots")))
+
+	// Store a second sample, this time under the underscore name.
+	app = db.Appender(context.Background())
+	_, err = app.Append(0, labels.FromStrings("__name__", "with_dots"), 200, 2.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Querying the dotted name now returns both samples under the dotted name.
+	q, err = db.Querier(math.MinInt64, math.MaxInt64)
+	require.NoError(t, err)
+	require.Equal(t, map[string][]chunks.Sample{
+		utf8Name.String(): {sample{t: 100, f: 1.0}, sample{t: 200, f: 2.0}},
+	}, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "with.dots")))
+}
diff --git a/tsdb/querier.go b/tsdb/querier.go
index 912c950329..510e32e8d0 100644
--- a/tsdb/querier.go
+++ b/tsdb/querier.go
@@ -19,9 +19,11 @@ import (
 	"fmt"
 	"math"
 	"slices"
+	"strings"
 
 	"github.com/oklog/ulid"
 
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@@ -101,6 +103,135 @@ func (q *blockBaseQuerier) Close() error {
 	return errs.Err()
 }
 
+// utf8MixedSeries wraps a series returned for escaped matchers and exposes
+// its labels with "_" turned back into ".".
+type utf8MixedSeries struct {
+	s storage.Series
+}
+
+// Iterator returns the sample iterator of the wrapped series unchanged.
+func (u utf8MixedSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
+	return u.s.Iterator(it)
+}
+
+// Labels returns the wrapped series' labels with every "_" replaced by "."
+// in the value of the __name__ label and in the name of every other label.
+//
+// NOTE(review): the replacement touches every underscore, not only those
+// produced by escaping, so labels that legitimately contain "_" are renamed
+// too. Confirm whether it should be limited to the names Select translated.
+func (u utf8MixedSeries) Labels() labels.Labels {
+	lbls := labels.ScratchBuilder{}
+	u.s.Labels().Range(func(l labels.Label) {
+		if l.Name == "__name__" {
+			lbls.Add(l.Name, strings.ReplaceAll(l.Value, "_", "."))
+			return
+		}
+		lbls.Add(strings.ReplaceAll(l.Name, "_", "."), l.Value)
+	})
+	// Renamed labels may no longer be in name order ("." sorts before "_"),
+	// and Labels() emits pairs in the order they were added, so sort first.
+	lbls.Sort()
+	return lbls.Labels()
+}
+
+// utf8MixedSeriesSet wraps each series of ss in a utf8MixedSeries.
+type utf8MixedSeriesSet struct {
+	ss storage.SeriesSet
+}
+
+// At returns the current series of the underlying set wrapped in utf8MixedSeries.
+func (u *utf8MixedSeriesSet) At() storage.Series {
+	return utf8MixedSeries{s: u.ss.At()}
+}
+
+// Err returns the error of the underlying set.
+func (u *utf8MixedSeriesSet) Err() error {
+	return u.ss.Err()
+}
+
+// Next advances the underlying set.
+func (u *utf8MixedSeriesSet) Next() bool {
+	return u.ss.Next()
+}
+
+// Warnings returns the warnings of the underlying set.
+func (u *utf8MixedSeriesSet) Warnings() annotations.Annotations {
+	return u.ss.Warnings()
+}
+
+// NewUTF8MixedSeriesSet returns a SeriesSet that yields the series of ss with
+// underscores in their labels rewritten to dots (see utf8MixedSeries.Labels).
+func NewUTF8MixedSeriesSet(ss storage.SeriesSet) storage.SeriesSet {
+	return &utf8MixedSeriesSet{ss: ss}
+}
+
+// utf8MixedQuerier wraps a querier so that Select also looks up the
+// dot-to-underscore translation of equality matchers.
+//
+// NOTE(review): es is stored by NewUTF8MixedQuerier but not consulted by any
+// method here; the translation is hard-coded to "." -> "_".
+type utf8MixedQuerier struct {
+	q  storage.Querier
+	es model.EscapingScheme
+}
+
+// Close closes the wrapped querier.
+func (u *utf8MixedQuerier) Close() error {
+	return u.q.Close()
+}
+
+// LabelNames forwards to the wrapped querier without translating anything.
+func (u *utf8MixedQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return u.q.LabelNames(ctx, hints, matchers...)
+}
+
+// LabelValues forwards to the wrapped querier without translating anything.
+func (u *utf8MixedQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return u.q.LabelValues(ctx, name, hints, matchers...)
+}
+
+// Select returns the series matching matchers. When translating "." to "_"
+// in the equality matchers (label names, and the value of __name__) changes
+// at least one of them, the series matching the translated matchers are
+// merged in as well, with their labels rewritten by NewUTF8MixedSeriesSet.
+func (u *utf8MixedQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+	escaped := make([]*labels.Matcher, 0, len(matchers))
+	changed := false
+	for _, m := range matchers {
+		if m.Type != labels.MatchEqual {
+			// Only equality matchers are translated; others are reused as is.
+			escaped = append(escaped, m)
+			continue
+		}
+		name := strings.ReplaceAll(m.Name, ".", "_")
+		value := m.Value
+		if name == "__name__" {
+			value = strings.ReplaceAll(m.Value, ".", "_")
+		}
+		if name != m.Name || value != m.Value {
+			changed = true
+		}
+		escaped = append(escaped, &labels.Matcher{Type: m.Type, Name: name, Value: value})
+	}
+	if !changed {
+		// Nothing was translated, so there is no second set to merge with.
+		return u.q.Select(ctx, sortSeries, hints, matchers...)
+	}
+	// The merge walks its inputs in label order, so both selects are asked
+	// for sorted output regardless of sortSeries.
+	return storage.NewMergeSeriesSet([]storage.SeriesSet{
+		u.q.Select(ctx, true, hints, matchers...),
+		NewUTF8MixedSeriesSet(u.q.Select(ctx, true, hints, escaped...)),
+	}, storage.ChainedSeriesMerge)
+}
+
+// NewUTF8MixedQuerier wraps q in a querier whose Select also returns series
+// stored under the underscore-escaped form of the requested names.
+func NewUTF8MixedQuerier(q storage.Querier, es model.EscapingScheme) storage.Querier {
+	return &utf8MixedQuerier{q: q, es: es}
+}
+
 type blockQuerier struct {
 	*blockBaseQuerier
 }
@@ -111,6 +242,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	return &blockQuerier{blockBaseQuerier: q}, nil
 }
 