diff --git a/tsdb/db.go b/tsdb/db.go
index 300348435..2c15b13ff 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -921,20 +921,23 @@ func (db *DB) reload() (err error) {
 	deletableULIDs := db.blocksToDelete(loadable)
 	deletable := make(map[ulid.ULID]*Block, len(deletableULIDs))
 
-	// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
-	// This makes it resilient against the process crashing towards the end of a compaction.
-	// Creation of a new block and deletion of its parents cannot happen atomically.
-	// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
+	// Mark all parents of loaded blocks as deletable (no matter if they exist). This makes it resilient against the process
+	// crashing towards the end of a compaction but before deletions. By doing that, we can pick up the deletion where it left off after a crash.
 	for _, block := range loadable {
 		if _, ok := deletableULIDs[block.meta.ULID]; ok {
 			deletable[block.meta.ULID] = block
 		}
 		for _, b := range block.Meta().Compaction.Parents {
-			delete(corrupted, b.ULID)
+			if _, ok := corrupted[b.ULID]; ok {
+				delete(corrupted, b.ULID)
+				level.Warn(db.logger).Log("msg", "Found corrupted block, but it was replaced by a compacted one, so it is safe to delete. This should not happen with atomic deletes.", "block", b.ULID)
+			}
 			deletable[b.ULID] = nil
 		}
 	}
 
+	if len(corrupted) > 0 {
+		// Corrupted, and no loadable child block supersedes it.
 		// Close all new blocks to release the lock for windows.
 		for _, block := range loadable {
 			if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
@@ -948,28 +951,28 @@ func (db *DB) reload() (err error) {
 		return merr.Err()
 	}
 
-	// All deletable blocks should not be loaded.
 	var (
-		bb         []*Block
+		toLoad     []*Block
 		blocksSize int64
 	)
+	// All deletable blocks should be unloaded.
+	// NOTE: We need to loop through loadable one more time as some loadable blocks might be ready to be removed (replaced by a compacted block).
 	for _, block := range loadable {
 		if _, ok := deletable[block.Meta().ULID]; ok {
 			deletable[block.Meta().ULID] = block
 			continue
 		}
-		bb = append(bb, block)
-		blocksSize += block.Size()
+		toLoad = append(toLoad, block)
+		blocksSize += block.Size()
 	}
-	loadable = bb
 	db.metrics.blocksBytes.Set(float64(blocksSize))
 
-	sort.Slice(loadable, func(i, j int) bool {
-		return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
+	sort.Slice(toLoad, func(i, j int) bool {
+		return toLoad[i].Meta().MinTime < toLoad[j].Meta().MinTime
 	})
 	if !db.opts.AllowOverlappingBlocks {
-		if err := validateBlockSequence(loadable); err != nil {
+		if err := validateBlockSequence(toLoad); err != nil {
 			return errors.Wrap(err, "invalid block sequence")
 		}
 	}
 
@@ -977,36 +980,33 @@ func (db *DB) reload() (err error) {
 	// Swap new blocks first for subsequently created readers to be seen.
 	db.mtx.Lock()
 	oldBlocks := db.blocks
-	db.blocks = loadable
+	db.blocks = toLoad
 	db.mtx.Unlock()
 
-	blockMetas := make([]BlockMeta, 0, len(loadable))
-	for _, b := range loadable {
+	blockMetas := make([]BlockMeta, 0, len(toLoad))
+	for _, b := range toLoad {
 		blockMetas = append(blockMetas, b.Meta())
 	}
 	if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
 		level.Warn(db.logger).Log("msg", "Overlapping blocks found during reload", "detail", overlaps.String())
 	}
 
+	// Attach loaded old blocks to the deletable set so deleteBlocks can close them.
 	for _, b := range oldBlocks {
 		if _, ok := deletable[b.Meta().ULID]; ok {
 			deletable[b.Meta().ULID] = b
 		}
 	}
-
 	if err := db.deleteBlocks(deletable); err != nil {
 		return err
 	}
 
 	// Garbage collect data in the head if the most recent persisted block
 	// covers data of its current time range.
-	if len(loadable) == 0 {
+	if len(toLoad) == 0 {
 		return nil
 	}
-
-	maxt := loadable[len(loadable)-1].Meta().MaxTime
-
-	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
+	return errors.Wrap(db.head.Truncate(toLoad[len(toLoad)-1].Meta().MaxTime), "head truncate failed")
 }
 
 func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
@@ -1019,7 +1019,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po
 	for _, bDir := range bDirs {
 		meta, _, err := readMetaFile(bDir)
 		if err != nil {
-			level.Error(l).Log("msg", "failed to read meta.json for a block during reload; skipping", "dir", bDir, "err", err)
+			level.Error(l).Log("msg", "Failed to read meta.json for a block during reload. Skipping", "dir", bDir, "err", err)
 			continue
 		}
 
@@ -1124,7 +1124,7 @@ func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struc
 	return deletable
 }
 
-// deleteBlocks closes and deletes blocks from the disk.
+// deleteBlocks closes the blocks if loaded and deletes the block directories from disk if they exist.
 // When the map contains a non nil block object it means it is loaded in memory
 // so needs to be closed first as it might need to wait for pending readers to complete.
 func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
@@ -1135,14 +1135,23 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
 		}
 	}
 
-		// Replace atomically to avoid partial block when process is crashing during this moment.
+		toDelete := filepath.Join(db.dir, ulid.String())
+		if _, err := os.Stat(toDelete); os.IsNotExist(err) {
+			// Noop.
+			continue
+		} else if err != nil {
+			return errors.Wrapf(err, "stat dir %v", toDelete)
+		}
+
+		// Replace atomically to avoid a partial block if the process crashes during deletion.
 		tmpToDelete := filepath.Join(db.dir, fmt.Sprintf("%s%s", ulid, tmpForDeletionBlockDirSuffix))
-		if err := fileutil.Replace(filepath.Join(db.dir, ulid.String()), tmpToDelete); err != nil {
+		if err := fileutil.Replace(toDelete, tmpToDelete); err != nil {
 			return errors.Wrapf(err, "replace of obsolete block for deletion %s", ulid)
 		}
 		if err := os.RemoveAll(tmpToDelete); err != nil {
 			return errors.Wrapf(err, "delete obsolete block %s", ulid)
 		}
+		level.Info(db.logger).Log("msg", "Deleting obsolete block", "block", ulid)
 	}
 
 	return nil
diff --git a/tsdb/db_test.go b/tsdb/db_test.go
index e400974aa..64fb80e75 100644
--- a/tsdb/db_test.go
+++ b/tsdb/db_test.go
@@ -2761,6 +2761,29 @@ func TestOpen_VariousBlockStates(t *testing.T) {
 		testutil.Ok(t, fileutil.Replace(dir, dir+tmpForDeletionBlockDirSuffix))
 		expectedRemovedDirs[dir+tmpForDeletionBlockDirSuffix] = struct{}{}
 	}
+	{
+		// One ok block, but its parents should be cleaned up: one still exists on disk, the rest are already gone.
+		dir := createBlock(t, tmpDir, genSeries(10, 2, 50, 60))
+		expectedLoadedDirs[dir] = struct{}{}
+
+		m, _, err := readMetaFile(dir)
+		testutil.Ok(t, err)
+
+		compacted := createBlock(t, tmpDir, genSeries(10, 2, 50, 55))
+		expectedRemovedDirs[compacted] = struct{}{}
+
+		m.Compaction.Parents = append(m.Compaction.Parents,
+			BlockDesc{ULID: ulid.MustParse(filepath.Base(compacted))},
+			BlockDesc{ULID: ulid.MustNew(1, nil)},
+			BlockDesc{ULID: ulid.MustNew(123, nil)},
+		)
+
+		// Regression test: an already removed parent can still be in the list, which was causing Open errors.
+		m.Compaction.Parents = append(m.Compaction.Parents, BlockDesc{ULID: ulid.MustParse(filepath.Base(compacted))})
+		m.Compaction.Parents = append(m.Compaction.Parents, BlockDesc{ULID: ulid.MustParse(filepath.Base(compacted))})
+		_, err = writeMetaFile(log.NewLogfmtLogger(os.Stderr), dir, m)
+		testutil.Ok(t, err)
+	}
 
 	opts := DefaultOptions()
 	opts.RetentionDuration = 0
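For reviewers, a minimal standalone sketch of the recovery invariant the reload() change relies on: every parent recorded in a loadable block's meta is marked deletable whether or not its directory still exists, and any corruption recorded for a superseded parent is discarded. The names here (blockMeta, markDeletable, string ULIDs) are illustrative stand-ins, not the real TSDB API.

```go
package main

import (
	"errors"
	"fmt"
)

// blockMeta is a stripped-down stand-in for tsdb's BlockMeta.
type blockMeta struct {
	ulid    string
	parents []string
}

// markDeletable mirrors the reload() loop in the patch: parents of loadable
// blocks are always marked for deletion, and corruption recorded for them is
// dropped, because the compacted child supersedes their data.
func markDeletable(loadable []blockMeta, corrupted map[string]error) map[string]struct{} {
	deletable := map[string]struct{}{}
	for _, b := range loadable {
		for _, p := range b.parents {
			if _, ok := corrupted[p]; ok {
				delete(corrupted, p)
				fmt.Printf("corrupted block %s superseded by %s; safe to delete\n", p, b.ulid)
			}
			deletable[p] = struct{}{}
		}
	}
	return deletable
}

func main() {
	corrupted := map[string]error{"01PARENT": errors.New("bad index")}
	loadable := []blockMeta{{ulid: "01CHILD", parents: []string{"01PARENT", "01GONE"}}}
	deletable := markDeletable(loadable, corrupted)
	fmt.Println(len(deletable), len(corrupted)) // 2 0: both parents deletable, corruption cleared.
}
```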
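And a self-contained sketch of the two-step delete that the new deleteBlocks body performs. The helper name deleteBlockDir and the inlined suffix value are assumptions for illustration; the patch itself uses fileutil.Replace, which also fsyncs the parent directory, whereas this sketch settles for a plain os.Rename.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// Illustrative stand-in for tsdb's tmpForDeletionBlockDirSuffix constant.
const tmpForDeletionSuffix = ".tmp-for-deletion"

// deleteBlockDir removes dbDir/<ulid> in two steps, so a crash at any point
// leaves either the intact block or a tmp-suffixed directory that startup
// cleanup can remove, never a half-deleted dir that looks like a valid block.
func deleteBlockDir(dbDir, ulid string) error {
	toDelete := filepath.Join(dbDir, ulid)
	if _, err := os.Stat(toDelete); os.IsNotExist(err) {
		return nil // Already gone, e.g. deleted by a previous crashed run.
	} else if err != nil {
		return fmt.Errorf("stat dir %v: %w", toDelete, err)
	}

	// Step 1: atomically move the dir out of the set of valid block dirs.
	tmpToDelete := toDelete + tmpForDeletionSuffix
	if err := os.Rename(toDelete, tmpToDelete); err != nil {
		return fmt.Errorf("rename obsolete block %s: %w", ulid, err)
	}
	// Step 2: the non-atomic recursive removal now only touches the tmp dir.
	return os.RemoveAll(tmpToDelete)
}

func main() {
	dbDir, err := os.MkdirTemp("", "tsdb-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dbDir)

	if err := os.Mkdir(filepath.Join(dbDir, "01EXAMPLEULID"), 0o750); err != nil {
		panic(err)
	}
	fmt.Println(deleteBlockDir(dbDir, "01EXAMPLEULID")) // <nil>
	fmt.Println(deleteBlockDir(dbDir, "01EXAMPLEULID")) // <nil>: second call is a noop.
}
```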