From d354f20c2a636aa43fc083ad80fdac524a9850f4 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 14 Sep 2022 17:38:34 +0530 Subject: [PATCH] Add a feature flag to control native histogram ingestion (#11253) * Add runtime config to control native histogram ingestion Signed-off-by: Ganesh Vernekar * Make the config into a CLI flag Signed-off-by: Ganesh Vernekar Signed-off-by: Ganesh Vernekar --- cmd/prometheus/main.go | 5 ++++ storage/interface.go | 1 + tsdb/blockwriter.go | 1 + tsdb/compact_test.go | 1 + tsdb/db.go | 14 +++++++++++ tsdb/db_test.go | 50 +++++++++++++++++++++++++++++++++++++ tsdb/head.go | 13 ++++++++++ tsdb/head_append.go | 4 +++ tsdb/head_test.go | 5 +++- util/teststorage/storage.go | 1 + 10 files changed, 94 insertions(+), 1 deletion(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a02c82db7..5a16e5c18 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -196,6 +196,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "no-default-scrape-port": c.scrape.NoDefaultPort = true level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.") + case "native-histograms": + c.tsdb.EnableNativeHistograms = true + level.Info(logger).Log("msg", "Experimental native histogram support enabled.") case "": continue case "promql-at-modifier", "promql-negative-offset": @@ -1542,6 +1545,7 @@ type tsdbOptions struct { EnableExemplarStorage bool MaxExemplars int64 EnableMemorySnapshotOnShutdown bool + EnableNativeHistograms bool } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1560,6 +1564,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableExemplarStorage: opts.EnableExemplarStorage, MaxExemplars: opts.MaxExemplars, EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, + EnableNativeHistograms: opts.EnableNativeHistograms, } } diff --git a/storage/interface.go b/storage/interface.go index dad3e895d..e535549a9 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -36,6 +36,7 @@ var ( ErrDuplicateExemplar = errors.New("duplicate exemplar") ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") + ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled") ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative") ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided") ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative") diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 09b355368..459179e45 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -71,6 +71,7 @@ func (w *BlockWriter) initHead() error { opts := DefaultHeadOptions() opts.ChunkRange = w.blockSize opts.ChunkDirRoot = w.chunkDir + opts.EnableNativeHistograms.Store(true) h, err := NewHead(nil, w.logger, nil, opts, NewHeadStats()) if err != nil { return errors.Wrap(err, "tsdb.NewHead") diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index f407cc9d0..c86d8c4c4 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1677,6 +1677,7 @@ func TestSparseHistogramCompactionAndQuery(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }) opts := DefaultOptions() + opts.EnableNativeHistograms = true // Exactly 3 times so that level 2 of compaction happens and tombstone // deletion and compaction considers the level 2 blocks to be big enough. opts.MaxBlockDuration = 3 * opts.MinBlockDuration diff --git a/tsdb/db.go b/tsdb/db.go index 0678a6ea9..6d72dfc15 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -160,6 +160,9 @@ type Options struct { // Disables isolation between reads and in-flight appends. IsolationDisabled bool + + // EnableNativeHistograms enables the ingestion of native histograms. + EnableNativeHistograms bool } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} @@ -719,6 +722,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.EnableExemplarStorage = opts.EnableExemplarStorage headOpts.MaxExemplars.Store(opts.MaxExemplars) headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown + headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms) if opts.IsolationDisabled { // We only override this flag if isolation is disabled at DB level. We use the default otherwise. headOpts.IsolationDisabled = opts.IsolationDisabled @@ -850,6 +854,16 @@ func (db *DB) ApplyConfig(conf *config.Config) error { return db.head.ApplyConfig(conf) } +// EnableNativeHistograms enables the native histogram feature. +func (db *DB) EnableNativeHistograms() { + db.head.EnableNativeHistograms() +} + +// DisableNativeHistograms disables the native histogram feature. +func (db *DB) DisableNativeHistograms() { + db.head.DisableNativeHistograms() +} + // dbAppender wraps the DB's head appender and triggers compactions on commit // if necessary. type dbAppender struct { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 0601c04e7..1384a4613 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -68,6 +68,11 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) { tmpdir := t.TempDir() var err error + if opts == nil { + opts = DefaultOptions() + } + opts.EnableNativeHistograms = true + if len(rngs) == 0 { db, err = Open(tmpdir, nil, nil, opts, nil) } else { @@ -4023,3 +4028,48 @@ func TestQueryHistogramFromBlocks(t *testing.T) { ) }) } + +func TestNativeHistogramFlag(t *testing.T) { + dir := t.TempDir() + db, err := Open(dir, nil, nil, nil, nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + h := &histogram.Histogram{ + Count: 6, + ZeroCount: 4, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + } + + l := labels.FromStrings("foo", "bar") + + app := db.Appender(context.Background()) + + // Disabled by default. + _, err = app.AppendHistogram(0, l, 100, h) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + + // Enable and append. + db.EnableNativeHistograms() + _, err = app.AppendHistogram(0, l, 200, h) + require.NoError(t, err) + + db.DisableNativeHistograms() + _, err = app.AppendHistogram(0, l, 300, h) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + + require.NoError(t, app.Commit()) + + q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]tsdbutil.Sample{l.String(): {sample{t: 200, h: h}}}, act) +} diff --git a/tsdb/head.go b/tsdb/head.go index 7f22063c3..b2e897b5a 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -126,6 +126,9 @@ type HeadOptions struct { // https://pkg.go.dev/sync/atomic#pkg-note-BUG MaxExemplars atomic.Int64 + // EnableNativeHistograms enables the ingestion of native histograms. + EnableNativeHistograms atomic.Bool + ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string @@ -745,6 +748,16 @@ func (h *Head) ApplyConfig(cfg *config.Config) error { return nil } +// EnableNativeHistograms enables the native histogram feature. +func (h *Head) EnableNativeHistograms() { + h.opts.EnableNativeHistograms.Store(true) +} + +// DisableNativeHistograms disables the native histogram feature. +func (h *Head) DisableNativeHistograms() { + h.opts.EnableNativeHistograms.Store(false) +} + // PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names. func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats { h.cardinalityMutex.Lock() diff --git a/tsdb/head_append.go b/tsdb/head_append.go index f3ec2173c..ef31bd8bf 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -439,6 +439,10 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels, } func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { + if !a.head.opts.EnableNativeHistograms.Load() { + return 0, storage.ErrNativeHistogramsDisabled + } + if t < a.minValidTime { a.head.metrics.outOfBoundSamples.Inc() return 0, storage.ErrOutOfBounds diff --git a/tsdb/head_test.go b/tsdb/head_test.go index db14ffb25..e747885b9 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -61,6 +61,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. opts.ChunkDirRoot = dir opts.EnableExemplarStorage = true opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) + opts.EnableNativeHistograms.Store(true) h, err := NewHead(nil, nil, wlog, opts, nil) require.NoError(t, err) @@ -3476,7 +3477,9 @@ func TestHistogramCounterResetHeader(t *testing.T) { func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { dir := t.TempDir() - db, err := Open(dir, nil, nil, DefaultOptions(), nil) + opts := DefaultOptions() + opts.EnableNativeHistograms = true + db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, db.Close()) diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index ff9b33bbd..5d95437e9 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -39,6 +39,7 @@ func New(t testutil.T) *TestStorage { opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.RetentionDuration = 0 + opts.EnableNativeHistograms = true db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) require.NoError(t, err, "unexpected error while opening test storage") reg := prometheus.NewRegistry()