From 6a3d55db0a753386941b364740cd056b32f6617e Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 17 Feb 2021 02:32:43 -0300 Subject: [PATCH] Rolling tombstones clean up (#8007) * CleanupTombstones refactored, now reloading blocks after every compaction. The goal is to remove deletable blocks after every compaction and, thus, decrease disk space used when cleaning tombstones. Signed-off-by: arthursens * Protect DB against parallel reloads Signed-off-by: ArthurSens * Fix typos Signed-off-by: Ganesh Vernekar Co-authored-by: Ganesh Vernekar --- tsdb/block.go | 11 ++-- tsdb/db.go | 62 ++++++++++++++--------- tsdb/db_test.go | 132 ++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 168 insertions(+), 37 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index 7ae8d5bbf1..47c842319f 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -569,7 +569,9 @@ Outer: // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). // If there was a rewrite, then it returns the ULID of the new block written, else nil. -func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { +// If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and returns a nil UID. +// It returns a boolean indicating if the parent block can be deleted safely or not. +func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error) { numStones := 0 if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error { @@ -580,15 +582,16 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { panic(err) } if numStones == 0 { - return nil, nil + return nil, false, nil } meta := pb.Meta() uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) if err != nil { - return nil, err + return nil, false, err } - return &uid, nil + + return &uid, true, nil } // Snapshot creates snapshot of the block into dir. 
diff --git a/tsdb/db.go b/tsdb/db.go index 5c95e02c02..a56ca7360c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -979,6 +979,12 @@ func (db *DB) reloadBlocks() (err error) { db.metrics.reloads.Inc() }() + // Now that we reload TSDB every minute, there is a high chance of a race condition with a reload + // triggered by CleanTombstones(). We need to lock the reload to avoid the situation where + // a normal reload and CleanTombstones try to delete the same block. + db.mtx.Lock() + defer db.mtx.Unlock() + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) if err != nil { return err @@ -1044,10 +1050,8 @@ func (db *DB) reloadBlocks() (err error) { } // Swap new blocks first for subsequently created readers to be seen. - db.mtx.Lock() oldBlocks := db.blocks db.blocks = toLoad - db.mtx.Unlock() blockMetas := make([]BlockMeta, 0, len(toLoad)) for _, b := range toLoad { @@ -1537,34 +1541,44 @@ func (db *DB) CleanTombstones() (err error) { start := time.Now() defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds()) - newUIDs := []ulid.ULID{} - defer func() { - // If any error is caused, we need to delete all the new directory created. - if err != nil { - for _, uid := range newUIDs { + cleanUpCompleted := false + // Repeat cleanup until there are no tombstones left. + for !cleanUpCompleted { + cleanUpCompleted = true + + for _, pb := range db.Blocks() { + uid, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor) + if cleanErr != nil { + return errors.Wrapf(cleanErr, "clean tombstones: %s", pb.Dir()) + } + if !safeToDelete { + // There was nothing to clean. + continue + } + + // In case tombstones of the old block cover the whole block, + // then there would be no resultant block to tell the parent. + // The lock protects against race conditions when deleting blocks + // during an already running reload. 
+ db.mtx.Lock() + pb.meta.Compaction.Deletable = safeToDelete + db.mtx.Unlock() + cleanUpCompleted = false + if err = db.reloadBlocks(); err == nil { // Will try to delete old block. + // Successful reload will change the existing blocks. + // We need to loop over the new set of blocks. + break + } + + // Delete new block if it was created. + if uid != nil && *uid != (ulid.ULID{}) { dir := filepath.Join(db.Dir(), uid.String()) if err := os.RemoveAll(dir); err != nil { level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err) } } + return errors.Wrap(err, "reload blocks") } - }() - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - for _, b := range blocks { - if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { - err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) - return err - } else if uid != nil { // New block was created. - newUIDs = append(newUIDs, *uid) - } - } - - if err := db.reloadBlocks(); err != nil { - return errors.Wrap(err, "reload blocks") } return nil } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index e00f0a8f85..6f8fb89bb1 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1030,7 +1030,7 @@ func TestTombstoneClean(t *testing.T) { for _, c := range cases { // Delete the ranges. - // create snapshot + // Create snapshot. snap, err := ioutil.TempDir("", "snap") require.NoError(t, err) @@ -1040,7 +1040,7 @@ func TestTombstoneClean(t *testing.T) { require.NoError(t, db.Snapshot(snap, true)) require.NoError(t, db.Close()) - // reopen DB from snapshot + // Reopen DB from snapshot. db, err = Open(snap, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -1099,6 +1099,54 @@ func TestTombstoneClean(t *testing.T) { } } +// TestTombstoneCleanResultEmptyBlock tests that a TombstoneClean that results in empty blocks (no timeseries) +// will also delete the resultant block. 
+func TestTombstoneCleanResultEmptyBlock(t *testing.T) { + numSamples := int64(10) + + db := openTestDB(t, nil, nil) + + ctx := context.Background() + app := db.Appender(ctx) + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) + } + + require.NoError(t, app.Commit()) + // Interval should cover the whole block. + intervals := tombstones.Intervals{{Mint: 0, Maxt: numSamples}} + + // Create snapshot. + snap, err := ioutil.TempDir("", "snap") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(snap)) + }() + require.NoError(t, db.Snapshot(snap, true)) + require.NoError(t, db.Close()) + + // Reopen DB from snapshot. + db, err = Open(snap, nil, nil, nil) + require.NoError(t, err) + defer db.Close() + + // Create tombstones by deleting all samples. + for _, r := range intervals { + require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) + } + + require.NoError(t, db.CleanTombstones()) + + // After cleaning tombstones that covers the entire block, no blocks should be left behind. + actualBlockDirs, err := blockDirs(db.dir) + require.NoError(t, err) + require.Equal(t, 0, len(actualBlockDirs)) +} + // TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind. // When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so // if TombstoneClean leaves any blocks behind these will overlap. @@ -1108,22 +1156,22 @@ func TestTombstoneCleanFail(t *testing.T) { require.NoError(t, db.Close()) }() - var expectedBlockDirs []string + var oldBlockDirs []string - // Create some empty blocks pending for compaction. + // Create some blocks pending for compaction. // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. 
totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1)) + blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1)) block, err := OpenBlock(nil, blockDir, nil) require.NoError(t, err) // Add some fake tombstones to trigger the compaction. tomb := tombstones.NewMemTombstones() - tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1}) + tomb.AddInterval(0, tombstones.Interval{Mint: int64(i), Maxt: int64(i) + 1}) block.tombstones = tomb db.blocks = append(db.blocks, block) - expectedBlockDirs = append(expectedBlockDirs, blockDir) + oldBlockDirs = append(oldBlockDirs, blockDir) } // Initialize the mockCompactorFailing with a room for a single compaction iteration. @@ -1137,10 +1185,76 @@ func TestTombstoneCleanFail(t *testing.T) { // The compactor should trigger a failure here. require.Error(t, db.CleanTombstones()) - // Now check that the CleanTombstones didn't leave any blocks behind after a failure. + // Now check that the CleanTombstones replaced the old block even after a failure. actualBlockDirs, err := blockDirs(db.dir) require.NoError(t, err) - require.Equal(t, expectedBlockDirs, actualBlockDirs) + // Only one block should have been replaced by a new block. + require.Equal(t, len(oldBlockDirs), len(actualBlockDirs)) + require.Equal(t, len(intersection(oldBlockDirs, actualBlockDirs)), len(actualBlockDirs)-1) +} + +// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation +// and retention limit policies, when triggered at the same time, +// won't race against each other. +func TestTombstoneCleanRetentionLimitsRace(t *testing.T) { + opts := DefaultOptions() + var wg sync.WaitGroup + + // We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones() + // reload try to delete the same block. 
Without the correct lock placement, it can happen if a + // block is marked for deletion due to retention limits and also has tombstones to be cleaned at + // the same time. + // + // That is something tricky to trigger, so let's try several times just to make sure. + for i := 0; i < 20; i++ { + db := openTestDB(t, opts, nil) + totalBlocks := 20 + dbDir := db.Dir() + // Generate some blocks with old mint (near epoch). + for j := 0; j < totalBlocks; j++ { + blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1)) + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + // Cover block with tombstones so it can be deleted with CleanTombstones() as well. + tomb := tombstones.NewMemTombstones() + tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1}) + block.tombstones = tomb + + db.blocks = append(db.blocks, block) + } + + wg.Add(2) + // Run reload and CleanTombstones together, with a small time window randomization + go func() { + defer wg.Done() + time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) + require.NoError(t, db.reloadBlocks()) + }() + go func() { + defer wg.Done() + time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond))) + require.NoError(t, db.CleanTombstones()) + }() + + wg.Wait() + + require.NoError(t, db.Close()) + } + +} + +func intersection(oldBlocks, actualBlocks []string) (intersection []string) { + hash := make(map[string]bool) + for _, e := range oldBlocks { + hash[e] = true + } + for _, e := range actualBlocks { + // If block present in the hashmap then append intersection list. + if hash[e] { + intersection = append(intersection, e) + } + } + return } // mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.