[CHORE] deleting blocks on startup

Signed-off-by: Nicolas Takashi <nicolas.tcs@hotmail.com>
This commit is contained in:
Nicolas Takashi 2024-12-20 23:27:35 +00:00
parent 9d307e8417
commit b7cf351e06
No known key found for this signature in database
GPG key ID: 65BE5B80BBC7A707
2 changed files with 47 additions and 13 deletions

View file

@ -1826,7 +1826,7 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
EnableDelayedCompaction: opts.EnableDelayedCompaction, EnableDelayedCompaction: opts.EnableDelayedCompaction,
CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent, CompactionDelayMaxPercent: opts.CompactionDelayMaxPercent,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction, EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
StartupMinRetentionTime: time.Now(), StartupMinRetentionTime: int64(time.Duration(time.Now().Add(-time.Duration(opts.RetentionDuration) / time.Millisecond).UnixMilli())),
} }
} }

View file

@ -92,7 +92,7 @@ func DefaultOptions() *Options {
CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent, CompactionDelayMaxPercent: DefaultCompactionDelayMaxPercent,
CompactionDelay: time.Duration(0), CompactionDelay: time.Duration(0),
PostingsDecoderFactory: DefaultPostingsDecoderFactory, PostingsDecoderFactory: DefaultPostingsDecoderFactory,
StartupMinRetentionTime: time.Now(), StartupMinRetentionTime: time.Now().Add(-15 * 24 * time.Hour / time.Millisecond).UnixMilli(),
} }
} }
@ -228,7 +228,7 @@ type Options struct {
// StartupMinRetentionTime is used to delete blocks and ignore samples from the WAL // StartupMinRetentionTime is used to delete blocks and ignore samples from the WAL
// during the startup of the TSDB. // during the startup of the TSDB.
StartupMinRetentionTime time.Time StartupMinRetentionTime int64
} }
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l *slog.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
@ -819,8 +819,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
if opts.OutOfOrderTimeWindow < 0 { if opts.OutOfOrderTimeWindow < 0 {
opts.OutOfOrderTimeWindow = 0 opts.OutOfOrderTimeWindow = 0
} }
if opts.StartupMinRetentionTime.IsZero() { if opts.StartupMinRetentionTime == 0 {
opts.StartupMinRetentionTime = time.Now() minStartupDuration := time.Now().Add(-time.Duration(opts.RetentionDuration) / time.Millisecond)
opts.StartupMinRetentionTime = int64(time.Duration(minStartupDuration.UnixMilli()))
} }
if len(rngs) == 0 { if len(rngs) == 0 {
@ -932,6 +933,21 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc
} }
// store the current blocksToDeleteFunc and replace it with a startup specific one.
originalBlocksToDelete := db.blocksToDelete
db.blocksToDelete = func(blocks []*Block) map[ulid.ULID]struct{} {
return deletableBlocks(db, blocks, beyondStartupTimeRetention, BeyondSizeRetention)
}
// Reload blocks so that the retention policy is applied to them,
// based on the current blocksToDeleteFunc.
err = db.reloadBlocks()
if err != nil {
return nil, err
}
// Restore the original blocksToDeleteFunc.
db.blocksToDelete = originalBlocksToDelete
var wal, wbl *wlog.WL var wal, wbl *wlog.WL
segmentSize := wlog.DefaultSegmentSize segmentSize := wlog.DefaultSegmentSize
// Wal is enabled. // Wal is enabled.
@ -1710,12 +1726,14 @@ func openBlocks(l *slog.Logger, dir string, loaded []*Block, chunkPool chunkenc.
// retention from the options of the db. // retention from the options of the db.
func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc { func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc {
return func(blocks []*Block) map[ulid.ULID]struct{} { return func(blocks []*Block) map[ulid.ULID]struct{} {
return deletableBlocks(db, blocks) return deletableBlocks(db, blocks, BeyondSizeRetention, BeyondTimeRetention)
} }
} }
// DeletableFilterFunc reports which of the given blocks should be deleted,
// as a set keyed by block ULID. deletableBlocks merges the sets returned by
// every filter it is given into its own result.
type DeletableFilterFunc func(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})
// deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block. // deletableBlocks returns all currently loaded blocks past retention policy or already compacted into a new block.
func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} { func deletableBlocks(db *DB, blocks []*Block, filterFuncs ...DeletableFilterFunc) map[ulid.ULID]struct{} {
deletable := make(map[ulid.ULID]struct{}) deletable := make(map[ulid.ULID]struct{})
// Sort the blocks by time - newest to oldest (largest to smallest timestamp). // Sort the blocks by time - newest to oldest (largest to smallest timestamp).
@ -1737,17 +1755,33 @@ func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} {
} }
} }
for ulid := range BeyondTimeRetention(db, blocks) { for _, filterFunc := range filterFuncs {
deletable[ulid] = struct{}{} for ulid := range filterFunc(db, blocks) {
} deletable[ulid] = struct{}{}
}
for ulid := range BeyondSizeRetention(db, blocks) {
deletable[ulid] = struct{}{}
} }
return deletable return deletable
} }
// beyondStartupTimeRetention returns the blocks whose MaxTime lies before
// db.opts.StartupMinRetentionTime, i.e. the blocks that were already past the
// retention cut-off computed for the startup of the TSDB.
//
// It returns nil when there are no blocks or when StartupMinRetentionTime is
// unset (zero). Each block is judged on its own MaxTime, so the result does
// not depend on the order of blocks.
func beyondStartupTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {
	// Nothing to evaluate, or startup retention is disabled.
	if len(blocks) == 0 || db.opts.StartupMinRetentionTime == 0 {
		return nil
	}

	cutoff := db.opts.StartupMinRetentionTime
	deletable = make(map[ulid.ULID]struct{})
	for _, block := range blocks {
		// Only a block that ends before the cut-off is beyond retention.
		// A block that reaches the cut-off or later must be kept, and it
		// must never cause other blocks to be dropped.
		if block.Meta().MaxTime < cutoff {
			deletable[block.meta.ULID] = struct{}{}
		}
	}

	// Record one time-retention event per pass that found something to
	// delete, rather than one per matching block.
	if len(deletable) > 0 {
		db.metrics.timeRetentionCount.Inc()
	}
	return deletable
}
// BeyondTimeRetention returns those blocks which are beyond the time retention // BeyondTimeRetention returns those blocks which are beyond the time retention
// set in the db options. // set in the db options.
func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) { func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {