From 528439aa933c599e7329f4126f87513bd1833d43 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 30 May 2018 22:09:30 -0400 Subject: [PATCH 1/3] Cleanup new blocks on 'CleanTombstones' faliure. Signed-off-by: Ganesh Vernekar --- block.go | 15 +++++++------- db.go | 33 ++++++++++++++++++++++-------- testdata/repair_index_version/lock | 0 3 files changed, 32 insertions(+), 16 deletions(-) create mode 100644 testdata/repair_index_version/lock diff --git a/block.go b/block.go index e3760df76a..9ae2adbde0 100644 --- a/block.go +++ b/block.go @@ -468,9 +468,9 @@ Outer: return writeMetaFile(pb.dir, &pb.meta) } -// CleanTombstones will rewrite the block if there any tombstones to remove them -// and returns if there was a re-write. -func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) { +// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). +// If there was a rewrite, then it returns the ULID of the new block written, else nil. +func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { numStones := 0 pb.tombstones.Iter(func(id uint64, ivs Intervals) error { @@ -480,14 +480,15 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) { }) if numStones == 0 { - return false, nil + return nil, nil } - if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil { - return false, err + uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime) + if err != nil { + return nil, err } - return true, nil + return &uid, nil } // Snapshot creates snapshot of the block into dir. diff --git a/db.go b/db.go index db9564375a..de753c25be 100644 --- a/db.go +++ b/db.go @@ -835,34 +835,49 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { } // CleanTombstones re-writes any blocks with tombstones. 
-func (db *DB) CleanTombstones() error { +func (db *DB) CleanTombstones() (err error) { db.cmtx.Lock() defer db.cmtx.Unlock() start := time.Now() defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds()) + newUIDs := []ulid.ULID{} + defer func() { + // If any error occurs, we need to delete all the newly created directories. + if err != nil { + for _, uid := range newUIDs { + dir := filepath.Join(db.Dir(), uid.String()) + if err := os.RemoveAll(dir); err != nil { + level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err) + } + } + } + }() + db.mtx.RLock() blocks := db.blocks[:] db.mtx.RUnlock() - deleted := []string{} + deletable := []string{} for _, b := range blocks { - ok, err := b.CleanTombstones(db.Dir(), db.compactor) - if err != nil { - return errors.Wrapf(err, "clean tombstones: %s", b.Dir()) + uid, er := b.CleanTombstones(db.Dir(), db.compactor) + if er != nil { + err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) + return err } - if ok { - deleted = append(deleted, b.Dir()) + if uid != nil { // New block was created. 
+ deletable = append(deletable, b.Dir()) + newUIDs = append(newUIDs, *uid) } } - if len(deleted) == 0 { + if len(deletable) == 0 { return nil } - return errors.Wrap(db.reload(deleted...), "reload blocks") + return errors.Wrap(db.reload(deletable...), "reload blocks") } func intervalOverlap(amin, amax, bmin, bmax int64) bool { diff --git a/testdata/repair_index_version/lock b/testdata/repair_index_version/lock new file mode 100644 index 0000000000..e69de29bb2 From f31a0d64577655e6267e24693fefe9279d4b0530 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Jun 2018 13:35:36 +0100 Subject: [PATCH 2/3] add Test for Tombstone Cleaning after a failure Signed-off-by: Krasi Georgiev --- db_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/db_test.go b/db_test.go index 543ff9c76f..f5364836d2 100644 --- a/db_test.go +++ b/db_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "io/ioutil" "math" "math/rand" @@ -21,6 +22,7 @@ import ( "path/filepath" "sort" "testing" + "time" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -781,6 +783,71 @@ func TestTombstoneClean(t *testing.T) { } } +func TestTombstoneCleanFail(t *testing.T) { + + db, close := openTestDB(t, nil) + defer close() + + var expectedBlockDirs []string + + // Create 2 empty blocks with some fake tombstones to have something to work with. + for i := 0; i < 2; i++ { + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + meta := &BlockMeta{ + Version: 2, + ULID: uid, + } + blockDir := filepath.Join(db.Dir(), uid.String()) + block := createEmptyBlock(t, blockDir, meta) + tomb := memTombstones{} + tomb[0] = Intervals{{0, 1}} + block.tombstones = tomb + + db.blocks = append(db.blocks, block) + expectedBlockDirs = append(expectedBlockDirs, blockDir) + } + db.compactor = &mockCompactorFailing{t: t} + + // This should fail as we are using the failing compactor! 
+ testutil.NotOk(t, db.CleanTombstones()) + + actualBlockDirs, err := blockDirs(db.dir) + testutil.Ok(t, err) + testutil.Equals(t, expectedBlockDirs, actualBlockDirs) +} + +type mockCompactorFailing struct { + t *testing.T + blocks []*Block +} + +func (*mockCompactorFailing) Plan(dir string) ([]string, error) { + + return nil, nil +} +func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { + if len(c.blocks) > 0 { + return ulid.ULID{}, fmt.Errorf("the compactor already did one block so forcefuly failing") + } + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + meta := &BlockMeta{ + Version: 2, + ULID: uid, + } + + fmt.Println(dest) + block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) + c.blocks = append(c.blocks, block) + return block.Meta().ULID, nil +} + +func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) { + return ulid.ULID{}, nil + +} + func TestDB_Retention(t *testing.T) { db, close := openTestDB(t, nil) defer close() From 6094f35aa29ccf8e6ab3bf3a8c84c68b71c1fd15 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 4 Jun 2018 20:18:44 +0100 Subject: [PATCH 3/3] simplify if-else,test before the tombstone failure, more comments Signed-off-by: Krasi Georgiev --- db.go | 7 ++----- db_test.go | 44 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index de753c25be..28cb14f331 100644 --- a/db.go +++ b/db.go @@ -861,13 +861,10 @@ func (db *DB) CleanTombstones() (err error) { deletable := []string{} for _, b := range blocks { - uid, er := b.CleanTombstones(db.Dir(), db.compactor) - if er != nil { + if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) return err - } - - if uid != nil { // New block was created. + } else if uid != nil { // New block was created. 
+ deletable = append(deletable, b.Dir()) + newUIDs = append(newUIDs, *uid) } } diff --git a/db_test.go b/db_test.go index f5364836d2..f0f3142c37 100644 --- a/db_test.go +++ b/db_test.go @@ -783,6 +783,9 @@ func TestTombstoneClean(t *testing.T) { } } +// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind. +// When TombstoneClean errors, the original block that should be rebuilt doesn't get deleted, so +// if TombstoneClean leaves any blocks behind these will overlap. func TestTombstoneCleanFail(t *testing.T) { db, close := openTestDB(t, nil) @@ -790,8 +793,10 @@ func TestTombstoneCleanFail(t *testing.T) { var expectedBlockDirs []string - // Create 2 empty blocks with some fake tombstones to have something to work with. - for i := 0; i < 2; i++ { + // Create some empty blocks pending for compaction. + // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. + totalBlocks := 2 + for i := 0; i < totalBlocks; i++ { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) meta := &BlockMeta{ @@ -800,6 +805,8 @@ func TestTombstoneCleanFail(t *testing.T) { } blockDir := filepath.Join(db.Dir(), uid.String()) block := createEmptyBlock(t, blockDir, meta) + + // Add some fake tombstones to trigger the compaction. tomb := memTombstones{} tomb[0] = Intervals{{0, 1}} block.tombstones = tomb @@ -807,29 +814,39 @@ func TestTombstoneCleanFail(t *testing.T) { db.blocks = append(db.blocks, block) expectedBlockDirs = append(expectedBlockDirs, blockDir) } - db.compactor = &mockCompactorFailing{t: t} - // This should fail as we are using the failing compactor! + // Initialize the mockCompactorFailing with room for a single compaction iteration. + // mockCompactorFailing will fail on the second iteration so we can check if the cleanup works as expected. 
+ db.compactor = &mockCompactorFailing{ + t: t, + blocks: db.blocks, + max: totalBlocks + 1, + } + + // The compactor should trigger a failure here. testutil.NotOk(t, db.CleanTombstones()) + // Now check that the CleanTombstones didn't leave any blocks behind after a failure. actualBlockDirs, err := blockDirs(db.dir) testutil.Ok(t, err) testutil.Equals(t, expectedBlockDirs, actualBlockDirs) } +// mockCompactorFailing creates a new empty block on every write and fails when it reaches the max allowed total. type mockCompactorFailing struct { t *testing.T blocks []*Block + max int } func (*mockCompactorFailing) Plan(dir string) ([]string, error) { - return nil, nil } func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { - if len(c.blocks) > 0 { - return ulid.ULID{}, fmt.Errorf("the compactor already did one block so forcefuly failing") + if len(c.blocks) >= c.max { + return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) meta := &BlockMeta{ @@ -837,9 +854,20 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 ULID: uid, } - fmt.Println(dest) block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) c.blocks = append(c.blocks, block) + + // Now check that all expected blocks are actually persisted on disk. + // This way we make sure that we have some blocks that are supposed to be removed. + var expectedBlocks []string + for _, b := range c.blocks { + expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String())) + } + actualBlockDirs, err := blockDirs(dest) + testutil.Ok(c.t, err) + + testutil.Equals(c.t, expectedBlocks, actualBlockDirs) + return block.Meta().ULID, nil }