diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index d262e91767..3544917d1e 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1826,7 +1826,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { EnableDelayedCompaction: opts.EnableDelayedCompaction, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, EnableOverlappingCompaction: opts.EnableOverlappingCompaction, - StartupMinRetentionTime: time.Now(), + StartupMinRetentionTime: int64(time.Duration(time.Now().Add(-time.Duration(opts.RetentionDuration) / time.Millisecond).UnixMilli())), } } diff --git a/tsdb/db.go b/tsdb/db.go index 163ddfd075..615ab8fea8 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -92,7 +92,7 @@ func DefaultOptions() *Options { CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelay: time.Duration(0), PostingsDecoderFactory: DefaultPostingsDecoderFactory, - StartupMinRetentionTime: time.Now(), + StartupMinRetentionTime: time.Now().Add(-15 * 24 * time.Hour / time.Millisecond).UnixMilli(), } } @@ -228,7 +228,7 @@ type Options struct { // StartupMinRetentionTime is the used to delete blocks and ignore samples from the WAL // during the startup of the TSDB. - StartupMinRetentionTime time.Time + StartupMinRetentionTime int64 } type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) @@ -819,8 +819,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { if opts.OutOfOrderTimeWindow < 0 { opts.OutOfOrderTimeWindow = 0 } - if opts.StartupMinRetentionTime.IsZero() { - opts.StartupMinRetentionTime = time.Now() + if opts.StartupMinRetentionTime == 0 { + minStartupDuration := time.Now().Add(-time.Duration(opts.RetentionDuration) / time.Millisecond) + opts.StartupMinRetentionTime = int64(time.Duration(minStartupDuration.UnixMilli())) } if len(rngs) == 0 { @@ -932,6 +933,21 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc } + // store the current blocksToDeleteFunc and replace it with a startup specific one. + originalBlocksToDelete := db.blocksToDelete + db.blocksToDelete = func(blocks []*Block) map[ulid.ULID]struct{} { + return deletableBlocks(db, blocks, beyondStartupTimeRetention, BeyondSizeRetention) + } + // Reload blocks so the retention policy is applied to the blocks. + // based on the current blocksToDeleteFunc. + err = db.reloadBlocks() + if err != nil { + return nil, err + } + + // Restore the original blocksToDeleteFunc. + db.blocksToDelete = originalBlocksToDelete + var wal, wbl *wlog.WL segmentSize := wlog.DefaultSegmentSize // Wal is enabled. @@ -1710,12 +1726,14 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc. // retention from the options of the db. func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc { return func(blocks []*Block) map[ulid.ULID]struct{} { - return deletableBlocks(db, blocks) + return deletableBlocks(db, blocks, BeyondSizeRetention, BeyondTimeRetention) } } +type DeletableFilterFunc func(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) + // deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block. -func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} { +func deletableBlocks(db *DB, blocks []*Block, filterFuncs ...DeletableFilterFunc) map[ulid.ULID]struct{} { deletable := make(map[ulid.ULID]struct{}) // Sort the blocks by time - newest to oldest (largest to smallest timestamp). @@ -1737,17 +1755,33 @@ func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} { } } - for ulid := range BeyondTimeRetention(db, blocks) { - deletable[ulid] = struct{}{} - } - - for ulid := range BeyondSizeRetention(db, blocks) { - deletable[ulid] = struct{}{} + for _, filterFunc := range filterFuncs { + for ulid := range filterFunc(db, blocks) { + deletable[ulid] = struct{}{} + } } return deletable } +func beyondStartupTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) { + if len(blocks) == 0 || db.opts.StartupMinRetentionTime == 0 { + return + } + + deletable = make(map[ulid.ULID]struct{}) + for _, block := range blocks { + if block.Meta().MaxTime >= int64(db.opts.StartupMinRetentionTime) { + for _, b := range blocks { + deletable[b.meta.ULID] = struct{}{} + } + db.metrics.timeRetentionCount.Inc() + } + } + + return +} + // BeyondTimeRetention returns those blocks which are beyond the time retention // set in the db options. func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {