From 6094f35aa29ccf8e6ab3bf3a8c84c68b71c1fd15 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Jun 2018 20:18:44 +0100 Subject: [PATCH] simplify if-else,test before the tombstone failure, more comments Signed-off-by: Krasi Georgiev --- db.go | 7 ++----- db_test.go | 44 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index de753c25b..28cb14f33 100644 --- a/db.go +++ b/db.go @@ -861,13 +861,10 @@ func (db *DB) CleanTombstones() (err error) { deletable := []string{} for _, b := range blocks { - uid, er := b.CleanTombstones(db.Dir(), db.compactor) - if er != nil { + if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) return err - } - - if uid != nil { // New block was created. + } else if uid != nil { // New block was created. deletable = append(deletable, b.Dir()) newUIDs = append(newUIDs, *uid) } diff --git a/db_test.go b/db_test.go index f5364836d..f0f3142c3 100644 --- a/db_test.go +++ b/db_test.go @@ -783,6 +783,9 @@ func TestTombstoneClean(t *testing.T) { } } +// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind. +// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so +// if TombstoneClean leaves any blocks behind these will overlap. func TestTombstoneCleanFail(t *testing.T) { db, close := openTestDB(t, nil) @@ -790,8 +793,10 @@ func TestTombstoneCleanFail(t *testing.T) { var expectedBlockDirs []string - // Create 2 empty blocks with some fake tombstones to have something to work with. - for i := 0; i < 2; i++ { + // Create some empty blocks pending for compaction. + // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. + totalBlocks := 2 + for i := 0; i < totalBlocks; i++ { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) meta := &BlockMeta{ @@ -800,6 +805,8 @@ func TestTombstoneCleanFail(t *testing.T) { } blockDir := filepath.Join(db.Dir(), uid.String()) block := createEmptyBlock(t, blockDir, meta) + + // Add some some fake tombstones to trigger the compaction. tomb := memTombstones{} tomb[0] = Intervals{{0, 1}} block.tombstones = tomb @@ -807,29 +814,39 @@ func TestTombstoneCleanFail(t *testing.T) { db.blocks = append(db.blocks, block) expectedBlockDirs = append(expectedBlockDirs, blockDir) } - db.compactor = &mockCompactorFailing{t: t} - // This should fail as we are using the failing compactor! + // Initialize the mockCompactorFailing with a room for a single compaction iteration. + // mockCompactorFailing will fail on the second iteration so we can check if the cleanup works as expected. + db.compactor = &mockCompactorFailing{ + t: t, + blocks: db.blocks, + max: totalBlocks + 1, + } + + // The compactor should trigger a failure here. testutil.NotOk(t, db.CleanTombstones()) + // Now check that the CleanTombstones didn't leave any blocks behind after a failure. actualBlockDirs, err := blockDirs(db.dir) testutil.Ok(t, err) testutil.Equals(t, expectedBlockDirs, actualBlockDirs) } +// mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total. type mockCompactorFailing struct { t *testing.T blocks []*Block + max int } func (*mockCompactorFailing) Plan(dir string) ([]string, error) { - return nil, nil } func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { - if len(c.blocks) > 0 { - return ulid.ULID{}, fmt.Errorf("the compactor already did one block so forcefuly failing") + if len(c.blocks) >= c.max { + return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) meta := &BlockMeta{ @@ -837,9 +854,20 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 ULID: uid, } - fmt.Println(dest) block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) c.blocks = append(c.blocks, block) + + // Now check that all expected blocks are actually persisted on disk. + // This way we make sure that the we have some blocks that are supposed to be removed. + var expectedBlocks []string + for _, b := range c.blocks { + expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String())) + } + actualBlockDirs, err := blockDirs(dest) + testutil.Ok(c.t, err) + + testutil.Equals(c.t, expectedBlocks, actualBlockDirs) + return block.Meta().ULID, nil }