mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Rolling tombstones clean up (#8007)
* CleanupTombstones refactored, now reloading blocks after every compaction. The goal is to remove deletable blocks after every compaction and, thus, decrease disk space used when cleaning tombstones. Signed-off-by: arthursens <arthursens2005@gmail.com> * Protect DB against parallel reloads Signed-off-by: ArthurSens <arthursens2005@gmail.com> * Fix typos Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in> Co-authored-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
parent
9b85ed7056
commit
6a3d55db0a
|
@ -569,7 +569,9 @@ Outer:
|
||||||
|
|
||||||
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
|
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
|
||||||
// If there was a rewrite, then it returns the ULID of the new block written, else nil.
|
// If there was a rewrite, then it returns the ULID of the new block written, else nil.
|
||||||
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
|
// If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and return nil UID.
|
||||||
|
// It returns a boolean indicating if the parent block can be deleted safely of not.
|
||||||
|
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error) {
|
||||||
numStones := 0
|
numStones := 0
|
||||||
|
|
||||||
if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
|
if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
|
||||||
|
@ -580,15 +582,16 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
if numStones == 0 {
|
if numStones == 0 {
|
||||||
return nil, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
meta := pb.Meta()
|
meta := pb.Meta()
|
||||||
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
|
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
return &uid, nil
|
|
||||||
|
return &uid, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot creates snapshot of the block into dir.
|
// Snapshot creates snapshot of the block into dir.
|
||||||
|
|
62
tsdb/db.go
62
tsdb/db.go
|
@ -979,6 +979,12 @@ func (db *DB) reloadBlocks() (err error) {
|
||||||
db.metrics.reloads.Inc()
|
db.metrics.reloads.Inc()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Now that we reload TSDB every minute, there is high chance for race condition with a reload
|
||||||
|
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
|
||||||
|
// a normal reload and CleanTombstones try to delete the same block.
|
||||||
|
db.mtx.Lock()
|
||||||
|
defer db.mtx.Unlock()
|
||||||
|
|
||||||
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
|
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1044,10 +1050,8 @@ func (db *DB) reloadBlocks() (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Swap new blocks first for subsequently created readers to be seen.
|
// Swap new blocks first for subsequently created readers to be seen.
|
||||||
db.mtx.Lock()
|
|
||||||
oldBlocks := db.blocks
|
oldBlocks := db.blocks
|
||||||
db.blocks = toLoad
|
db.blocks = toLoad
|
||||||
db.mtx.Unlock()
|
|
||||||
|
|
||||||
blockMetas := make([]BlockMeta, 0, len(toLoad))
|
blockMetas := make([]BlockMeta, 0, len(toLoad))
|
||||||
for _, b := range toLoad {
|
for _, b := range toLoad {
|
||||||
|
@ -1537,34 +1541,44 @@ func (db *DB) CleanTombstones() (err error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
|
defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
|
||||||
|
|
||||||
newUIDs := []ulid.ULID{}
|
cleanUpCompleted := false
|
||||||
defer func() {
|
// Repeat cleanup until there is no tombstones left.
|
||||||
// If any error is caused, we need to delete all the new directory created.
|
for !cleanUpCompleted {
|
||||||
if err != nil {
|
cleanUpCompleted = true
|
||||||
for _, uid := range newUIDs {
|
|
||||||
|
for _, pb := range db.Blocks() {
|
||||||
|
uid, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor)
|
||||||
|
if cleanErr != nil {
|
||||||
|
return errors.Wrapf(cleanErr, "clean tombstones: %s", pb.Dir())
|
||||||
|
}
|
||||||
|
if !safeToDelete {
|
||||||
|
// There was nothing to clean.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// In case tombstones of the old block covers the whole block,
|
||||||
|
// then there would be no resultant block to tell the parent.
|
||||||
|
// The lock protects against race conditions when deleting blocks
|
||||||
|
// during an already running reload.
|
||||||
|
db.mtx.Lock()
|
||||||
|
pb.meta.Compaction.Deletable = safeToDelete
|
||||||
|
db.mtx.Unlock()
|
||||||
|
cleanUpCompleted = false
|
||||||
|
if err = db.reloadBlocks(); err == nil { // Will try to delete old block.
|
||||||
|
// Successful reload will change the existing blocks.
|
||||||
|
// We need to loop over the new set of blocks.
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete new block if it was created.
|
||||||
|
if uid != nil && *uid != (ulid.ULID{}) {
|
||||||
dir := filepath.Join(db.Dir(), uid.String())
|
dir := filepath.Join(db.Dir(), uid.String())
|
||||||
if err := os.RemoveAll(dir); err != nil {
|
if err := os.RemoveAll(dir); err != nil {
|
||||||
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
|
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return errors.Wrap(err, "reload blocks")
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
|
||||||
db.mtx.RLock()
|
|
||||||
blocks := db.blocks[:]
|
|
||||||
db.mtx.RUnlock()
|
|
||||||
|
|
||||||
for _, b := range blocks {
|
|
||||||
if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
|
|
||||||
err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
|
|
||||||
return err
|
|
||||||
} else if uid != nil { // New block was created.
|
|
||||||
newUIDs = append(newUIDs, *uid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := db.reloadBlocks(); err != nil {
|
|
||||||
return errors.Wrap(err, "reload blocks")
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
132
tsdb/db_test.go
132
tsdb/db_test.go
|
@ -1030,7 +1030,7 @@ func TestTombstoneClean(t *testing.T) {
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
// Delete the ranges.
|
// Delete the ranges.
|
||||||
|
|
||||||
// create snapshot
|
// Create snapshot.
|
||||||
snap, err := ioutil.TempDir("", "snap")
|
snap, err := ioutil.TempDir("", "snap")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -1040,7 +1040,7 @@ func TestTombstoneClean(t *testing.T) {
|
||||||
require.NoError(t, db.Snapshot(snap, true))
|
require.NoError(t, db.Snapshot(snap, true))
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
// reopen DB from snapshot
|
// Reopen DB from snapshot.
|
||||||
db, err = Open(snap, nil, nil, nil)
|
db, err = Open(snap, nil, nil, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
@ -1099,6 +1099,54 @@ func TestTombstoneClean(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestTombstoneCleanResultEmptyBlock tests that a TombstoneClean that results in empty blocks (no timeseries)
|
||||||
|
// will also delete the resultant block.
|
||||||
|
func TestTombstoneCleanResultEmptyBlock(t *testing.T) {
|
||||||
|
numSamples := int64(10)
|
||||||
|
|
||||||
|
db := openTestDB(t, nil, nil)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
app := db.Appender(ctx)
|
||||||
|
|
||||||
|
smpls := make([]float64, numSamples)
|
||||||
|
for i := int64(0); i < numSamples; i++ {
|
||||||
|
smpls[i] = rand.Float64()
|
||||||
|
app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
// Interval should cover the whole block.
|
||||||
|
intervals := tombstones.Intervals{{Mint: 0, Maxt: numSamples}}
|
||||||
|
|
||||||
|
// Create snapshot.
|
||||||
|
snap, err := ioutil.TempDir("", "snap")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, os.RemoveAll(snap))
|
||||||
|
}()
|
||||||
|
require.NoError(t, db.Snapshot(snap, true))
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
// Reopen DB from snapshot.
|
||||||
|
db, err = Open(snap, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Create tombstones by deleting all samples.
|
||||||
|
for _, r := range intervals {
|
||||||
|
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, db.CleanTombstones())
|
||||||
|
|
||||||
|
// After cleaning tombstones that covers the entire block, no blocks should be left behind.
|
||||||
|
actualBlockDirs, err := blockDirs(db.dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 0, len(actualBlockDirs))
|
||||||
|
}
|
||||||
|
|
||||||
// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
|
// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
|
||||||
// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
|
// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
|
||||||
// if TombstoneClean leaves any blocks behind these will overlap.
|
// if TombstoneClean leaves any blocks behind these will overlap.
|
||||||
|
@ -1108,22 +1156,22 @@ func TestTombstoneCleanFail(t *testing.T) {
|
||||||
require.NoError(t, db.Close())
|
require.NoError(t, db.Close())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var expectedBlockDirs []string
|
var oldBlockDirs []string
|
||||||
|
|
||||||
// Create some empty blocks pending for compaction.
|
// Create some blocks pending for compaction.
|
||||||
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
|
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
|
||||||
totalBlocks := 2
|
totalBlocks := 2
|
||||||
for i := 0; i < totalBlocks; i++ {
|
for i := 0; i < totalBlocks; i++ {
|
||||||
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
|
blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1))
|
||||||
block, err := OpenBlock(nil, blockDir, nil)
|
block, err := OpenBlock(nil, blockDir, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// Add some fake tombstones to trigger the compaction.
|
// Add some fake tombstones to trigger the compaction.
|
||||||
tomb := tombstones.NewMemTombstones()
|
tomb := tombstones.NewMemTombstones()
|
||||||
tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1})
|
tomb.AddInterval(0, tombstones.Interval{Mint: int64(i), Maxt: int64(i) + 1})
|
||||||
block.tombstones = tomb
|
block.tombstones = tomb
|
||||||
|
|
||||||
db.blocks = append(db.blocks, block)
|
db.blocks = append(db.blocks, block)
|
||||||
expectedBlockDirs = append(expectedBlockDirs, blockDir)
|
oldBlockDirs = append(oldBlockDirs, blockDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the mockCompactorFailing with a room for a single compaction iteration.
|
// Initialize the mockCompactorFailing with a room for a single compaction iteration.
|
||||||
|
@ -1137,10 +1185,76 @@ func TestTombstoneCleanFail(t *testing.T) {
|
||||||
// The compactor should trigger a failure here.
|
// The compactor should trigger a failure here.
|
||||||
require.Error(t, db.CleanTombstones())
|
require.Error(t, db.CleanTombstones())
|
||||||
|
|
||||||
// Now check that the CleanTombstones didn't leave any blocks behind after a failure.
|
// Now check that the CleanTombstones replaced the old block even after a failure.
|
||||||
actualBlockDirs, err := blockDirs(db.dir)
|
actualBlockDirs, err := blockDirs(db.dir)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, expectedBlockDirs, actualBlockDirs)
|
// Only one block should have been replaced by a new block.
|
||||||
|
require.Equal(t, len(oldBlockDirs), len(actualBlockDirs))
|
||||||
|
require.Equal(t, len(intersection(oldBlockDirs, actualBlockDirs)), len(actualBlockDirs)-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation
|
||||||
|
// and retention limit policies, when triggered at the same time,
|
||||||
|
// won't race against each other.
|
||||||
|
func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
|
||||||
|
opts := DefaultOptions()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones()
|
||||||
|
// reload try to delete the same block. Without the correct lock placement, it can happen if a
|
||||||
|
// block is marked for deletion due to retention limits and also has tombstones to be cleaned at
|
||||||
|
// the same time.
|
||||||
|
//
|
||||||
|
// That is something tricky to trigger, so let's try several times just to make sure.
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
db := openTestDB(t, opts, nil)
|
||||||
|
totalBlocks := 20
|
||||||
|
dbDir := db.Dir()
|
||||||
|
// Generate some blocks with old mint (near epoch).
|
||||||
|
for j := 0; j < totalBlocks; j++ {
|
||||||
|
blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
|
||||||
|
block, err := OpenBlock(nil, blockDir, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
// Cover block with tombstones so it can be deleted with CleanTombstones() as well.
|
||||||
|
tomb := tombstones.NewMemTombstones()
|
||||||
|
tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1})
|
||||||
|
block.tombstones = tomb
|
||||||
|
|
||||||
|
db.blocks = append(db.blocks, block)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(2)
|
||||||
|
// Run reload and CleanTombstones together, with a small time window randomization
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
|
||||||
|
require.NoError(t, db.reloadBlocks())
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
|
||||||
|
require.NoError(t, db.CleanTombstones())
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func intersection(oldBlocks, actualBlocks []string) (intersection []string) {
|
||||||
|
hash := make(map[string]bool)
|
||||||
|
for _, e := range oldBlocks {
|
||||||
|
hash[e] = true
|
||||||
|
}
|
||||||
|
for _, e := range actualBlocks {
|
||||||
|
// If block present in the hashmap then append intersection list.
|
||||||
|
if hash[e] {
|
||||||
|
intersection = append(intersection, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
|
// mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
|
||||||
|
|
Loading…
Reference in a new issue