Mirror of https://github.com/prometheus/prometheus.git (synced 2025-02-02 08:31:11 -08:00)
simplify if-else, test before the tombstone failure, more comments
Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
parent f31a0d6457
commit 6094f35aa2
db.go | 7
@@ -861,13 +861,10 @@ func (db *DB) CleanTombstones() (err error) {
 	deletable := []string{}
 	for _, b := range blocks {
-		uid, er := b.CleanTombstones(db.Dir(), db.compactor)
-		if er != nil {
+		if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
 			err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
 			return err
-		}
-
-		if uid != nil { // New block was created.
+		} else if uid != nil { // New block was created.
 			deletable = append(deletable, b.Dir())
 			newUIDs = append(newUIDs, *uid)
 		}
 
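
For readers unfamiliar with the idiom this hunk adopts: Go allows a short variable declaration inside the if statement itself, which scopes the variables to the whole if/else-if chain. A minimal standalone sketch of the pattern follows; the lookup helper and its names are invented for illustration and are not part of the tsdb API.

    package main

    import "fmt"

    // lookup stands in for a call like b.CleanTombstones that can return
    // a result, nothing, or an error. Name and behavior are hypothetical.
    func lookup(key string) (*string, error) {
    	if key == "" {
    		return nil, fmt.Errorf("empty key")
    	}
    	if key == "miss" {
    		return nil, nil // succeeded, but produced nothing
    	}
    	v := "value-for-" + key
    	return &v, nil
    }

    func main() {
    	// v and err are scoped to the whole if/else chain, so the success
    	// branch can test v without leaking either name into the function.
    	if v, err := lookup("hit"); err != nil {
    		fmt.Println("error:", err)
    	} else if v != nil { // a result was produced
    		fmt.Println("got:", *v)
    	}
    }

Compared with the old version, this keeps the error path and the new-block path in one construct and drops the separate if er != nil block, which is why the hunk shrinks from 13 lines to 10.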
db_test.go | 44
@@ -783,6 +783,9 @@ func TestTombstoneClean(t *testing.T) {
 	}
 }
 
+// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
+// When TombstoneClean errors, the original block that should be rebuilt doesn't get deleted, so
+// if TombstoneClean leaves any blocks behind, these will overlap.
 func TestTombstoneCleanFail(t *testing.T) {
 
 	db, close := openTestDB(t, nil)
@@ -790,8 +793,10 @@ func TestTombstoneCleanFail(t *testing.T) {
 
 	var expectedBlockDirs []string
 
-	// Create 2 empty blocks with some fake tombstones to have something to work with.
-	for i := 0; i < 2; i++ {
+	// Create some empty blocks pending compaction.
+	// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
+	totalBlocks := 2
+	for i := 0; i < totalBlocks; i++ {
 		entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 		uid := ulid.MustNew(ulid.Now(), entropy)
 		meta := &BlockMeta{
@@ -800,6 +805,8 @@ func TestTombstoneCleanFail(t *testing.T) {
 		}
 		blockDir := filepath.Join(db.Dir(), uid.String())
 		block := createEmptyBlock(t, blockDir, meta)
+
+		// Add some fake tombstones to trigger the compaction.
 		tomb := memTombstones{}
 		tomb[0] = Intervals{{0, 1}}
 		block.tombstones = tomb
@@ -807,29 +814,39 @@ func TestTombstoneCleanFail(t *testing.T) {
 		db.blocks = append(db.blocks, block)
 		expectedBlockDirs = append(expectedBlockDirs, blockDir)
 	}
-	db.compactor = &mockCompactorFailing{t: t}
 
-	// This should fail as we are using the failing compactor!
+	// Initialize the mockCompactorFailing with room for a single compaction iteration.
+	// mockCompactorFailing will fail on the second iteration, so we can check if the cleanup works as expected.
+	db.compactor = &mockCompactorFailing{
+		t:      t,
+		blocks: db.blocks,
+		max:    totalBlocks + 1,
+	}
 
+	// The compactor should trigger a failure here.
 	testutil.NotOk(t, db.CleanTombstones())
 
+	// Now check that the CleanTombstones didn't leave any blocks behind after a failure.
 	actualBlockDirs, err := blockDirs(db.dir)
 	testutil.Ok(t, err)
 	testutil.Equals(t, expectedBlockDirs, actualBlockDirs)
 }
 
+// mockCompactorFailing creates a new empty block on every write and fails when it reaches the max allowed total.
 type mockCompactorFailing struct {
 	t      *testing.T
 	blocks []*Block
+	max    int
 }
 
 func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
 	return nil, nil
 }
 func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
-	if len(c.blocks) > 0 {
-		return ulid.ULID{}, fmt.Errorf("the compactor already did one block so forcefuly failing")
+	if len(c.blocks) >= c.max {
+		return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
 	}
 
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 	meta := &BlockMeta{
@@ -837,9 +854,20 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
 		ULID: uid,
 	}
 
-	fmt.Println(dest)
 	block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta)
 	c.blocks = append(c.blocks, block)
 
+	// Now check that all expected blocks are actually persisted on disk.
+	// This way we make sure that we have some blocks that are supposed to be removed.
+	var expectedBlocks []string
+	for _, b := range c.blocks {
+		expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String()))
+	}
+	actualBlockDirs, err := blockDirs(dest)
+	testutil.Ok(c.t, err)
+
+	testutil.Equals(c.t, expectedBlocks, actualBlockDirs)
+
 	return block.Meta().ULID, nil
 }
+
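
The test hinges on a mock that cooperates for a bounded number of calls and then fails on purpose, so the cleanup-after-failure behavior can be asserted. A stripped-down sketch of that pattern follows, under invented names; failingWriter and its Write method are illustrative only and not tsdb's Compactor interface.

    package main

    import (
    	"errors"
    	"fmt"
    )

    // failingWriter sketches the mockCompactorFailing pattern: it succeeds
    // until it has produced max results, then returns an error.
    type failingWriter struct {
    	written int
    	max     int
    }

    func (w *failingWriter) Write() (int, error) {
    	if w.written >= w.max {
    		return 0, errors.New("reached the maximum allowed writes, failing on purpose")
    	}
    	w.written++
    	return w.written, nil
    }

    func main() {
    	w := &failingWriter{max: 1} // room for exactly one successful write

    	if id, err := w.Write(); err == nil {
    		fmt.Println("first write ok:", id)
    	}
    	if _, err := w.Write(); err != nil {
    		fmt.Println("second write fails as intended:", err)
    	}
    }

Seeding max with totalBlocks + 1 in the real test plays the same role as max: 1 here: starting from totalBlocks existing blocks, it leaves room for exactly one successful Write before the forced failure.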