tsdb: Extend compactor interface to allow compactions to create multiple output blocks (#14143)

* add hook to allow head compaction to create multiple output blocks

Signed-off-by: Ben Ye <benye@amazon.com>

* change Compact interface; remove BlockPopulator changes

Signed-off-by: Ben Ye <benye@amazon.com>

* rebase main

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* fix unit test

Signed-off-by: Ben Ye <benye@amazon.com>

* address feedbacks; add unit test

Signed-off-by: Ben Ye <benye@amazon.com>

* Apply suggestions from code review

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Update tsdb/compact_test.go

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Ben Ye 2024-06-12 14:31:25 -07:00 committed by GitHub
parent 05380aa0ac
commit 5a218708f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 141 additions and 99 deletions

View file

@ -22,6 +22,7 @@ import (
"time" "time"
"github.com/go-kit/log" "github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/textparse"
@ -191,6 +192,10 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
if quiet { if quiet {
break break
} }
// Empty block, don't print.
if block.Compare(ulid.ULID{}) == 0 {
break
}
blocks, err := db.Blocks() blocks, err := db.Blocks()
if err != nil { if err != nil {
return fmt.Errorf("get blocks: %w", err) return fmt.Errorf("get blocks: %w", err)

View file

@ -646,10 +646,10 @@ Outer:
} }
// CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
// If there was a rewrite, then it returns the ULID of the new block written, else nil. // If there was a rewrite, then it returns the ULID of new blocks written, else nil.
// If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and return nil UID. // If a resultant block is empty (tombstones covered the whole block), then it returns an empty slice.
// It returns a boolean indicating if the parent block can be deleted safely of not. // It returns a boolean indicating if the parent block can be deleted safely of not.
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error) { func (pb *Block) CleanTombstones(dest string, c Compactor) ([]ulid.ULID, bool, error) {
numStones := 0 numStones := 0
if err := pb.tombstones.Iter(func(id storage.SeriesRef, ivs tombstones.Intervals) error { if err := pb.tombstones.Iter(func(id storage.SeriesRef, ivs tombstones.Intervals) error {
@ -664,12 +664,12 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, er
} }
meta := pb.Meta() meta := pb.Meta()
uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) uids, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
return &uid, true, nil return uids, true, nil
} }
// Snapshot creates snapshot of the block into dir. // Snapshot creates snapshot of the block into dir.

View file

@ -346,9 +346,10 @@ func TestBlockSize(t *testing.T) {
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
require.NoError(t, err) require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err) require.NoError(t, err)
blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil) require.Len(t, blockDirsAfterCompact, 1)
blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirsAfterCompact[0].String()), nil)
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
require.NoError(t, blockAfterCompact.Close()) require.NoError(t, blockAfterCompact.Close())
@ -605,9 +606,10 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes. // Because of this block intervals are always +1 than the total samples it includes.
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) ulids, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
require.NoError(tb, err) require.NoError(tb, err)
return filepath.Join(dir, ulid.String()) require.Len(tb, ulids, 1)
return filepath.Join(dir, ulids[0].String())
} }
func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string { func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
@ -618,9 +620,10 @@ func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead)
// Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
// Because of this block intervals are always +1 than the total samples it includes. // Because of this block intervals are always +1 than the total samples it includes.
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) ulids, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
require.NoError(tb, err) require.NoError(tb, err)
return filepath.Join(dir, ulid.String()) require.Len(tb, ulids, 1)
return filepath.Join(dir, ulids[0].String())
} }
func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head { func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head {

View file

@ -105,12 +105,17 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
if err != nil { if err != nil {
return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err) return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err)
} }
id, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil) ids, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil)
if err != nil { if err != nil {
return ulid.ULID{}, fmt.Errorf("compactor write: %w", err) return ulid.ULID{}, fmt.Errorf("compactor write: %w", err)
} }
return id, nil // No block was produced. Caller is responsible to check empty
// ulid.ULID based on its use case.
if len(ids) == 0 {
return ulid.ULID{}, nil
}
return ids[0], nil
} }
func (w *BlockWriter) Close() error { func (w *BlockWriter) Close() error {

View file

@ -58,19 +58,23 @@ type Compactor interface {
// Results returned when compactions are in progress are undefined. // Results returned when compactions are in progress are undefined.
Plan(dir string) ([]string, error) Plan(dir string) ([]string, error)
// Write persists a Block into a directory. // Write persists one or more Blocks into a directory.
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. // No Block is written when resulting Block has 0 samples and returns an empty slice.
Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error) // Prometheus always return one or no block. The interface allows returning more than one
// block for downstream users to experiment with compactor.
Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error)
// Compact runs compaction against the provided directories. Must // Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan(). // only be called concurrently with results of Plan().
// Can optionally pass a list of already open blocks, // Can optionally pass a list of already open blocks,
// to avoid having to reopen them. // to avoid having to reopen them.
// When resulting Block has 0 samples // Prometheus always return one or no block. The interface allows returning more than one
// block for downstream users to experiment with compactor.
// When one resulting Block has 0 samples
// * No block is written. // * No block is written.
// * The source dirs are marked Deletable. // * The source dirs are marked Deletable.
// * Returns empty ulid.ULID{}. // * Block is not included in the result.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) Compact(dest string, dirs []string, open []*Block) ([]ulid.ULID, error)
} }
// LeveledCompactor implements the Compactor interface. // LeveledCompactor implements the Compactor interface.
@ -441,11 +445,11 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// Compact creates a new block in the compactor's directory from the blocks in the // Compact creates a new block in the compactor's directory from the blocks in the
// provided directories. // provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) ([]ulid.ULID, error) {
return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}) return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{})
} }
func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error) { func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) ([]ulid.ULID, error) {
var ( var (
blocks []BlockReader blocks []BlockReader
bs []*Block bs []*Block
@ -457,7 +461,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
for _, d := range dirs { for _, d := range dirs {
meta, _, err := readMetaFile(d) meta, _, err := readMetaFile(d)
if err != nil { if err != nil {
return uid, err return nil, err
} }
var b *Block var b *Block
@ -475,7 +479,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
var err error var err error
b, err = OpenBlock(c.logger, d, c.chunkPool) b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil { if err != nil {
return uid, err return nil, err
} }
defer b.Close() defer b.Close()
} }
@ -486,10 +490,10 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
uids = append(uids, meta.ULID.String()) uids = append(uids, meta.ULID.String())
} }
uid = ulid.MustNew(ulid.Now(), rand.Reader) uid := ulid.MustNew(ulid.Now(), rand.Reader)
meta := CompactBlockMetas(uid, metas...) meta := CompactBlockMetas(uid, metas...)
err = c.write(dest, meta, blockPopulator, blocks...) err := c.write(dest, meta, blockPopulator, blocks...)
if err == nil { if err == nil {
if meta.Stats.NumSamples == 0 { if meta.Stats.NumSamples == 0 {
for _, b := range bs { for _, b := range bs {
@ -503,25 +507,25 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
} }
b.numBytesMeta = n b.numBytesMeta = n
} }
uid = ulid.ULID{}
level.Info(c.logger).Log( level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block", "msg", "compact blocks resulted in empty block",
"count", len(blocks), "count", len(blocks),
"sources", fmt.Sprintf("%v", uids), "sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start), "duration", time.Since(start),
) )
} else { return nil, nil
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
} }
return uid, nil
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
return []ulid.ULID{uid}, nil
} }
errs := tsdb_errors.NewMulti(err) errs := tsdb_errors.NewMulti(err)
@ -533,10 +537,10 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
} }
} }
return uid, errs.Err() return nil, errs.Err()
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error) { func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error) {
start := time.Now() start := time.Now()
uid := ulid.MustNew(ulid.Now(), rand.Reader) uid := ulid.MustNew(ulid.Now(), rand.Reader)
@ -560,7 +564,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b
err := c.write(dest, meta, DefaultBlockPopulator{}, b) err := c.write(dest, meta, DefaultBlockPopulator{}, b)
if err != nil { if err != nil {
return uid, err return nil, err
} }
if meta.Stats.NumSamples == 0 { if meta.Stats.NumSamples == 0 {
@ -570,7 +574,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b
"maxt", meta.MaxTime, "maxt", meta.MaxTime,
"duration", time.Since(start), "duration", time.Since(start),
) )
return ulid.ULID{}, nil return nil, nil
} }
level.Info(c.logger).Log( level.Info(c.logger).Log(
@ -581,7 +585,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b
"duration", time.Since(start), "duration", time.Since(start),
"ooo", meta.Compaction.FromOutOfOrder(), "ooo", meta.Compaction.FromOutOfOrder(),
) )
return uid, nil return []ulid.ULID{uid}, nil
} }
// instrumentedChunkWriter is used for level 1 compactions to record statistics // instrumentedChunkWriter is used for level 1 compactions to record statistics

View file

@ -1484,12 +1484,12 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err) require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) ids, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, ulid.ULID{}, id) require.Len(t, ids, 1)
// Open the block and query it and check the histograms. // Open the block and query it and check the histograms.
block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil) block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, ids[0].String()), nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, block.Close()) require.NoError(t, block.Close())
@ -1598,8 +1598,8 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
sparseApp := sparseHead.Appender(context.Background()) sparseApp := sparseHead.Appender(context.Background())
numOldSeriesPerHistogram := 0 numOldSeriesPerHistogram := 0
var oldULID ulid.ULID var oldULIDs []ulid.ULID
var sparseULID ulid.ULID var sparseULIDs []ulid.ULID
var wg sync.WaitGroup var wg sync.WaitGroup
@ -1626,9 +1626,9 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err) require.NoError(t, err)
sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, ulid.ULID{}, sparseULID) require.Len(t, sparseULIDs, 1)
}() }()
wg.Add(1) wg.Add(1)
@ -1677,15 +1677,15 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err) require.NoError(t, err)
oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil) oldULIDs, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, ulid.ULID{}, oldULID) require.Len(t, oldULIDs, 1)
}() }()
wg.Wait() wg.Wait()
oldBlockDir := filepath.Join(oldHead.opts.ChunkDirRoot, oldULID.String()) oldBlockDir := filepath.Join(oldHead.opts.ChunkDirRoot, oldULIDs[0].String())
sparseBlockDir := filepath.Join(sparseHead.opts.ChunkDirRoot, sparseULID.String()) sparseBlockDir := filepath.Join(sparseHead.opts.ChunkDirRoot, sparseULIDs[0].String())
oldSize, err := fileutil.DirSize(oldBlockDir) oldSize, err := fileutil.DirSize(oldBlockDir)
require.NoError(t, err) require.NoError(t, err)
@ -1846,3 +1846,22 @@ func TestCompactBlockMetas(t *testing.T) {
} }
require.Equal(t, expected, output) require.Equal(t, expected, output)
} }
func TestCompactEmptyResultBlockWithTombstone(t *testing.T) {
ctx := context.Background()
tmpdir := t.TempDir()
blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 10))
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
// Write tombstone covering the whole block.
err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0"))
require.NoError(t, err)
c, err := NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{0}, nil, nil)
require.NoError(t, err)
ulids, err := c.Compact(tmpdir, []string{blockDir}, []*Block{block})
require.NoError(t, err)
require.Nil(t, ulids)
require.NoError(t, block.Close())
}

View file

@ -1336,13 +1336,11 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize { for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
mint, maxt := t, t+blockSize mint, maxt := t, t+blockSize
// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. // Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
uid, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta) uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if uid.Compare(ulid.ULID{}) != 0 { ulids = append(ulids, uids...)
ulids = append(ulids, uid)
}
} }
if len(ulids) == 0 { if len(ulids) == 0 {
@ -1364,19 +1362,19 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
// compactHead compacts the given RangeHead. // compactHead compacts the given RangeHead.
// The compaction mutex should be held before calling this method. // The compaction mutex should be held before calling this method.
func (db *DB) compactHead(head *RangeHead) error { func (db *DB) compactHead(head *RangeHead) error {
uid, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil) uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
if err != nil { if err != nil {
return fmt.Errorf("persist head block: %w", err) return fmt.Errorf("persist head block: %w", err)
} }
if err := db.reloadBlocks(); err != nil { if err := db.reloadBlocks(); err != nil {
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
return tsdb_errors.NewMulti( for _, uid := range uids {
fmt.Errorf("reloadBlocks blocks: %w", err), if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll), multiErr.Add(fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
).Err() }
} }
return fmt.Errorf("reloadBlocks blocks: %w", err) return multiErr.Err()
} }
if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil { if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil {
return fmt.Errorf("head memory truncate: %w", err) return fmt.Errorf("head memory truncate: %w", err)
@ -1411,16 +1409,19 @@ func (db *DB) compactBlocks() (err error) {
default: default:
} }
uid, err := db.compactor.Compact(db.dir, plan, db.blocks) uids, err := db.compactor.Compact(db.dir, plan, db.blocks)
if err != nil { if err != nil {
return fmt.Errorf("compact %s: %w", plan, err) return fmt.Errorf("compact %s: %w", plan, err)
} }
if err := db.reloadBlocks(); err != nil { if err := db.reloadBlocks(); err != nil {
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { errs := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err))
return fmt.Errorf("delete compacted block after failed db reloadBlocks:%s: %w", uid, err) for _, uid := range uids {
if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil {
errs.Add(fmt.Errorf("delete persisted block after failed db reloadBlocks:%s: %w", uid, errRemoveAll))
}
} }
return fmt.Errorf("reloadBlocks blocks: %w", err) return errs.Err()
} }
} }
@ -1541,12 +1542,15 @@ func (db *DB) reloadBlocks() (err error) {
oldBlocks := db.blocks oldBlocks := db.blocks
db.blocks = toLoad db.blocks = toLoad
blockMetas := make([]BlockMeta, 0, len(toLoad)) // Only check overlapping blocks when overlapping compaction is enabled.
for _, b := range toLoad { if db.opts.EnableOverlappingCompaction {
blockMetas = append(blockMetas, b.Meta()) blockMetas := make([]BlockMeta, 0, len(toLoad))
} for _, b := range toLoad {
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { blockMetas = append(blockMetas, b.Meta())
level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) }
if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String())
}
} }
// Append blocks to old, deletable blocks, so we can close them. // Append blocks to old, deletable blocks, so we can close them.
@ -2149,7 +2153,7 @@ func (db *DB) CleanTombstones() (err error) {
cleanUpCompleted = true cleanUpCompleted = true
for _, pb := range db.Blocks() { for _, pb := range db.Blocks() {
uid, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor) uids, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor)
if cleanErr != nil { if cleanErr != nil {
return fmt.Errorf("clean tombstones: %s: %w", pb.Dir(), cleanErr) return fmt.Errorf("clean tombstones: %s: %w", pb.Dir(), cleanErr)
} }
@ -2173,7 +2177,7 @@ func (db *DB) CleanTombstones() (err error) {
} }
// Delete new block if it was created. // Delete new block if it was created.
if uid != nil && *uid != (ulid.ULID{}) { for _, uid := range uids {
dir := filepath.Join(db.Dir(), uid.String()) dir := filepath.Join(db.Dir(), uid.String())
if err := os.RemoveAll(dir); err != nil { if err := os.RemoveAll(dir); err != nil {
level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err) level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err)

View file

@ -1431,9 +1431,9 @@ func (*mockCompactorFailing) Plan(string) ([]string, error) {
return nil, nil return nil, nil
} }
func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) { func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *BlockMeta) ([]ulid.ULID, error) {
if len(c.blocks) >= c.max { if len(c.blocks) >= c.max {
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") return []ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
} }
block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
@ -1452,11 +1452,11 @@ func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *
require.Equal(c.t, expectedBlocks, actualBlockDirs) require.Equal(c.t, expectedBlocks, actualBlockDirs)
return block.Meta().ULID, nil return []ulid.ULID{block.Meta().ULID}, nil
} }
func (*mockCompactorFailing) Compact(string, []string, []*Block) (ulid.ULID, error) { func (*mockCompactorFailing) Compact(string, []string, []*Block) ([]ulid.ULID, error) {
return ulid.ULID{}, nil return []ulid.ULID{}, nil
} }
func (*mockCompactorFailing) CompactOOO(string, *OOOCompactionHead) (result []ulid.ULID, err error) { func (*mockCompactorFailing) CompactOOO(string, *OOOCompactionHead) (result []ulid.ULID, err error) {
@ -6804,9 +6804,9 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) {
for _, b := range blocks { for _, b := range blocks {
blockDirs = append(blockDirs, b.Dir()) blockDirs = append(blockDirs, b.Dir())
} }
id, err := db.compactor.Compact(db.Dir(), blockDirs, blocks) ids, err := db.compactor.Compact(db.Dir(), blockDirs, blocks)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, ulid.ULID{}, id) require.Len(t, ids, 1)
require.NoError(t, db.reload()) require.NoError(t, db.reload())
require.Len(t, db.Blocks(), 1) require.Len(t, db.Blocks(), 1)
@ -7068,19 +7068,19 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) {
type mockCompactorFn struct { type mockCompactorFn struct {
planFn func() ([]string, error) planFn func() ([]string, error)
compactFn func() (ulid.ULID, error) compactFn func() ([]ulid.ULID, error)
writeFn func() (ulid.ULID, error) writeFn func() ([]ulid.ULID, error)
} }
func (c *mockCompactorFn) Plan(_ string) ([]string, error) { func (c *mockCompactorFn) Plan(_ string) ([]string, error) {
return c.planFn() return c.planFn()
} }
func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) (ulid.ULID, error) { func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) ([]ulid.ULID, error) {
return c.compactFn() return c.compactFn()
} }
func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) { func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) ([]ulid.ULID, error) {
return c.writeFn() return c.writeFn()
} }
@ -7112,11 +7112,11 @@ func TestAbortBlockCompactions(t *testing.T) {
// Our custom Plan() will always return something to compact. // Our custom Plan() will always return something to compact.
return []string{"1", "2", "3"}, nil return []string{"1", "2", "3"}, nil
}, },
compactFn: func() (ulid.ULID, error) { compactFn: func() ([]ulid.ULID, error) {
return ulid.ULID{}, nil return []ulid.ULID{}, nil
}, },
writeFn: func() (ulid.ULID, error) { writeFn: func() ([]ulid.ULID, error) {
return ulid.ULID{}, nil return []ulid.ULID{}, nil
}, },
} }
@ -7135,11 +7135,11 @@ func TestNewCompactorFunc(t *testing.T) {
planFn: func() ([]string, error) { planFn: func() ([]string, error) {
return []string{block1.String(), block2.String()}, nil return []string{block1.String(), block2.String()}, nil
}, },
compactFn: func() (ulid.ULID, error) { compactFn: func() ([]ulid.ULID, error) {
return block1, nil return []ulid.ULID{block1}, nil
}, },
writeFn: func() (ulid.ULID, error) { writeFn: func() ([]ulid.ULID, error) {
return block2, nil return []ulid.ULID{block2}, nil
}, },
}, nil }, nil
} }
@ -7150,10 +7150,12 @@ func TestNewCompactorFunc(t *testing.T) {
plans, err := db.compactor.Plan("") plans, err := db.compactor.Plan("")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []string{block1.String(), block2.String()}, plans) require.Equal(t, []string{block1.String(), block2.String()}, plans)
ulid, err := db.compactor.Compact("", nil, nil) ulids, err := db.compactor.Compact("", nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, block1, ulid) require.Len(t, ulids, 1)
ulid, err = db.compactor.Write("", nil, 0, 1, nil) require.Equal(t, block1, ulids[0])
ulids, err = db.compactor.Write("", nil, 0, 1, nil)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, block2, ulid) require.Len(t, ulids, 1)
require.Equal(t, block2, ulids[0])
} }