Mirror of https://github.com/prometheus/prometheus.git
WIP partial PoC
commit 5965843bcd (parent 8d3da31f87)
tsdb/db.go

@@ -33,6 +33,7 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
@@ -207,6 +208,8 @@ type Options struct {

 	// BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader.
 	BlockChunkQuerierFunc BlockChunkQuerierFunc
+
+	UTF8MigrationEscapingScheme model.EscapingScheme
 }

 type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
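A minimal sketch, not part of the commit, of how the new Options field could be set when opening a TSDB. The data directory is a placeholder; model.UnderscoreEscaping is the scheme the commit's test uses.

package main

import (
	"log"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Assumption: the scheme describes how pre-UTF-8 data escaped metric names,
	// so reads can translate between the dotted and escaped forms.
	opts.UTF8MigrationEscapingScheme = model.UnderscoreEscaping

	db, err := tsdb.Open("data/", nil, nil, opts, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}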
@@ -2088,7 +2091,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
 		blockQueriers = append(blockQueriers, q)
 	}

-	return storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), nil
+	return NewUTF8MixedQuerier(storage.NewMergeQuerier(blockQueriers, nil, storage.ChainedSeriesMerge), db.opts.UTF8MigrationEscapingScheme), nil
 }

 // blockChunkQuerierForRange returns individual block chunk queriers from the persistent blocks, in-order head block, and the
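With this change every querier returned by DB.Querier is wrapped, so an equality matcher on a dotted metric name can also match series stored under the underscore-escaped name. A hedged, compile-only sketch of a caller, assuming a DB opened with the option as above (queryDotted is a hypothetical helper, not part of the commit):

package example

import (
	"context"
	"fmt"
	"math"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func queryDotted(ctx context.Context, db *tsdb.DB) error {
	q, err := db.Querier(math.MinInt64, math.MaxInt64)
	if err != nil {
		return err
	}
	defer q.Close()

	// The mixed querier also issues an underscore-escaped copy of this matcher
	// and rewrites matching escaped series names back to the dotted form.
	ss := q.Select(ctx, true, nil,
		labels.MustNewMatcher(labels.MatchEqual, "__name__", "with.dots"))
	for ss.Next() {
		fmt.Println(ss.At().Labels())
	}
	return ss.Err()
}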
tsdb/db_test.go

@@ -36,6 +36,7 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 	"go.uber.org/goleak"
@@ -7633,3 +7634,45 @@ func TestGenerateCompactionDelay(t *testing.T) {
 		assertDelay(db.generateCompactionDelay())
 	}
 }
+
+func TestUTF8(t *testing.T) {
+	tsdbCfg := DefaultOptions()
+	tsdbCfg.UTF8MigrationEscapingScheme = model.UnderscoreEscaping
+	db, err := Open(t.TempDir(), nil, nil, tsdbCfg, nil)
+
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, db.Close())
+	})
+
+	app := db.Appender(context.Background())
+
+	l := labels.FromStrings("__name__", "with.dots")
+	_, err = app.Append(0, l, 100, 1.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, 100)))
+
+	q, err := db.Querier(math.MinInt, math.MaxInt64)
+	require.NoError(t, err)
+	require.Equal(t, map[string][]chunks.Sample{
+		l.String(): {sample{t: 100, f: 1.0}},
+	}, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "with.dots")))
+
+	q, err = db.Querier(math.MinInt, math.MaxInt64)
+	require.NoError(t, err)
+	require.Equal(t, map[string][]chunks.Sample{}, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "with_dots")))
+
+	app = db.Appender(context.Background())
+	l = labels.FromStrings("__name__", "with_dots")
+	_, err = app.Append(0, l, 200, 2.0)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	q, err = db.Querier(math.MinInt, math.MaxInt64)
+	require.NoError(t, err)
+	require.Equal(t, map[string][]chunks.Sample{
+		labels.FromStrings("__name__", "with.dots").String(): {sample{t: 100, f: 1.0}, sample{t: 200, f: 2.0}},
+	}, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "__name__", "with.dots")))
+}
tsdb/querier.go (101 changed lines)
@@ -19,9 +19,11 @@ import (
 	"fmt"
 	"math"
 	"slices"
+	"strings"

 	"github.com/oklog/ulid"

+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/histogram"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -101,6 +103,104 @@ func (q *blockBaseQuerier) Close() error {
 	return errs.Err()
 }

+type utf8MixedSeries struct {
+	s storage.Series
+}
+
+func (u utf8MixedSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
+	return u.s.Iterator(it)
+}
+
+func (u utf8MixedSeries) Labels() labels.Labels {
+	lbls := labels.ScratchBuilder{}
+	u.s.Labels().Range(func(l labels.Label) {
+		var n, v string
+		if l.Name == "__name__" {
+			n = l.Name
+			v = strings.ReplaceAll(l.Value, "_", ".")
+		} else {
+			n = strings.ReplaceAll(l.Name, "_", ".")
+			v = l.Value
+		}
+		lbls.Add(n, v)
+	})
+	return lbls.Labels()
+}
+
+type utf8MixedSeriesSet struct {
+	ss storage.SeriesSet
+}
+
+func (u *utf8MixedSeriesSet) At() storage.Series {
+	return utf8MixedSeries{s: u.ss.At()}
+}
+
+func (u *utf8MixedSeriesSet) Err() error {
+	return u.ss.Err()
+}
+
+func (u *utf8MixedSeriesSet) Next() bool {
+	return u.ss.Next()
+}
+
+func (u *utf8MixedSeriesSet) Warnings() annotations.Annotations {
+	return u.ss.Warnings()
+}
+
+func NewUTF8MixedSeriesSet(ss storage.SeriesSet) storage.SeriesSet {
+	return &utf8MixedSeriesSet{ss: ss}
+}
+
+type utf8MixedQuerier struct {
+	q  storage.Querier
+	es model.EscapingScheme
+}
+
+func (u *utf8MixedQuerier) Close() error {
+	return u.q.Close()
+}
+
+func (u *utf8MixedQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return u.q.LabelNames(ctx, hints, matchers...)
+}
+
+func (u *utf8MixedQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return u.q.LabelValues(ctx, name, hints, matchers...)
+}
+
+func (u *utf8MixedQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
+	ms2 := make([]*labels.Matcher, 0, len(matchers))
+	change := false
+	for i, m := range matchers {
+		ms2 = append(ms2, &labels.Matcher{})
+		if m.Type == labels.MatchEqual {
+			ms2[i].Name = strings.ReplaceAll(m.Name, ".", "_")
+			if ms2[i].Name == "__name__" {
+				ms2[i].Value = strings.ReplaceAll(m.Value, ".", "_")
+			} else {
+				ms2[i].Value = m.Value
+			}
+			if m.Name != ms2[i].Name || m.Value != ms2[i].Value {
+				change = true
+			}
+		} else {
+			ms2[i] = m
+		}
+		ms2[i].Type = m.Type
+	}
+	sets := []storage.SeriesSet{
+		u.q.Select(ctx, sortSeries, hints, matchers...),
+	}
+	if change {
+		sets = append(sets, NewUTF8MixedSeriesSet(u.q.Select(ctx, sortSeries, hints, ms2...)))
+	}
+	return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
+}
+
+func NewUTF8MixedQuerier(q storage.Querier, es model.EscapingScheme) storage.Querier {
+	return &utf8MixedQuerier{q: q, es: es}
+}
+
 type blockQuerier struct {
 	*blockBaseQuerier
 }
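The es field is carried through NewUTF8MixedQuerier but not yet consulted: the WIP code hard-codes "." <-> "_" replacement both when rewriting matchers and when rewriting series labels. A hedged sketch of how the configured scheme might drive that translation instead, assuming github.com/prometheus/common/model exposes EscapeName and UnescapeName as expected (neither helper is used in this diff):

package example

import (
	"strings"

	"github.com/prometheus/common/model"
)

// escapeMatcherValue mirrors what utf8MixedQuerier.Select does to an equality
// matcher on __name__, but parameterised by the configured escaping scheme.
func escapeMatcherValue(value string, es model.EscapingScheme) string {
	// e.g. "with.dots" -> "with_dots" for model.UnderscoreEscaping (assumed behaviour).
	return model.EscapeName(value, es)
}

// unescapeSeriesName mirrors utf8MixedSeries.Labels, which currently applies a
// blanket strings.ReplaceAll(value, "_", ".") to the metric name.
func unescapeSeriesName(value string, es model.EscapingScheme) string {
	if es == model.UnderscoreEscaping {
		// Underscore escaping is lossy, so the commit's blanket replacement is
		// only a heuristic; kept here to match the WIP behaviour.
		return strings.ReplaceAll(value, "_", ".")
	}
	return model.UnescapeName(value, es)
}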
@@ -111,6 +211,7 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) {
 	if err != nil {
 		return nil, err
 	}
+
 	return &blockQuerier{blockBaseQuerier: q}, nil
 }
