diff --git a/tsdb/block_test.go b/tsdb/block_test.go index 3c2990cf3..c292cdcf5 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -323,7 +323,7 @@ func TestBlockSize(t *testing.T) { require.NoError(t, err) require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size") - c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) require.NoError(t, err) blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) @@ -426,7 +426,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string { } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { - compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil) require.NoError(tb, err) require.NoError(tb, os.MkdirAll(dir, 0777)) diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 2d19c7705..95d6f2854 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { nil, w.logger, []int64{w.blockSize}, - chunkenc.NewPool()) + chunkenc.NewPool(), nil) if err != nil { return ulid.ULID{}, errors.Wrap(err, "create leveled compactor") } diff --git a/tsdb/compact.go b/tsdb/compact.go index 9befdc552..ca7baa191 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -82,6 +82,7 @@ type LeveledCompactor struct { chunkPool chunkenc.Pool ctx context.Context maxBlockChunkSegmentSize int64 + mergeFunc storage.VerticalChunkSeriesMergeFunc } type compactorMetrics struct { @@ -145,11 +146,11 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } // NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { - return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize) +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { + return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc) } -func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64) (*LeveledCompactor, error) { +func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } @@ -159,6 +160,9 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register if l == nil { l = log.NewNopLogger() } + if mergeFunc == nil { + mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) + } return &LeveledCompactor{ ranges: ranges, chunkPool: pool, @@ -166,6 +170,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register metrics: newCompactorMetrics(r), ctx: ctx, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + mergeFunc: mergeFunc, }, nil } @@ -746,8 +751,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, set := sets[0] if len(sets) > 1 { - // Merge series using compacting chunk series merger. - set = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)) + // Merge series using specified chunk series merger. + // The default one is the compacting series merger. + set = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc) } // Iterate over all sorted chunk series. diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index a08464216..255a4c14d 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -158,7 +158,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { }, } - c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil) require.NoError(t, err) c.plan(metas) @@ -172,7 +172,7 @@ func TestLeveledCompactor_plan(t *testing.T) { 180, 540, 1620, - }, nil) + }, nil, nil) require.NoError(t, err) cases := map[string]struct { @@ -381,7 +381,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { 240, 720, 2160, - }, nil) + }, nil, nil) require.NoError(t, err) cases := []struct { @@ -431,7 +431,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) { 240, 720, 2160, - }, nil) + }, nil, nil) require.NoError(t, err) tmpdir, err := ioutil.TempDir("", "test") @@ -940,7 +940,7 @@ func TestCompaction_populateBlock(t *testing.T) { blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt}) } - c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil) require.NoError(t, err) meta := &BlockMeta{ @@ -1065,7 +1065,7 @@ func BenchmarkCompaction(b *testing.B) { blockDirs = append(blockDirs, block.Dir()) } - c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil) + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) require.NoError(b, err) b.ResetTimer() diff --git a/tsdb/db.go b/tsdb/db.go index d6036f5e5..2bb56a85c 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -370,6 +370,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { db.logger, ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), chunkenc.NewPool(), + nil, ) if err != nil { return errors.Wrap(err, "create leveled compactor") @@ -648,7 +649,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs var err error ctx, cancel := context.WithCancel(context.Background()) - db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize) + db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil) if err != nil { cancel() return nil, errors.Wrap(err, "create leveled compactor")