diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index cd0775bbdd..d262e91767 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1826,6 +1826,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableDelayedCompaction: opts.EnableDelayedCompaction, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, + StartupMinRetentionTime: time.Now(), } } diff --git a/tsdb/db.go b/tsdb/db.go index 2cfa4f3638..163ddfd075 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -92,6 +92,7 @@ func DefaultOptions() *Options { CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelay: time.Duration(0), PostingsDecoderFactory: DefaultPostingsDecoderFactory, + StartupMinRetentionTime: time.Now(), } } @@ -224,6 +225,10 @@ type Options struct { // PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta. // By default, DefaultPostingsDecoderFactory will be used to create raw posting decoder. PostingsDecoderFactory PostingsDecoderFactory + + // StartupMinRetentionTime is the used to delete blocks and ignore samples from the WAL + // during the startup of the TSDB. + StartupMinRetentionTime time.Time } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -814,6 +819,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { if opts.OutOfOrderTimeWindow < 0 { opts.OutOfOrderTimeWindow = 0 } + if opts.StartupMinRetentionTime.IsZero() { + opts.StartupMinRetentionTime = time.Now() + } if len(rngs) == 0 { // Start with smallest block duration and create exponential buckets until the exceed the @@ -885,10 +893,6 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn returnedErr = errs.Err() }() - if db.blocksToDelete == nil { - db.blocksToDelete = DefaultBlocksToDelete(db) - } - var err error db.locker, err = tsdbutil.NewDirLocker(dir, "tsdb", db.logger, r) if err != nil { @@ -952,6 +956,11 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn } } } + + if db.blocksToDelete == nil { + db.blocksToDelete = DefaultBlocksToDelete(db) + } + db.oooWasEnabled.Store(opts.OutOfOrderTimeWindow > 0) headOpts := DefaultHeadOptions() headOpts.ChunkRange = rngs[0]