diff --git a/tsdb/db.go b/tsdb/db.go
index 2b0525ab6..e02ad5fae 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -118,19 +118,27 @@ type Options struct {
 	// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
 	// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
 	SeriesLifecycleCallback SeriesLifecycleCallback
+
+	// BlocksToDelete is a function which returns the blocks which can be deleted.
+	// It is always the default time and size based retention in Prometheus and
+	// mainly meant for external users who import TSDB.
+	BlocksToDelete BlocksToDeleteFunc
 }
 
+type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
+
 // DB handles reads and writes of time series falling into
 // a hashed partition of a seriedb.
 type DB struct {
 	dir   string
 	lockf fileutil.Releaser
 
-	logger    log.Logger
-	metrics   *dbMetrics
-	opts      *Options
-	chunkPool chunkenc.Pool
-	compactor Compactor
+	logger         log.Logger
+	metrics        *dbMetrics
+	opts           *Options
+	chunkPool      chunkenc.Pool
+	compactor      Compactor
+	blocksToDelete BlocksToDeleteFunc
 
 	// Mutex for that must be held when modifying the general block layout.
 	mtx    sync.RWMutex
@@ -560,14 +568,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 
 	db = &DB{
-		dir:         dir,
-		logger:      l,
-		opts:        opts,
-		compactc:    make(chan struct{}, 1),
-		donec:       make(chan struct{}),
-		stopc:       make(chan struct{}),
-		autoCompact: true,
-		chunkPool:   chunkenc.NewPool(),
+		dir:            dir,
+		logger:         l,
+		opts:           opts,
+		compactc:       make(chan struct{}, 1),
+		donec:          make(chan struct{}),
+		stopc:          make(chan struct{}),
+		autoCompact:    true,
+		chunkPool:      chunkenc.NewPool(),
+		blocksToDelete: opts.BlocksToDelete,
+	}
+	if db.blocksToDelete == nil {
+		db.blocksToDelete = DefaultBlocksToDelete(db)
 	}
 
 	if !opts.NoLockfile {
@@ -871,13 +883,17 @@ func (db *DB) reload() (err error) {
 		return err
 	}
 
-	deletable := db.deletableBlocks(loadable)
+	deletableULIDs := db.blocksToDelete(loadable)
+	deletable := make(map[ulid.ULID]*Block, len(deletableULIDs))
 
 	// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
 	// This makes it resilient against the process crashing towards the end of a compaction.
 	// Creation of a new block and deletion of its parents cannot happen atomically.
 	// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
 	for _, block := range loadable {
+		if _, ok := deletableULIDs[block.meta.ULID]; ok {
+			deletable[block.meta.ULID] = block
+		}
 		for _, b := range block.Meta().Compaction.Parents {
 			delete(corrupted, b.ULID)
 			deletable[b.ULID] = nil
@@ -986,9 +1002,17 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 	return blocks, corrupted, nil
 }
 
+// DefaultBlocksToDelete returns a filter which decides time based and size based
+// retention from the options of the db.
+func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc {
+	return func(blocks []*Block) map[ulid.ULID]struct{} {
+		return deletableBlocks(db, blocks)
+	}
+}
+
 // deletableBlocks returns all blocks past retention policy.
-func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
-	deletable := make(map[ulid.ULID]*Block)
+func deletableBlocks(db *DB, blocks []*Block) map[ulid.ULID]struct{} {
+	deletable := make(map[ulid.ULID]struct{})
 
 	// Sort the blocks by time - newest to oldest (largest to smallest timestamp).
 	// This ensures that the retentions will remove the oldest blocks.
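The hunks above add the hook and its default wiring; the ones below convert the retention helpers to the exported, ULID-set-based form. As an aside, here is a minimal sketch, not part of the patch, of what the new Options field enables for external users who import TSDB. The directory name, the use of the tsdb.DefaultOptions() helper, and the no-op policy are illustrative assumptions; note that even with this policy, reload() still deletes parent blocks superseded by compaction.

package main

import (
	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions() // assumed helper; any *tsdb.Options works
	// Hypothetical policy: report nothing as deletable, so time and size
	// based retention never fires. Leaving BlocksToDelete nil would fall
	// back to DefaultBlocksToDelete instead.
	opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
		return map[ulid.ULID]struct{}{}
	}

	db, err := tsdb.Open("data", log.NewNopLogger(), nil, opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}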
@@ -998,34 +1022,36 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
 
 	for _, block := range blocks {
 		if block.Meta().Compaction.Deletable {
-			deletable[block.Meta().ULID] = block
+			deletable[block.Meta().ULID] = struct{}{}
 		}
 	}
 
-	for ulid, block := range db.beyondTimeRetention(blocks) {
-		deletable[ulid] = block
+	for ulid := range BeyondTimeRetention(db, blocks) {
+		deletable[ulid] = struct{}{}
 	}
 
-	for ulid, block := range db.beyondSizeRetention(blocks) {
-		deletable[ulid] = block
+	for ulid := range BeyondSizeRetention(db, blocks) {
+		deletable[ulid] = struct{}{}
 	}
 
 	return deletable
 }
 
-func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
+// BeyondTimeRetention returns those blocks which are beyond the time retention
+// set in the db options.
+func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {
 	// Time retention is disabled or no blocks to work with.
 	if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 {
 		return
 	}
 
-	deletable = make(map[ulid.ULID]*Block)
+	deletable = make(map[ulid.ULID]struct{})
 	for i, block := range blocks {
 		// The difference between the first block and this block is larger than
 		// the retention period so any blocks after that are added as deletable.
 		if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration {
 			for _, b := range blocks[i:] {
-				deletable[b.meta.ULID] = b
+				deletable[b.meta.ULID] = struct{}{}
 			}
 			db.metrics.timeRetentionCount.Inc()
 			break
@@ -1034,13 +1060,15 @@ func (db *DB) beyondTimeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
 	return deletable
 }
 
-func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Block) {
+// BeyondSizeRetention returns those blocks which are beyond the size retention
+// set in the db options.
+func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {
 	// Size retention is disabled or no blocks to work with.
 	if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 {
 		return
 	}
 
-	deletable = make(map[ulid.ULID]*Block)
+	deletable = make(map[ulid.ULID]struct{})
 
 	walSize, _ := db.Head().wal.Size()
 	headChunksSize := db.Head().chunkDiskMapper.Size()
@@ -1052,7 +1080,7 @@ func (db *DB) beyondSizeRetention(blocks []*Block) (deletable map[ulid.ULID]*Blo
 		if blocksSize > int64(db.opts.MaxBytes) {
 			// Add this and all following blocks for deletion.
 			for _, b := range blocks[i:] {
-				deletable[b.meta.ULID] = b
+				deletable[b.meta.ULID] = struct{}{}
 			}
 			db.metrics.sizeRetentionCount.Inc()
 			break
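Since BeyondTimeRetention and BeyondSizeRetention are now exported, an importer can compose the stock retention rules into a custom policy. Below is a sketch under assumptions: the pinned set, its upkeep, and the surrounding program are hypothetical. One subtlety the sketch accounts for: open() calls db.reload() before Open returns, so the callback fires before the caller's *tsdb.DB variable is assigned and the closure must tolerate a nil db on that first pass.

package main

import (
	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Hypothetical: blocks some external process (e.g. a snapshotter) has
	// pinned and that must survive retention.
	pinned := map[ulid.ULID]struct{}{}

	var db *tsdb.DB
	opts := tsdb.DefaultOptions() // assumed helper; any *tsdb.Options works
	opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
		if db == nil {
			// First invocation happens from the reload inside Open,
			// before db is assigned below; delete nothing on that pass.
			return nil
		}
		deletable := make(map[ulid.ULID]struct{})
		// Start from the stock time and size based retention...
		for id := range tsdb.BeyondTimeRetention(db, blocks) {
			deletable[id] = struct{}{}
		}
		for id := range tsdb.BeyondSizeRetention(db, blocks) {
			deletable[id] = struct{}{}
		}
		// ...then exempt pinned blocks.
		for id := range pinned {
			delete(deletable, id)
		}
		return deletable
	}

	var err error
	db, err = tsdb.Open("data", log.NewNopLogger(), nil, opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}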