diff --git a/cmd/promtool/backfill.go b/cmd/promtool/backfill.go index 79db428c7..400cae421 100644 --- a/cmd/promtool/backfill.go +++ b/cmd/promtool/backfill.go @@ -22,6 +22,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" @@ -191,6 +192,10 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn if quiet { break } + // Empty block, don't print. + if block.Compare(ulid.ULID{}) == 0 { + break + } blocks, err := db.Blocks() if err != nil { return fmt.Errorf("get blocks: %w", err) diff --git a/tsdb/block.go b/tsdb/block.go index d2e7aa6fa..2f32733f8 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -646,10 +646,10 @@ Outer: } // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). -// If there was a rewrite, then it returns the ULID of the new block written, else nil. -// If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and return nil UID. +// If there was a rewrite, then it returns the ULID of new blocks written, else nil. +// If a resultant block is empty (tombstones covered the whole block), then it returns an empty slice. // It returns a boolean indicating if the parent block can be deleted safely of not. -func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error) { +func (pb *Block) CleanTombstones(dest string, c Compactor) ([]ulid.ULID, bool, error) { numStones := 0 if err := pb.tombstones.Iter(func(id storage.SeriesRef, ivs tombstones.Intervals) error { @@ -664,12 +664,12 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, er } meta := pb.Meta() - uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) + uids, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) if err != nil { return nil, false, err } - return &uid, true, nil + return uids, true, nil } // Snapshot creates snapshot of the block into dir. diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 42acc3c69..f2569e35b 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -346,9 +346,10 @@ func TestBlockSize(t *testing.T) { c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) require.NoError(t, err) - blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) + blockDirsAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) - blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil) + require.Len(t, blockDirsAfterCompact, 1) + blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirsAfterCompact[0].String()), nil) require.NoError(t, err) defer func() { require.NoError(t, blockAfterCompact.Close()) @@ -605,9 +606,10 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). // Because of this block intervals are always +1 than the total samples it includes. - ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) + ulids, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) require.NoError(tb, err) - return filepath.Join(dir, ulid.String()) + require.Len(tb, ulids, 1) + return filepath.Join(dir, ulids[0].String()) } func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string { @@ -618,9 +620,10 @@ func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). // Because of this block intervals are always +1 than the total samples it includes. - ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) + ulids, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil) require.NoError(tb, err) - return filepath.Join(dir, ulid.String()) + require.Len(tb, ulids, 1) + return filepath.Join(dir, ulids[0].String()) } func createHead(tb testing.TB, w *wlog.WL, series []storage.Series, chunkDir string) *Head { diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 32346d69d..232ec2b91 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -105,12 +105,17 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { if err != nil { return ulid.ULID{}, fmt.Errorf("create leveled compactor: %w", err) } - id, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil) + ids, err := compactor.Write(w.destinationDir, w.head, mint, maxt, nil) if err != nil { return ulid.ULID{}, fmt.Errorf("compactor write: %w", err) } - return id, nil + // No block was produced. Caller is responsible to check empty + // ulid.ULID based on its use case. + if len(ids) == 0 { + return ulid.ULID{}, nil + } + return ids[0], nil } func (w *BlockWriter) Close() error { diff --git a/tsdb/compact.go b/tsdb/compact.go index c2ae23b2e..3c921520f 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -58,19 +58,23 @@ type Compactor interface { // Results returned when compactions are in progress are undefined. Plan(dir string) ([]string, error) - // Write persists a Block into a directory. - // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. - Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error) + // Write persists one or more Blocks into a directory. + // No Block is written when resulting Block has 0 samples and returns an empty slice. + // Prometheus always return one or no block. The interface allows returning more than one + // block for downstream users to experiment with compactor. + Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. - // When resulting Block has 0 samples + // Prometheus always return one or no block. The interface allows returning more than one + // block for downstream users to experiment with compactor. + // When one resulting Block has 0 samples // * No block is written. // * The source dirs are marked Deletable. - // * Returns empty ulid.ULID{}. - Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) + // * Block is not included in the result. + Compact(dest string, dirs []string, open []*Block) ([]ulid.ULID, error) } // LeveledCompactor implements the Compactor interface. @@ -441,11 +445,11 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { // Compact creates a new block in the compactor's directory from the blocks in the // provided directories. -func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { +func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) ([]ulid.ULID, error) { return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}) } -func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error) { +func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) ([]ulid.ULID, error) { var ( blocks []BlockReader bs []*Block @@ -457,7 +461,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, for _, d := range dirs { meta, _, err := readMetaFile(d) if err != nil { - return uid, err + return nil, err } var b *Block @@ -475,7 +479,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, var err error b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { - return uid, err + return nil, err } defer b.Close() } @@ -486,10 +490,10 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, uids = append(uids, meta.ULID.String()) } - uid = ulid.MustNew(ulid.Now(), rand.Reader) + uid := ulid.MustNew(ulid.Now(), rand.Reader) meta := CompactBlockMetas(uid, metas...) - err = c.write(dest, meta, blockPopulator, blocks...) + err := c.write(dest, meta, blockPopulator, blocks...) if err == nil { if meta.Stats.NumSamples == 0 { for _, b := range bs { @@ -503,25 +507,25 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, } b.numBytesMeta = n } - uid = ulid.ULID{} level.Info(c.logger).Log( "msg", "compact blocks resulted in empty block", "count", len(blocks), "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), ) - } else { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + return nil, nil } - return uid, nil + + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + return []ulid.ULID{uid}, nil } errs := tsdb_errors.NewMulti(err) @@ -533,10 +537,10 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, } } - return uid, errs.Err() + return nil, errs.Err() } -func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error) { +func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error) { start := time.Now() uid := ulid.MustNew(ulid.Now(), rand.Reader) @@ -560,7 +564,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b err := c.write(dest, meta, DefaultBlockPopulator{}, b) if err != nil { - return uid, err + return nil, err } if meta.Stats.NumSamples == 0 { @@ -570,7 +574,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b "maxt", meta.MaxTime, "duration", time.Since(start), ) - return ulid.ULID{}, nil + return nil, nil } level.Info(c.logger).Log( @@ -581,7 +585,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, b "duration", time.Since(start), "ooo", meta.Compaction.FromOutOfOrder(), ) - return uid, nil + return []ulid.ULID{uid}, nil } // instrumentedChunkWriter is used for level 1 compactions to record statistics diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 7a353a556..5ce163f1e 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1484,12 +1484,12 @@ func TestHeadCompactionWithHistograms(t *testing.T) { maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) - id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) + ids, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil) require.NoError(t, err) - require.NotEqual(t, ulid.ULID{}, id) + require.Len(t, ids, 1) // Open the block and query it and check the histograms. - block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil) + block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, ids[0].String()), nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, block.Close()) @@ -1598,8 +1598,8 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { sparseApp := sparseHead.Appender(context.Background()) numOldSeriesPerHistogram := 0 - var oldULID ulid.ULID - var sparseULID ulid.ULID + var oldULIDs []ulid.ULID + var sparseULIDs []ulid.ULID var wg sync.WaitGroup @@ -1626,9 +1626,9 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) - sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) + sparseULIDs, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil) require.NoError(t, err) - require.NotEqual(t, ulid.ULID{}, sparseULID) + require.Len(t, sparseULIDs, 1) }() wg.Add(1) @@ -1677,15 +1677,15 @@ func TestSparseHistogramSpaceSavings(t *testing.T) { maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime). compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil) require.NoError(t, err) - oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil) + oldULIDs, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil) require.NoError(t, err) - require.NotEqual(t, ulid.ULID{}, oldULID) + require.Len(t, oldULIDs, 1) }() wg.Wait() - oldBlockDir := filepath.Join(oldHead.opts.ChunkDirRoot, oldULID.String()) - sparseBlockDir := filepath.Join(sparseHead.opts.ChunkDirRoot, sparseULID.String()) + oldBlockDir := filepath.Join(oldHead.opts.ChunkDirRoot, oldULIDs[0].String()) + sparseBlockDir := filepath.Join(sparseHead.opts.ChunkDirRoot, sparseULIDs[0].String()) oldSize, err := fileutil.DirSize(oldBlockDir) require.NoError(t, err) @@ -1846,3 +1846,22 @@ func TestCompactBlockMetas(t *testing.T) { } require.Equal(t, expected, output) } + +func TestCompactEmptyResultBlockWithTombstone(t *testing.T) { + ctx := context.Background() + tmpdir := t.TempDir() + blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 10)) + block, err := OpenBlock(nil, blockDir, nil) + require.NoError(t, err) + // Write tombstone covering the whole block. + err = block.Delete(ctx, 0, 10, labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "0")) + require.NoError(t, err) + + c, err := NewLeveledCompactor(ctx, nil, log.NewNopLogger(), []int64{0}, nil, nil) + require.NoError(t, err) + + ulids, err := c.Compact(tmpdir, []string{blockDir}, []*Block{block}) + require.NoError(t, err) + require.Nil(t, ulids) + require.NoError(t, block.Close()) +} diff --git a/tsdb/db.go b/tsdb/db.go index 5651b403e..c44737c69 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1336,13 +1336,11 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize { mint, maxt := t, t+blockSize // Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. - uid, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta) + uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta) if err != nil { return nil, err } - if uid.Compare(ulid.ULID{}) != 0 { - ulids = append(ulids, uid) - } + ulids = append(ulids, uids...) } if len(ulids) == 0 { @@ -1364,19 +1362,19 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID // compactHead compacts the given RangeHead. // The compaction mutex should be held before calling this method. func (db *DB) compactHead(head *RangeHead) error { - uid, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil) + uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil) if err != nil { return fmt.Errorf("persist head block: %w", err) } if err := db.reloadBlocks(); err != nil { - if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { - return tsdb_errors.NewMulti( - fmt.Errorf("reloadBlocks blocks: %w", err), - fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll), - ).Err() + multiErr := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err)) + for _, uid := range uids { + if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { + multiErr.Add(fmt.Errorf("delete persisted head block after failed db reloadBlocks:%s: %w", uid, errRemoveAll)) + } } - return fmt.Errorf("reloadBlocks blocks: %w", err) + return multiErr.Err() } if err = db.head.truncateMemory(head.BlockMaxTime()); err != nil { return fmt.Errorf("head memory truncate: %w", err) @@ -1411,16 +1409,19 @@ func (db *DB) compactBlocks() (err error) { default: } - uid, err := db.compactor.Compact(db.dir, plan, db.blocks) + uids, err := db.compactor.Compact(db.dir, plan, db.blocks) if err != nil { return fmt.Errorf("compact %s: %w", plan, err) } if err := db.reloadBlocks(); err != nil { - if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return fmt.Errorf("delete compacted block after failed db reloadBlocks:%s: %w", uid, err) + errs := tsdb_errors.NewMulti(fmt.Errorf("reloadBlocks blocks: %w", err)) + for _, uid := range uids { + if errRemoveAll := os.RemoveAll(filepath.Join(db.dir, uid.String())); errRemoveAll != nil { + errs.Add(fmt.Errorf("delete persisted block after failed db reloadBlocks:%s: %w", uid, errRemoveAll)) + } } - return fmt.Errorf("reloadBlocks blocks: %w", err) + return errs.Err() } } @@ -1541,12 +1542,15 @@ func (db *DB) reloadBlocks() (err error) { oldBlocks := db.blocks db.blocks = toLoad - blockMetas := make([]BlockMeta, 0, len(toLoad)) - for _, b := range toLoad { - blockMetas = append(blockMetas, b.Meta()) - } - if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) + // Only check overlapping blocks when overlapping compaction is enabled. + if db.opts.EnableOverlappingCompaction { + blockMetas := make([]BlockMeta, 0, len(toLoad)) + for _, b := range toLoad { + blockMetas = append(blockMetas, b.Meta()) + } + if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { + level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) + } } // Append blocks to old, deletable blocks, so we can close them. @@ -2149,7 +2153,7 @@ func (db *DB) CleanTombstones() (err error) { cleanUpCompleted = true for _, pb := range db.Blocks() { - uid, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor) + uids, safeToDelete, cleanErr := pb.CleanTombstones(db.Dir(), db.compactor) if cleanErr != nil { return fmt.Errorf("clean tombstones: %s: %w", pb.Dir(), cleanErr) } @@ -2173,7 +2177,7 @@ func (db *DB) CleanTombstones() (err error) { } // Delete new block if it was created. - if uid != nil && *uid != (ulid.ULID{}) { + for _, uid := range uids { dir := filepath.Join(db.Dir(), uid.String()) if err := os.RemoveAll(dir); err != nil { level.Error(db.logger).Log("msg", "failed to delete block after failed `CleanTombstones`", "dir", dir, "err", err) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 69c9f60e3..3d2fb2d99 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1431,9 +1431,9 @@ func (*mockCompactorFailing) Plan(string) ([]string, error) { return nil, nil } -func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) { +func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *BlockMeta) ([]ulid.ULID, error) { if len(c.blocks) >= c.max { - return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") + return []ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil) @@ -1452,11 +1452,11 @@ func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ * require.Equal(c.t, expectedBlocks, actualBlockDirs) - return block.Meta().ULID, nil + return []ulid.ULID{block.Meta().ULID}, nil } -func (*mockCompactorFailing) Compact(string, []string, []*Block) (ulid.ULID, error) { - return ulid.ULID{}, nil +func (*mockCompactorFailing) Compact(string, []string, []*Block) ([]ulid.ULID, error) { + return []ulid.ULID{}, nil } func (*mockCompactorFailing) CompactOOO(string, *OOOCompactionHead) (result []ulid.ULID, err error) { @@ -6804,9 +6804,9 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { for _, b := range blocks { blockDirs = append(blockDirs, b.Dir()) } - id, err := db.compactor.Compact(db.Dir(), blockDirs, blocks) + ids, err := db.compactor.Compact(db.Dir(), blockDirs, blocks) require.NoError(t, err) - require.NotEqual(t, ulid.ULID{}, id) + require.Len(t, ids, 1) require.NoError(t, db.reload()) require.Len(t, db.Blocks(), 1) @@ -7068,19 +7068,19 @@ func requireEqualOOOSamples(t *testing.T, expectedSamples int, db *DB) { type mockCompactorFn struct { planFn func() ([]string, error) - compactFn func() (ulid.ULID, error) - writeFn func() (ulid.ULID, error) + compactFn func() ([]ulid.ULID, error) + writeFn func() ([]ulid.ULID, error) } func (c *mockCompactorFn) Plan(_ string) ([]string, error) { return c.planFn() } -func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) (ulid.ULID, error) { +func (c *mockCompactorFn) Compact(_ string, _ []string, _ []*Block) ([]ulid.ULID, error) { return c.compactFn() } -func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) (ulid.ULID, error) { +func (c *mockCompactorFn) Write(_ string, _ BlockReader, _, _ int64, _ *BlockMeta) ([]ulid.ULID, error) { return c.writeFn() } @@ -7112,11 +7112,11 @@ func TestAbortBlockCompactions(t *testing.T) { // Our custom Plan() will always return something to compact. return []string{"1", "2", "3"}, nil }, - compactFn: func() (ulid.ULID, error) { - return ulid.ULID{}, nil + compactFn: func() ([]ulid.ULID, error) { + return []ulid.ULID{}, nil }, - writeFn: func() (ulid.ULID, error) { - return ulid.ULID{}, nil + writeFn: func() ([]ulid.ULID, error) { + return []ulid.ULID{}, nil }, } @@ -7135,11 +7135,11 @@ func TestNewCompactorFunc(t *testing.T) { planFn: func() ([]string, error) { return []string{block1.String(), block2.String()}, nil }, - compactFn: func() (ulid.ULID, error) { - return block1, nil + compactFn: func() ([]ulid.ULID, error) { + return []ulid.ULID{block1}, nil }, - writeFn: func() (ulid.ULID, error) { - return block2, nil + writeFn: func() ([]ulid.ULID, error) { + return []ulid.ULID{block2}, nil }, }, nil } @@ -7150,10 +7150,12 @@ func TestNewCompactorFunc(t *testing.T) { plans, err := db.compactor.Plan("") require.NoError(t, err) require.Equal(t, []string{block1.String(), block2.String()}, plans) - ulid, err := db.compactor.Compact("", nil, nil) + ulids, err := db.compactor.Compact("", nil, nil) require.NoError(t, err) - require.Equal(t, block1, ulid) - ulid, err = db.compactor.Write("", nil, 0, 1, nil) + require.Len(t, ulids, 1) + require.Equal(t, block1, ulids[0]) + ulids, err = db.compactor.Write("", nil, 0, 1, nil) require.NoError(t, err) - require.Equal(t, block2, ulid) + require.Len(t, ulids, 1) + require.Equal(t, block2, ulids[0]) }