Fix directory cleanup in case of compaction failure.

This commit is contained in:
Peter Štibraný 2021-09-27 17:05:14 +02:00
parent ffd281ab9d
commit 861f9083d8
2 changed files with 31 additions and 8 deletions

View file

@ -602,13 +602,21 @@ func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks .
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(closers)).Err()
for _, ob := range outBlocks {
if ob.tmpDir == "" {
continue
if ob.tmpDir != "" {
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if removeErr := os.RemoveAll(ob.tmpDir); removeErr != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "dir", ob.tmpDir, "err", removeErr.Error())
}
}
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if err := os.RemoveAll(ob.tmpDir); err != nil {
level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error())
// If there was any error, and we have multiple output blocks, some blocks may have been generated, or at
// least have existing blockDir. In such case, we want to remove them.
// BlockDir may also not be set yet, if preparation for some previous blocks have failed.
if err != nil && ob.blockDir != "" {
// RemoveAll returns no error when tmp doesn't exist so it is safe to always run it.
if removeErr := os.RemoveAll(ob.blockDir); removeErr != nil {
level.Error(c.logger).Log("msg", "removed block folder after failed compaction", "dir", ob.blockDir, "err", removeErr.Error())
}
}
}
c.metrics.ran.Inc()

View file

@ -443,9 +443,24 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
require.NoError(t, os.RemoveAll(tmpdir))
}()
require.Error(t, compactor.write(tmpdir, []shardedBlock{{meta: &BlockMeta{}}}, erringBReader{}))
_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + tmpForCreationBlockDirSuffix)
require.True(t, os.IsNotExist(err), "directory is not cleaned up")
shardedBlocks := []shardedBlock{
{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}},
{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}},
{meta: &BlockMeta{ULID: ulid.MustNew(ulid.Now(), rand.Reader)}},
}
require.Error(t, compactor.write(tmpdir, shardedBlocks, erringBReader{}))
// We rely on the fact that blockDir and tmpDir will be updated by compactor.write.
for _, b := range shardedBlocks {
require.NotEmpty(t, b.tmpDir)
_, err = os.Stat(b.tmpDir)
require.True(t, os.IsNotExist(err), "tmp directory is not cleaned up")
require.NotEmpty(t, b.blockDir)
_, err = os.Stat(b.blockDir)
require.True(t, os.IsNotExist(err), "block directory is not cleaned up")
}
}
func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {