tsdb: Fix resuming block deletions after a crash during deletion; added more tests. (#7777)

* tsdb: Fix resuming block deletions after a crash during deletion; added more tests.

Additionally: added a log line for block removal.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed comment.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
Bartlomiej Plotka 2020-08-11 15:53:23 +01:00 committed by GitHub
parent ce838ad6fc
commit f16cbc20d6
2 changed files with 58 additions and 26 deletions
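
For context before the diff: a compacted block's metadata records the ULIDs of its parent blocks, and reload() marks every such parent as deletable, so a deletion interrupted by a crash is picked up again on the next start. Below is a minimal, standalone sketch of that idea; blockMeta and deletableAfterReload are illustrative names, not Prometheus APIs.

package main

import "fmt"

// blockMeta is a simplified, hypothetical stand-in for tsdb's block metadata.
type blockMeta struct {
    ULID    string
    Parents []string // ULIDs of the blocks this block was compacted from.
}

// deletableAfterReload marks every parent of a loaded block for deletion,
// whether or not its directory still exists on disk.
func deletableAfterReload(loaded []blockMeta) map[string]struct{} {
    deletable := map[string]struct{}{}
    for _, b := range loaded {
        for _, p := range b.Parents {
            deletable[p] = struct{}{}
        }
    }
    return deletable
}

func main() {
    // Compaction produced block C from parents A and B, then the process
    // crashed before deleting them; on restart both are marked again.
    loaded := []blockMeta{{ULID: "C", Parents: []string{"A", "B"}}}
    fmt.Println(deletableAfterReload(loaded)) // map[A:{} B:{}]
}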


@@ -921,20 +921,23 @@ func (db *DB) reload() (err error) {
     deletableULIDs := db.blocksToDelete(loadable)
     deletable := make(map[ulid.ULID]*Block, len(deletableULIDs))
-    // Corrupted blocks that have been superseded by a loadable block can be safely ignored.
-    // This makes it resilient against the process crashing towards the end of a compaction.
-    // Creation of a new block and deletion of its parents cannot happen atomically.
-    // By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
+    // Mark all parents of loaded blocks as deletable (no matter if they exists). This makes it resilient against the process
+    // crashing towards the end of a compaction but before deletions. By doing that, we can pick up the deletion where it left off during a crash.
     for _, block := range loadable {
         if _, ok := deletableULIDs[block.meta.ULID]; ok {
             deletable[block.meta.ULID] = block
         }
         for _, b := range block.Meta().Compaction.Parents {
-            delete(corrupted, b.ULID)
+            if _, ok := corrupted[b.ULID]; ok {
+                delete(corrupted, b.ULID)
+                level.Warn(db.logger).Log("msg", "Found corrupted block, but replaced by compacted one so it's safe to delete. This should not happen with atomic deletes.", "block", b.ULID)
+            }
             deletable[b.ULID] = nil
         }
     }
     if len(corrupted) > 0 {
+        // Corrupted but no child loaded for it.
         // Close all new blocks to release the lock for windows.
         for _, block := range loadable {
             if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
@@ -948,28 +951,28 @@ func (db *DB) reload() (err error) {
         return merr.Err()
     }
-    // All deletable blocks should not be loaded.
     var (
-        bb         []*Block
+        toLoad     []*Block
         blocksSize int64
     )
+    // All deletable blocks should be unloaded.
+    // NOTE: We need to loop through loadable one more time as there might be loadable ready to be removed (replaced by compacted block).
     for _, block := range loadable {
         if _, ok := deletable[block.Meta().ULID]; ok {
             deletable[block.Meta().ULID] = block
             continue
         }
-        bb = append(bb, block)
-        blocksSize += block.Size()
+        toLoad = append(toLoad, block)
+        blocksSize += block.Size()
     }
-    loadable = bb
     db.metrics.blocksBytes.Set(float64(blocksSize))
-    sort.Slice(loadable, func(i, j int) bool {
-        return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
+    sort.Slice(toLoad, func(i, j int) bool {
+        return toLoad[i].Meta().MinTime < toLoad[j].Meta().MinTime
     })
     if !db.opts.AllowOverlappingBlocks {
-        if err := validateBlockSequence(loadable); err != nil {
+        if err := validateBlockSequence(toLoad); err != nil {
             return errors.Wrap(err, "invalid block sequence")
         }
     }
@@ -977,36 +980,33 @@ func (db *DB) reload() (err error) {
     // Swap new blocks first for subsequently created readers to be seen.
     db.mtx.Lock()
     oldBlocks := db.blocks
-    db.blocks = loadable
+    db.blocks = toLoad
     db.mtx.Unlock()
-    blockMetas := make([]BlockMeta, 0, len(loadable))
-    for _, b := range loadable {
+    blockMetas := make([]BlockMeta, 0, len(toLoad))
+    for _, b := range toLoad {
         blockMetas = append(blockMetas, b.Meta())
     }
     if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
         level.Warn(db.logger).Log("msg", "Overlapping blocks found during reload", "detail", overlaps.String())
     }
+    // Append blocks to old, deletable blocks, so we can close them.
     for _, b := range oldBlocks {
         if _, ok := deletable[b.Meta().ULID]; ok {
             deletable[b.Meta().ULID] = b
         }
     }
     if err := db.deleteBlocks(deletable); err != nil {
         return err
     }
     // Garbage collect data in the head if the most recent persisted block
     // covers data of its current time range.
-    if len(loadable) == 0 {
+    if len(toLoad) == 0 {
         return nil
     }
-    maxt := loadable[len(loadable)-1].Meta().MaxTime
-    return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
+    return errors.Wrap(db.head.Truncate(toLoad[len(toLoad)-1].Meta().MaxTime), "head truncate failed")
 }

 func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
@@ -1019,7 +1019,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
     for _, bDir := range bDirs {
         meta, _, err := readMetaFile(bDir)
         if err != nil {
-            level.Error(l).Log("msg", "failed to read meta.json for a block during reload; skipping", "dir", bDir, "err", err)
+            level.Error(l).Log("msg", "Failed to read meta.json for a block during reload. Skipping", "dir", bDir, "err", err)
             continue
         }
@@ -1124,7 +1124,7 @@ func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{}) {
     return deletable
 }

-// deleteBlocks closes and deletes blocks from the disk.
+// deleteBlocks closes the block if loaded and deletes blocks from the disk if exists.
 // When the map contains a non nil block object it means it is loaded in memory
 // so needs to be closed first as it might need to wait for pending readers to complete.
 func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
@@ -1135,14 +1135,23 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
             }
         }
-        // Replace atomically to avoid partial block when process is crashing during this moment.
+        toDelete := filepath.Join(db.dir, ulid.String())
+        if _, err := os.Stat(toDelete); os.IsNotExist(err) {
+            // Noop.
+            continue
+        } else if err != nil {
+            return errors.Wrapf(err, "stat dir %v", toDelete)
+        }
+        // Replace atomically to avoid partial block when process would crash during deletion.
         tmpToDelete := filepath.Join(db.dir, fmt.Sprintf("%s%s", ulid, tmpForDeletionBlockDirSuffix))
-        if err := fileutil.Replace(filepath.Join(db.dir, ulid.String()), tmpToDelete); err != nil {
+        if err := fileutil.Replace(toDelete, tmpToDelete); err != nil {
             return errors.Wrapf(err, "replace of obsolete block for deletion %s", ulid)
         }
         if err := os.RemoveAll(tmpToDelete); err != nil {
             return errors.Wrapf(err, "delete obsolete block %s", ulid)
         }
+        level.Info(db.logger).Log("msg", "Deleting obsolete block", "block", ulid)
     }
     return nil
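
The deleteBlocks change above makes deletion a crash-tolerant two-step operation: skip directories that are already gone, rename the rest to a temporary "for deletion" name, then remove them. Below is a standalone sketch of the same pattern under simplified assumptions; the function name and suffix value are made up here, and plain os.Rename stands in for fileutil.Replace (which in the diff also syncs the parent directory).

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// tmpForDeletionSuffix is an illustrative value, not the tsdb constant.
const tmpForDeletionSuffix = ".tmp-for-deletion"

func deleteBlockDir(dbDir, ulid string) error {
    toDelete := filepath.Join(dbDir, ulid)
    if _, err := os.Stat(toDelete); os.IsNotExist(err) {
        // Already gone (e.g. a previous run finished the deletion): nothing to do.
        return nil
    } else if err != nil {
        return fmt.Errorf("stat %s: %w", toDelete, err)
    }

    // Step 1: rename so the block can no longer be opened under its ULID.
    tmpToDelete := toDelete + tmpForDeletionSuffix
    if err := os.Rename(toDelete, tmpToDelete); err != nil {
        return fmt.Errorf("rename %s: %w", toDelete, err)
    }
    // Step 2: remove the renamed directory; a crash here leaves only the tmp dir,
    // which a later start can simply remove again.
    return os.RemoveAll(tmpToDelete)
}

func main() {
    dir, _ := os.MkdirTemp("", "tsdb-sketch")
    defer os.RemoveAll(dir)
    _ = os.Mkdir(filepath.Join(dir, "01EXAMPLEULID"), 0o755)
    fmt.Println(deleteBlockDir(dir, "01EXAMPLEULID")) // <nil>
}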


@@ -2761,6 +2761,29 @@ func TestOpen_VariousBlockStates(t *testing.T) {
         testutil.Ok(t, fileutil.Replace(dir, dir+tmpForDeletionBlockDirSuffix))
         expectedRemovedDirs[dir+tmpForDeletionBlockDirSuffix] = struct{}{}
     }
+    {
+        // One ok block; but two should be replaced.
+        dir := createBlock(t, tmpDir, genSeries(10, 2, 50, 60))
+        expectedLoadedDirs[dir] = struct{}{}
+        m, _, err := readMetaFile(dir)
+        testutil.Ok(t, err)
+        compacted := createBlock(t, tmpDir, genSeries(10, 2, 50, 55))
+        expectedRemovedDirs[compacted] = struct{}{}
+        m.Compaction.Parents = append(m.Compaction.Parents,
+            BlockDesc{ULID: ulid.MustParse(filepath.Base(compacted))},
+            BlockDesc{ULID: ulid.MustNew(1, nil)},
+            BlockDesc{ULID: ulid.MustNew(123, nil)},
+        )
+        // Regression test: Already removed parent can be still in list, which was causing Open errors.
+        m.Compaction.Parents = append(m.Compaction.Parents, BlockDesc{ULID: ulid.MustParse(filepath.Base(compacted))})
+        m.Compaction.Parents = append(m.Compaction.Parents, BlockDesc{ULID: ulid.MustParse(filepath.Base(compacted))})
+        _, err = writeMetaFile(log.NewLogfmtLogger(os.Stderr), dir, m)
+        testutil.Ok(t, err)
+    }
     opts := DefaultOptions()
     opts.RetentionDuration = 0
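
The test above persists the parent list with writeMetaFile, and that on-disk record is what survives the simulated crash and drives the cleanup on the next Open. As a rough illustration of the shape involved (the JSON field names below are assumptions for the sketch, not taken from this diff), a child block's metadata carries its parents roughly like this:

package main

import (
    "encoding/json"
    "fmt"
)

// Simplified, hypothetical mirror of the relevant metadata fields.
type blockDesc struct {
    ULID string `json:"ulid"`
}

type compaction struct {
    Level   int         `json:"level"`
    Parents []blockDesc `json:"parents"`
}

type meta struct {
    ULID       string     `json:"ulid"`
    Compaction compaction `json:"compaction"`
}

func main() {
    m := meta{
        ULID: "01CHILDBLOCKULIDEXAMPLE000",
        Compaction: compaction{
            Level: 2,
            Parents: []blockDesc{
                {ULID: "01PARENTAULIDEXAMPLE000000"},
                {ULID: "01PARENTBULIDEXAMPLE000000"},
            },
        },
    }
    out, _ := json.MarshalIndent(m, "", "  ")
    fmt.Println(string(out))
}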