Merge pull request #12920 from prymitive/compactLock

Fix locks in db.reloadBlocks()
This commit is contained in:
Bryan Boreham 2025-02-10 17:35:09 +00:00 committed by GitHub
commit b74cebf6bf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 17 additions and 63 deletions

View file

@@ -992,9 +992,14 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
db.metrics.maxBytes.Set(float64(maxBytes))
db.metrics.retentionDuration.Set((time.Duration(opts.RetentionDuration) * time.Millisecond).Seconds())
// Calling db.reload() calls db.reloadBlocks() which requires cmtx to be locked.
db.cmtx.Lock()
if err := db.reload(); err != nil {
db.cmtx.Unlock()
return nil, err
}
db.cmtx.Unlock()
// Set the min valid time for the ingested samples
// to be no lower than the maxt of the last block.
minValidTime := int64(math.MinInt64)
@@ -1363,6 +1368,7 @@ func (db *DB) CompactOOOHead(ctx context.Context) error {
// Callback for testing.
var compactOOOHeadTestingCallback func()
// The db.cmtx mutex should be held before calling this method.
func (db *DB) compactOOOHead(ctx context.Context) error {
if !db.oooWasEnabled.Load() {
return nil
@@ -1417,6 +1423,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error {
// compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given.
// Each ULID in the result corresponds to a block in a unique time range.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) {
start := time.Now()
@@ -1461,7 +1468,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
}
// compactHead compacts the given RangeHead.
// The compaction mutex should be held before calling this method.
// The db.cmtx should be held before calling this method.
func (db *DB) compactHead(head *RangeHead) error {
uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
if err != nil {
@@ -1487,7 +1494,7 @@ func (db *DB) compactHead(head *RangeHead) error {
}
// compactBlocks compacts all the eligible on-disk blocks.
// The compaction mutex should be held before calling this method.
// The db.cmtx should be held before calling this method.
func (db *DB) compactBlocks() (err error) {
// Check for compactions of multiple blocks.
for {
@@ -1544,6 +1551,7 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
}
// reload reloads blocks and truncates the head and its WAL.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) reload() error {
if err := db.reloadBlocks(); err != nil {
return fmt.Errorf("reloadBlocks: %w", err)
@@ -1560,6 +1568,7 @@ func (db *DB) reload() error {
// reloadBlocks reloads blocks without touching head.
// Blocks that are obsolete due to replacement or retention will be deleted.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) reloadBlocks() (err error) {
defer func() {
if err != nil {
@@ -1568,13 +1577,9 @@ func (db *DB) reloadBlocks() (err error) {
db.metrics.reloads.Inc()
}()
// Now that we reload TSDB every minute, there is a high chance for a race condition with a reload
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
// a normal reload and CleanTombstones try to delete the same block.
db.mtx.Lock()
defer db.mtx.Unlock()
db.mtx.RLock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory)
db.mtx.RUnlock()
if err != nil {
return err
}
@@ -1600,11 +1605,13 @@ func (db *DB) reloadBlocks() (err error) {
if len(corrupted) > 0 {
// Corrupted but no child loaded for it.
// Close all new blocks to release the lock for windows.
db.mtx.RLock()
for _, block := range loadable {
if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
block.Close()
}
}
db.mtx.RUnlock()
errs := tsdb_errors.NewMulti()
for ulid, err := range corrupted {
if err != nil {
@@ -1643,8 +1650,10 @@ func (db *DB) reloadBlocks() (err error) {
})
// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = toLoad
db.mtx.Unlock()
// Only check overlapping blocks when overlapping compaction is enabled.
if db.opts.EnableOverlappingCompaction {

View file

@@ -1352,61 +1352,6 @@ func TestTombstoneCleanFail(t *testing.T) {
require.Len(t, intersection(oldBlockDirs, actualBlockDirs), len(actualBlockDirs)-1)
}
// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation
// and retention limit policies, when triggered at the same time,
// won't race against each other.
func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}
	opts := DefaultOptions()
	var wg sync.WaitGroup

	// A race can occur when a normal reload and a CleanTombstones() reload try
	// to delete the same block: with the wrong lock placement, a block marked
	// for deletion by retention limits may simultaneously have tombstones
	// pending cleanup. That interleaving is tricky to trigger, so repeat the
	// scenario several times to make sure.
	for iter := 0; iter < 20; iter++ {
		t.Run(fmt.Sprintf("iteration%d", iter), func(t *testing.T) {
			const totalBlocks = 20
			db := openTestDB(t, opts, nil)
			dbDir := db.Dir()

			// Generate some blocks with old mint (near epoch).
			for idx := 0; idx < totalBlocks; idx++ {
				dir := createBlock(t, dbDir, genSeries(10, 1, int64(idx), int64(idx)+1))
				blk, err := OpenBlock(nil, dir, nil, nil)
				require.NoError(t, err)
				// Cover block with tombstones so it can be deleted with CleanTombstones() as well.
				memTomb := tombstones.NewMemTombstones()
				memTomb.AddInterval(0, tombstones.Interval{Mint: int64(idx), Maxt: int64(idx) + 1})
				blk.tombstones = memTomb
				db.blocks = append(db.blocks, blk)
			}

			// Run reload and CleanTombstones together, with a small time window randomization.
			jitter := func() {
				time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
			}
			wg.Add(2)
			go func() {
				defer wg.Done()
				jitter()
				require.NoError(t, db.reloadBlocks())
			}()
			go func() {
				defer wg.Done()
				jitter()
				require.NoError(t, db.CleanTombstones())
			}()
			wg.Wait()

			require.NoError(t, db.Close())
		})
	}
}
func intersection(oldBlocks, actualBlocks []string) (intersection []string) {
hash := make(map[string]bool)
for _, e := range oldBlocks {