Merge pull request #1 from krasi-georgiev/pr/341

Add Test for Tombstone Cleaning after a failure
Ganesh Vernekar 2018-06-04 15:36:44 -04:00
commit 0c93850cd5
3 changed files with 97 additions and 5 deletions

db.go (7 lines changed)

@@ -861,13 +861,10 @@ func (db *DB) CleanTombstones() (err error) {
 	deletable := []string{}
 	for _, b := range blocks {
-		uid, er := b.CleanTombstones(db.Dir(), db.compactor)
-		if er != nil {
+		if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
 			err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
 			return err
-		}
-		if uid != nil { // New block was created.
+		} else if uid != nil { // New block was created.
 			deletable = append(deletable, b.Dir())
 			newUIDs = append(newUIDs, *uid)
 		}
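The refactor above leans on Go's if-statement initializer scoping: the variables declared in the initializer (uid, er) stay visible in the else-if branch but nowhere else. A minimal, self-contained sketch of the same pattern, with a hypothetical doWork standing in for Block.CleanTombstones:

package main

import (
	"errors"
	"fmt"
)

// doWork is a hypothetical stand-in for Block.CleanTombstones: it returns a
// pointer result when a new block was created, nil when there was nothing to
// do, or an error.
func doWork(fail bool) (*string, error) {
	if fail {
		return nil, errors.New("boom")
	}
	id := "new-block"
	return &id, nil
}

func main() {
	// uid and err are scoped to the if/else-if chain only, mirroring the
	// refactored loop in CleanTombstones.
	if uid, err := doWork(false); err != nil {
		fmt.Println("clean tombstones:", err)
	} else if uid != nil { // New block was created.
		fmt.Println("created:", *uid)
	}
}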

db_test.go

@@ -14,6 +14,7 @@
package tsdb

import (
	"fmt"
	"io/ioutil"
	"math"
	"math/rand"
@@ -21,6 +22,7 @@ import (
	"path/filepath"
	"sort"
	"testing"
	"time"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
@@ -781,6 +783,99 @@ func TestTombstoneClean(t *testing.T) {
	}
}

// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
// When TombstoneClean fails, the original block that was due to be rebuilt is not deleted, so
// any new blocks that a failed run leaves behind would overlap with it.
func TestTombstoneCleanFail(t *testing.T) {
	db, close := openTestDB(t, nil)
	defer close()

	var expectedBlockDirs []string

	// Create some empty blocks pending for compaction.
	// totalBlocks should be >= 2 so that there are enough blocks to trigger a compaction failure.
	totalBlocks := 2
	for i := 0; i < totalBlocks; i++ {
		entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
		uid := ulid.MustNew(ulid.Now(), entropy)
		meta := &BlockMeta{
			Version: 2,
			ULID:    uid,
		}
		blockDir := filepath.Join(db.Dir(), uid.String())
		block := createEmptyBlock(t, blockDir, meta)

		// Add some fake tombstones to trigger the compaction.
		tomb := memTombstones{}
		tomb[0] = Intervals{{0, 1}}
		block.tombstones = tomb

		db.blocks = append(db.blocks, block)
		expectedBlockDirs = append(expectedBlockDirs, blockDir)
	}

	// Initialize the mockCompactorFailing with room for a single successful compaction iteration
	// (max = totalBlocks + 1). It fails on the second iteration, which lets us check whether the
	// cleanup works as expected.
	db.compactor = &mockCompactorFailing{
		t:      t,
		blocks: db.blocks,
		max:    totalBlocks + 1,
	}

	// The compactor should trigger a failure here.
	testutil.NotOk(t, db.CleanTombstones())

	// Now check that CleanTombstones didn't leave any blocks behind after the failure.
	actualBlockDirs, err := blockDirs(db.dir)
	testutil.Ok(t, err)
	testutil.Equals(t, expectedBlockDirs, actualBlockDirs)
}

// mockCompactorFailing creates a new empty block on every write and fails once it has written
// the maximum allowed number of blocks.
type mockCompactorFailing struct {
	t      *testing.T
	blocks []*Block
	max    int
}

func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
	return nil, nil
}

func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
	if len(c.blocks) >= c.max {
		return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
	}

	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid := ulid.MustNew(ulid.Now(), entropy)
	meta := &BlockMeta{
		Version: 2,
		ULID:    uid,
	}

	block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta)
	c.blocks = append(c.blocks, block)

	// Now check that all expected blocks are actually persisted on disk.
	// This way we make sure that there are some blocks that are supposed to be removed.
	var expectedBlocks []string
	for _, b := range c.blocks {
		expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String()))
	}
	actualBlockDirs, err := blockDirs(dest)
	testutil.Ok(c.t, err)
	testutil.Equals(c.t, expectedBlocks, actualBlockDirs)

	return block.Meta().ULID, nil
}

func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) {
	return ulid.ULID{}, nil
}

func TestDB_Retention(t *testing.T) {
	db, close := openTestDB(t, nil)
	defer close()

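TestTombstoneCleanFail above expects that, when the compactor fails part-way through, CleanTombstones removes the blocks it has already created and leaves only the original block directories. The cleanup mechanism itself is not part of this diff; the sketch below is a hypothetical, self-contained illustration of the usual Go pattern for it, a deferred cleanup keyed on the named error return (all names here are illustrative, not the tsdb implementation):

package main

import (
	"errors"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// cleanExample mimics the shape such a cleanup can take: directories created
// so far are removed by a deferred function that only acts when the named
// return value err is non-nil.
func cleanExample(baseDir string, fail bool) (err error) {
	created := []string{}
	defer func() {
		if err == nil {
			return
		}
		for _, dir := range created {
			os.RemoveAll(dir) // best-effort cleanup after a failure
		}
	}()

	dir := filepath.Join(baseDir, "new-block")
	if err = os.MkdirAll(dir, 0777); err != nil {
		return err
	}
	created = append(created, dir)

	if fail {
		return errors.New("compaction failed")
	}
	return nil
}

func main() {
	base, err := ioutil.TempDir("", "clean-example")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(base)

	// The error is returned and no new-block directory is left behind.
	fmt.Println(cleanExample(base, true))
}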