From cdf4b3ec3a72c98ac6c9b035d71c4330d9653c2b Mon Sep 17 00:00:00 2001 From: Lukasz Mierzwa Date: Tue, 25 Feb 2025 12:43:24 +0000 Subject: [PATCH] Add a TSDB PreInitFunc hook and move mapCommonLabelSymbols() call there We need to call mapCommonLabelSymbols() once TSDB opens all blocks, but before we start to reply the WAL and populate the HEAD. There doesn't seem to be a way to do this right now, so add a hook we can use for it. Signed-off-by: Lukasz Mierzwa --- cmd/prometheus/main.go | 12 ++++++++---- tsdb/db.go | 9 +++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c43e39f927..5b05f225cc 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -853,6 +853,12 @@ func main() { cfg.web.Flags = map[string]string{} + cfg.tsdb.PreInitFunc = func(db *tsdb.DB) { + if err = mapCommonLabelSymbols(db, logger); err != nil { + logger.Warn("Failed to map common strings in labels", slog.Any("err", err)) + } + } + // Exclude kingpin default flags to expose only Prometheus ones. boilerplateFlags := kingpin.New("", "").Version("") for _, f := range a.Model().Flags { @@ -1242,10 +1248,6 @@ func main() { return fmt.Errorf("opening storage failed: %w", err) } - if err = mapCommonLabelSymbols(db, logger); err != nil { - logger.Warn("Failed to map common strings in labels", slog.Any("err", err)) - } - switch fsType := prom_runtime.Statfs(localStoragePath); fsType { case "NFS_SUPER_MAGIC": logger.Warn("This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.", "fs_type", fsType) @@ -1801,6 +1803,7 @@ type tsdbOptions struct { CompactionDelayMaxPercent int EnableOverlappingCompaction bool EnableOOONativeHistograms bool + PreInitFunc tsdb.PreInitFunc } func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { @@ -1825,6 +1828,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableDelayedCompaction: opts.EnableDelayedCompaction, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + PreInitFunc: opts.PreInitFunc, } } diff --git a/tsdb/db.go b/tsdb/db.go index 9ab150c5b4..bd02ea4103 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -224,6 +224,9 @@ type Options struct { // PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta. // By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder. PostingsDecoderFactory PostingsDecoderFactory + + // PreInitFunc is a function that will be called before the HEAD is initialized. + PreInitFunc PreInitFunc } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -234,6 +237,8 @@ type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, er type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) +type PreInitFunc func(*DB) + // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { @@ -1011,6 +1016,10 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn minValidTime = inOrderMaxTime } + if db.opts.PreInitFunc != nil { + db.opts.PreInitFunc(db) + } + if initErr := db.head.Init(minValidTime); initErr != nil { db.head.metrics.walCorruptionsTotal.Inc() var e *errLoadWbl