From ae3d392aa9c3a5c5f92f8116738c5b32c98b09a7 Mon Sep 17 00:00:00 2001 From: ArthurSens Date: Sun, 17 Jan 2021 21:49:47 +0000 Subject: [PATCH] Protect DB against parallel reloads Signed-off-by: ArthurSens --- tsdb/db.go | 14 ++++++++++--- tsdb/db_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 4 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 8a2e8df712..a56ca7360c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -979,6 +979,12 @@ func (db *DB) reloadBlocks() (err error) { db.metrics.reloads.Inc() }() + // Now that we reload TSDB every minute, there is high chance for race condition with a reload + // triggered by CleanTombstones(). We need to lock the reload to avoid the situation where + // a normal reload and CleanTombstones try to delete the same block. + db.mtx.Lock() + defer db.mtx.Unlock() + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) if err != nil { return err @@ -1044,10 +1050,8 @@ func (db *DB) reloadBlocks() (err error) { } // Swap new blocks first for subsequently created readers to be seen. - db.mtx.Lock() oldBlocks := db.blocks db.blocks = toLoad - db.mtx.Unlock() blockMetas := make([]BlockMeta, 0, len(toLoad)) for _, b := range toLoad { @@ -1554,7 +1558,11 @@ func (db *DB) CleanTombstones() (err error) { // In case tombstones of the old block covers the whole block, // then there would be no resultant block to tell the parent. + // The lock protects against race conditions when deleting blocks + // during an already running reload. + db.mtx.Lock() pb.meta.Compaction.Deletable = safeToDelete + db.mtx.Unlock() cleanUpCompleted = false if err = db.reloadBlocks(); err == nil { // Will try to delete old block. // Successful reload will change the existing blocks. @@ -1563,7 +1571,7 @@ func (db *DB) CleanTombstones() (err error) { } // Delete new block if it was created. 
- if uid != nil { + if uid != nil && *uid != (ulid.ULID{}) { dir := filepath.Join(db.Dir(), uid.String()) if err := os.RemoveAll(dir); err != nil { level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 607684cb8f..60274fa4f7 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1185,7 +1185,7 @@ func TestTombstoneCleanFail(t *testing.T) { // The compactor should trigger a failure here. require.Error(t, db.CleanTombstones()) - // Now check that the CleanTombstones replaced the old block even after a failure. + // Now check that the CleanTombstones replaced the old block even after a failure. actualBlockDirs, err := blockDirs(db.dir) require.NoError(t, err) // Only one block should have been replaced by a new block. @@ -1193,6 +1193,56 @@ func TestTombstoneCleanFail(t *testing.T) { require.Equal(t, len(intersection(oldBlockDirs, actualBlockDirs)), len(actualBlockDirs)-1) } +// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation +// and retention limit policies, when triggered at the same time, +// won't race against each other. +func TestTombstoneCleanRetentionLimitsRace(t *testing.T) { + opts := DefaultOptions() + var wg sync.WaitGroup + + // We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones() + // reload try to delete the same block. Without the correct lock placement, it can happen if a + // block is marked for deletion due to retention limits and also has tombstones to be cleaned at + // the same time. + // + // That is something tricky to trigger, so let's try several times just to make sure. + for i := 0; i < 20; i++ { + db := openTestDB(t, opts, nil) + totalBlocks := 20 + dbDir := db.Dir() + // Generate some blocks with old mint (near epoch). 
+ for j := 0; j < totalBlocks; j++ { + blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + // Cover block with tombstones so it can be deleted with CleanTombstones() as well. + tomb := tombstones.NewMemTombstones() + tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1}) + block.tombstones = tomb + + db.blocks = append(db.blocks, block) + } + + wg.Add(2) + // Run reload and CleanTombstones together, with a small time window randomization + go func() { + defer wg.Done() + time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) + require.NoError(t, db.reloadBlocks()) + }() + go func() { + defer wg.Done() + time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) + require.NoError(t, db.CleanTombstones()) + }() + + wg.Wait() + + require.NoError(t, db.Close()) + } + +} + func intersection(oldBlocks, actualBlocks []string) (intersection []string) { hash := make(map[string]bool) for _, e := range oldBlocks {