allow compact series merger to be configurable (#8836)

Signed-off-by: yeya24 <yb532204897@gmail.com>
Author: Ben Ye
Date: 2021-05-18 12:38:37 -04:00 (committed by GitHub)
parent 7e7efaba32
commit 0a8912433a
5 changed files with 22 additions and 15 deletions
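The new trailing mergeFunc parameter is what makes the compact series merger configurable; passing nil keeps the previous behaviour. Below is a minimal sketch of how a caller could use the extended constructor. Import paths and the go-kit logger reflect the Prometheus tree at the time of this commit; the example itself is illustrative and not part of the change.

// Sketch only: building a LeveledCompactor with an explicit merge function.
// A nil mergeFunc falls back to the compacting merger, exactly as before.
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/kit/log"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	// The default compacting merger, passed explicitly for illustration.
	mergeFunc := storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)

	compactor, err := tsdb.NewLeveledCompactor(
		context.Background(),
		nil,                // prometheus.Registerer (optional)
		log.NewNopLogger(), // logger
		tsdb.ExponentialBlockRanges(tsdb.DefaultOptions().MinBlockDuration, 3, 5),
		chunkenc.NewPool(),
		mergeFunc, // new argument introduced by this commit
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compactor ready: %T\n", compactor)
}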


@@ -323,7 +323,7 @@ func TestBlockSize(t *testing.T) {
	require.NoError(t, err)
	require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")

-	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
	require.NoError(t, err)
	blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
	require.NoError(t, err)
@@ -426,7 +426,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
	require.NoError(tb, err)
	require.NoError(tb, os.MkdirAll(dir, 0777))


@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
		nil,
		w.logger,
		[]int64{w.blockSize},
-		chunkenc.NewPool())
+		chunkenc.NewPool(), nil)
	if err != nil {
		return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
	}


@@ -82,6 +82,7 @@ type LeveledCompactor struct {
	chunkPool                chunkenc.Pool
	ctx                      context.Context
	maxBlockChunkSegmentSize int64
+	mergeFunc                storage.VerticalChunkSeriesMergeFunc
}

type compactorMetrics struct {
@@ -145,11 +146,11 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
}

// NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize)
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
}

-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
	if len(ranges) == 0 {
		return nil, errors.Errorf("at least one range must be provided")
	}
@@ -159,6 +160,9 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
	if l == nil {
		l = log.NewNopLogger()
	}
+	if mergeFunc == nil {
+		mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
+	}
	return &LeveledCompactor{
		ranges:    ranges,
		chunkPool: pool,
@@ -166,6 +170,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
		metrics:                  newCompactorMetrics(r),
		ctx:                      ctx,
		maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
+		mergeFunc:                mergeFunc,
	}, nil
}
@@ -746,8 +751,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
	set := sets[0]
	if len(sets) > 1 {
-		// Merge series using compacting chunk series merger.
-		set = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge))
+		// Merge series using specified chunk series merger.
+		// The default one is the compacting series merger.
+		set = storage.NewMergeChunkSeriesSet(sets, c.mergeFunc)
	}

	// Iterate over all sorted chunk series.
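This is the hunk where the option takes effect: populateBlock now uses c.mergeFunc instead of hard-coding the compacting merger, and the constructor defaults it when nil is passed. As a purely illustrative sketch (the package, metric, and function below are hypothetical and not part of this commit), a downstream caller could wrap the default merger and hand it in as the new argument:

// Sketch only: a custom storage.VerticalChunkSeriesMergeFunc that counts how
// often overlapping series hit the vertical-merge path, then delegates to the
// same compacting merger that is used by default. Names are hypothetical.
package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/storage"
)

var (
	// verticallyMergedSeries is an illustrative metric; registering it is
	// left to the caller.
	verticallyMergedSeries = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_compaction_vertically_merged_series_total",
		Help: "Series merged vertically during compaction (sketch).",
	})

	// defaultMerge is the merger Prometheus falls back to when mergeFunc is nil.
	defaultMerge = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
)

// InstrumentedMerge satisfies storage.VerticalChunkSeriesMergeFunc and can be
// passed to NewLeveledCompactor as the new mergeFunc argument.
func InstrumentedMerge(series ...storage.ChunkSeries) storage.ChunkSeries {
	verticallyMergedSeries.Inc()
	return defaultMerge(series...)
}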


@@ -158,7 +158,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
		},
	}

-	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
	require.NoError(t, err)

	c.plan(metas)
@@ -172,7 +172,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
		180,
		540,
		1620,
-	}, nil)
+	}, nil, nil)
	require.NoError(t, err)

	cases := map[string]struct {
@@ -381,7 +381,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
		240,
		720,
		2160,
-	}, nil)
+	}, nil, nil)
	require.NoError(t, err)

	cases := []struct {
@@ -431,7 +431,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
		240,
		720,
		2160,
-	}, nil)
+	}, nil, nil)
	require.NoError(t, err)

	tmpdir, err := ioutil.TempDir("", "test")
@@ -940,7 +940,7 @@ func TestCompaction_populateBlock(t *testing.T) {
		blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
	}

-	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
	require.NoError(t, err)

	meta := &BlockMeta{
@@ -1065,7 +1065,7 @@ func BenchmarkCompaction(b *testing.B) {
		blockDirs = append(blockDirs, block.Dir())
	}

-	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
	require.NoError(b, err)

	b.ResetTimer()


@@ -370,6 +370,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
		db.logger,
		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
		chunkenc.NewPool(),
+		nil,
	)
	if err != nil {
		return errors.Wrap(err, "create leveled compactor")
@@ -648,7 +649,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
	var err error
	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
	if err != nil {
		cancel()
		return nil, errors.Wrap(err, "create leveled compactor")