diff --git a/compact_test.go b/compact_test.go index 9a45229f2..b496e8eac 100644 --- a/compact_test.go +++ b/compact_test.go @@ -17,12 +17,14 @@ import ( "io/ioutil" "math" "os" + "path" "path/filepath" "testing" "time" "github.com/go-kit/kit/log" "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" @@ -741,3 +743,77 @@ func TestDisableAutoCompactions(t *testing.T) { } testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } + +// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload imidiately after a compaction +// deletes the resulting block to avoid creatings blocks with the same time range. +func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { + + tests := map[string]func(*DB) int{ + "Test Head Compaction": func(db *DB) int { + rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + + // Add some data to the head that is enough to trigger a compaction. + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + return 1 + }, + "Test Block Compaction": func(db *DB) int { + expBlocks := []*BlockMeta{ + {MinTime: 0, MaxTime: 100}, + {MinTime: 100, MaxTime: 150}, + {MinTime: 150, MaxTime: 200}, + } + for _, m := range expBlocks { + createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) + } + testutil.Ok(t, db.reload()) + testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload") + + return len(expBlocks) + 1 + }, + } + + for title, bootStrap := range tests { + t.Run(title, func(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1, 100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + expBlocks := bootStrap(db) + + // Create a block that will trigger the reloard to fail. + blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) + lastBlockIndex := path.Join(blockPath, indexFilename) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expBlocks, len(actBlocks)) + testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. + + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") + + // Do the compaction and check the metrics. + // Since the most recent block is not included in the compaction, + // the compaction should succeed, but the reload should fail and + // the new block created from the compaction should be deleted. + db.EnableCompactions() + testutil.NotOk(t, db.compact()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expBlocks, len(actBlocks)) + }) + } +} diff --git a/db.go b/db.go index bd3388b20..8b2e795ca 100644 --- a/db.go +++ b/db.go @@ -425,6 +425,9 @@ func (db *DB) compact() (err error) { runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } if (uid == ulid.ULID{}) { @@ -454,12 +457,16 @@ func (db *DB) compact() (err error) { default: } - if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { + uid, err := db.compactor.Compact(db.dir, plan, db.blocks) + if err != nil { return errors.Wrapf(err, "compact %s", plan) } runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } runtime.GC() @@ -505,7 +512,7 @@ func (db *DB) reload() (err error) { } } if len(corrupted) > 0 { - return errors.Wrap(err, "unexpected corrupted block") + return fmt.Errorf("unexpected corrupted block:%v", corrupted) } // All deletable blocks should not be loaded.