Protect DB against parallel reloads

Signed-off-by: ArthurSens <arthursens2005@gmail.com>
This commit is contained in:
ArthurSens 2021-01-17 21:49:47 +00:00
parent 9e0351e161
commit ae3d392aa9
2 changed files with 62 additions and 4 deletions

View file

@ -979,6 +979,12 @@ func (db *DB) reloadBlocks() (err error) {
db.metrics.reloads.Inc()
}()
// Now that we reload TSDB every minute, there is high chance for race condition with a reload
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
// a normal reload and CleanTombstones try to delete the same block.
db.mtx.Lock()
defer db.mtx.Unlock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
if err != nil {
return err
@ -1044,10 +1050,8 @@ func (db *DB) reloadBlocks() (err error) {
}
// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = toLoad
db.mtx.Unlock()
blockMetas := make([]BlockMeta, 0, len(toLoad))
for _, b := range toLoad {
@ -1554,7 +1558,11 @@ func (db *DB) CleanTombstones() (err error) {
// In case tombstones of the old block covers the whole block,
// then there would be no resultant block to tell the parent.
// The lock protects against race conditions when deleting blocks
// during an already running reload.
db.mtx.Lock()
pb.meta.Compaction.Deletable = safeToDelete
db.mtx.Unlock()
cleanUpCompleted = false
if err = db.reloadBlocks(); err == nil { // Will try to delete old block.
// Successful reload will change the existing blocks.
@ -1563,7 +1571,7 @@ func (db *DB) CleanTombstones() (err error) {
}
// Delete new block if it was created.
if uid != nil {
if uid != nil && *uid != (ulid.ULID{}) {
dir := filepath.Join(db.Dir(), uid.String())
if err := os.RemoveAll(dir); err != nil {
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)

View file

@ -1185,7 +1185,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// The compactor should trigger a failure here.
require.Error(t, db.CleanTombstones())
// Now check that the CleanTombstones replaced the old block even after a failure.
// Now check that the cleanTombstones replaced the old block even after a failure.
actualBlockDirs, err := blockDirs(db.dir)
require.NoError(t, err)
// Only one block should have been replaced by a new block.
@ -1193,6 +1193,56 @@ func TestTombstoneCleanFail(t *testing.T) {
require.Equal(t, len(intersection(oldBlockDirs, actualBlockDirs)), len(actualBlockDirs)-1)
}
// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation
// and retention limit policies, when triggered at the same time,
// won't race against each other.
func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
opts := DefaultOptions()
var wg sync.WaitGroup
// We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones()
// reload try to delete the same block. Without the correct lock placement, it can happen if a
// block is marked for deletion due to retention limits and also has tombstones to be cleaned at
// the same time.
//
// That is something tricky to trigger, so let's try several times just to make sure.
for i := 0; i < 20; i++ {
db := openTestDB(t, opts, nil)
totalBlocks := 20
dbDir := db.Dir()
// Generate some blocks with old mint (near epoch).
for j := 0; j < totalBlocks; j++ {
blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
// Cover block with tombstones so it can be deleted with CleanTombstones() as well.
tomb := tombstones.NewMemTombstones()
tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1})
block.tombstones = tomb
db.blocks = append(db.blocks, block)
}
wg.Add(2)
// Run reload and cleanTombstones together, with a small time window randomization
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
require.NoError(t, db.reloadBlocks())
}()
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
require.NoError(t, db.CleanTombstones())
}()
wg.Wait()
require.NoError(t, db.Close())
}
}
func intersection(oldBlocks, actualBlocks []string) (intersection []string) {
hash := make(map[string]bool)
for _, e := range oldBlocks {