Add a feature flag to control native histogram ingestion (#11253)

* Add runtime config to control native histogram ingestion

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Make the config into a CLI flag

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ganesh Vernekar 2022-09-14 17:38:34 +05:30 committed by GitHub
parent b2d01cbc57
commit d354f20c2a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 94 additions and 1 deletions

View file

@ -196,6 +196,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "no-default-scrape-port": case "no-default-scrape-port":
c.scrape.NoDefaultPort = true c.scrape.NoDefaultPort = true
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.") level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
case "native-histograms":
c.tsdb.EnableNativeHistograms = true
level.Info(logger).Log("msg", "Experimental native histogram support enabled.")
case "": case "":
continue continue
case "promql-at-modifier", "promql-negative-offset": case "promql-at-modifier", "promql-negative-offset":
@ -1542,6 +1545,7 @@ type tsdbOptions struct {
EnableExemplarStorage bool EnableExemplarStorage bool
MaxExemplars int64 MaxExemplars int64
EnableMemorySnapshotOnShutdown bool EnableMemorySnapshotOnShutdown bool
EnableNativeHistograms bool
} }
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
@ -1560,6 +1564,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableExemplarStorage: opts.EnableExemplarStorage, EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars, MaxExemplars: opts.MaxExemplars,
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown, EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
EnableNativeHistograms: opts.EnableNativeHistograms,
} }
} }

View file

@ -36,6 +36,7 @@ var (
ErrDuplicateExemplar = errors.New("duplicate exemplar") ErrDuplicateExemplar = errors.New("duplicate exemplar")
ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength) ErrExemplarLabelLength = fmt.Errorf("label length for exemplar exceeds maximum of %d UTF-8 characters", exemplar.ExemplarMaxLabelSetLength)
ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0") ErrExemplarsDisabled = fmt.Errorf("exemplar storage is disabled or max exemplars is less than or equal to 0")
ErrNativeHistogramsDisabled = fmt.Errorf("native histograms are disabled")
ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative") ErrHistogramSpanNegativeOffset = errors.New("histogram has a span whose offset is negative")
ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided") ErrHistogramSpansBucketsMismatch = errors.New("histogram spans specify different number of buckets than provided")
ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative") ErrHistogramNegativeBucketCount = errors.New("histogram has a bucket whose observation count is negative")

View file

@ -71,6 +71,7 @@ func (w *BlockWriter) initHead() error {
opts := DefaultHeadOptions() opts := DefaultHeadOptions()
opts.ChunkRange = w.blockSize opts.ChunkRange = w.blockSize
opts.ChunkDirRoot = w.chunkDir opts.ChunkDirRoot = w.chunkDir
opts.EnableNativeHistograms.Store(true)
h, err := NewHead(nil, w.logger, nil, opts, NewHeadStats()) h, err := NewHead(nil, w.logger, nil, opts, NewHeadStats())
if err != nil { if err != nil {
return errors.Wrap(err, "tsdb.NewHead") return errors.Wrap(err, "tsdb.NewHead")

View file

@ -1677,6 +1677,7 @@ func TestSparseHistogramCompactionAndQuery(t *testing.T) {
require.NoError(t, os.RemoveAll(dir)) require.NoError(t, os.RemoveAll(dir))
}) })
opts := DefaultOptions() opts := DefaultOptions()
opts.EnableNativeHistograms = true
// Exactly 3 times so that level 2 of compaction happens and tombstone // Exactly 3 times so that level 2 of compaction happens and tombstone
// deletion and compaction considers the level 2 blocks to be big enough. // deletion and compaction considers the level 2 blocks to be big enough.
opts.MaxBlockDuration = 3 * opts.MinBlockDuration opts.MaxBlockDuration = 3 * opts.MinBlockDuration

View file

@ -160,6 +160,9 @@ type Options struct {
// Disables isolation between reads and in-flight appends. // Disables isolation between reads and in-flight appends.
IsolationDisabled bool IsolationDisabled bool
// EnableNativeHistograms enables the ingestion of native histograms.
EnableNativeHistograms bool
} }
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@ -719,6 +722,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
headOpts.MaxExemplars.Store(opts.MaxExemplars) headOpts.MaxExemplars.Store(opts.MaxExemplars)
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
if opts.IsolationDisabled { if opts.IsolationDisabled {
// We only override this flag if isolation is disabled at DB level. We use the default otherwise. // We only override this flag if isolation is disabled at DB level. We use the default otherwise.
headOpts.IsolationDisabled = opts.IsolationDisabled headOpts.IsolationDisabled = opts.IsolationDisabled
@ -850,6 +854,16 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
return db.head.ApplyConfig(conf) return db.head.ApplyConfig(conf)
} }
// EnableNativeHistograms enables the native histogram feature.
func (db *DB) EnableNativeHistograms() {
db.head.EnableNativeHistograms()
}
// DisableNativeHistograms disables the native histogram feature.
func (db *DB) DisableNativeHistograms() {
db.head.DisableNativeHistograms()
}
// dbAppender wraps the DB's head appender and triggers compactions on commit // dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary. // if necessary.
type dbAppender struct { type dbAppender struct {

View file

@ -68,6 +68,11 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
tmpdir := t.TempDir() tmpdir := t.TempDir()
var err error var err error
if opts == nil {
opts = DefaultOptions()
}
opts.EnableNativeHistograms = true
if len(rngs) == 0 { if len(rngs) == 0 {
db, err = Open(tmpdir, nil, nil, opts, nil) db, err = Open(tmpdir, nil, nil, opts, nil)
} else { } else {
@ -4023,3 +4028,48 @@ func TestQueryHistogramFromBlocks(t *testing.T) {
) )
}) })
} }
func TestNativeHistogramFlag(t *testing.T) {
dir := t.TempDir()
db, err := Open(dir, nil, nil, nil, nil)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
})
h := &histogram.Histogram{
Count: 6,
ZeroCount: 4,
ZeroThreshold: 0.001,
Sum: 35.5,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 2, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0},
}
l := labels.FromStrings("foo", "bar")
app := db.Appender(context.Background())
// Disabled by default.
_, err = app.AppendHistogram(0, l, 100, h)
require.Equal(t, storage.ErrNativeHistogramsDisabled, err)
// Enable and append.
db.EnableNativeHistograms()
_, err = app.AppendHistogram(0, l, 200, h)
require.NoError(t, err)
db.DisableNativeHistograms()
_, err = app.AppendHistogram(0, l, 300, h)
require.Equal(t, storage.ErrNativeHistogramsDisabled, err)
require.NoError(t, app.Commit())
q, err := db.Querier(context.Background(), math.MinInt, math.MaxInt64)
require.NoError(t, err)
act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, map[string][]tsdbutil.Sample{l.String(): {sample{t: 200, h: h}}}, act)
}

View file

@ -126,6 +126,9 @@ type HeadOptions struct {
// https://pkg.go.dev/sync/atomic#pkg-note-BUG // https://pkg.go.dev/sync/atomic#pkg-note-BUG
MaxExemplars atomic.Int64 MaxExemplars atomic.Int64
// EnableNativeHistograms enables the ingestion of native histograms.
EnableNativeHistograms atomic.Bool
ChunkRange int64 ChunkRange int64
// ChunkDirRoot is the parent directory of the chunks directory. // ChunkDirRoot is the parent directory of the chunks directory.
ChunkDirRoot string ChunkDirRoot string
@ -745,6 +748,16 @@ func (h *Head) ApplyConfig(cfg *config.Config) error {
return nil return nil
} }
// EnableNativeHistograms enables the native histogram feature.
func (h *Head) EnableNativeHistograms() {
h.opts.EnableNativeHistograms.Store(true)
}
// DisableNativeHistograms disables the native histogram feature.
func (h *Head) DisableNativeHistograms() {
h.opts.EnableNativeHistograms.Store(false)
}
// PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names. // PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names.
func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats { func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.PostingsStats {
h.cardinalityMutex.Lock() h.cardinalityMutex.Lock()

View file

@ -439,6 +439,10 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
} }
func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) { func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram) (storage.SeriesRef, error) {
if !a.head.opts.EnableNativeHistograms.Load() {
return 0, storage.ErrNativeHistogramsDisabled
}
if t < a.minValidTime { if t < a.minValidTime {
a.head.metrics.outOfBoundSamples.Inc() a.head.metrics.outOfBoundSamples.Inc()
return 0, storage.ErrOutOfBounds return 0, storage.ErrOutOfBounds

View file

@ -61,6 +61,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
opts.ChunkDirRoot = dir opts.ChunkDirRoot = dir
opts.EnableExemplarStorage = true opts.EnableExemplarStorage = true
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
opts.EnableNativeHistograms.Store(true)
h, err := NewHead(nil, nil, wlog, opts, nil) h, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err) require.NoError(t, err)
@ -3476,7 +3477,9 @@ func TestHistogramCounterResetHeader(t *testing.T) {
func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
db, err := Open(dir, nil, nil, DefaultOptions(), nil) opts := DefaultOptions()
opts.EnableNativeHistograms = true
db, err := Open(dir, nil, nil, opts, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, db.Close()) require.NoError(t, db.Close())

View file

@ -39,6 +39,7 @@ func New(t testutil.T) *TestStorage {
opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.RetentionDuration = 0 opts.RetentionDuration = 0
opts.EnableNativeHistograms = true
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
require.NoError(t, err, "unexpected error while opening test storage") require.NoError(t, err, "unexpected error while opening test storage")
reg := prometheus.NewRegistry() reg := prometheus.NewRegistry()