tsdb: Allow passing a custom compactor to override the default one (#14113)

* expose hook in tsdb to allow customizing compactor

Signed-off-by: Ben Ye <benye@amazon.com>

* address comment

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
This commit is contained in:
Ben Ye 2024-06-04 16:11:36 -07:00 committed by GitHub
parent 8c8ddd01d5
commit 8a08f452b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 46 additions and 5 deletions

View file

@ -189,8 +189,13 @@ type Options struct {
// EnableSharding enables query sharding support in TSDB. // EnableSharding enables query sharding support in TSDB.
EnableSharding bool EnableSharding bool
// NewCompactorFunc is a function that returns a TSDB compactor.
NewCompactorFunc NewCompactorFunc
} }
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
// DB handles reads and writes of time series falling into // DB handles reads and writes of time series falling into
@ -851,13 +856,17 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
if opts.NewCompactorFunc != nil {
db.compactor, err = opts.NewCompactorFunc(ctx, r, l, rngs, db.chunkPool, opts)
} else {
db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{ db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize, MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
EnableOverlappingCompaction: opts.EnableOverlappingCompaction, EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
}) })
}
if err != nil { if err != nil {
cancel() cancel()
return nil, fmt.Errorf("create leveled compactor: %w", err) return nil, fmt.Errorf("create compactor: %w", err)
} }
db.compactCancel = cancel db.compactCancel = cancel

View file

@ -7125,3 +7125,35 @@ func TestAbortBlockCompactions(t *testing.T) {
require.True(t, db.head.compactable(), "head should be compactable") require.True(t, db.head.compactable(), "head should be compactable")
require.Equal(t, 4, compactions, "expected 4 compactions to be completed") require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
} }
func TestNewCompactorFunc(t *testing.T) {
opts := DefaultOptions()
block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)
opts.NewCompactorFunc = func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error) {
return &mockCompactorFn{
planFn: func() ([]string, error) {
return []string{block1.String(), block2.String()}, nil
},
compactFn: func() (ulid.ULID, error) {
return block1, nil
},
writeFn: func() (ulid.ULID, error) {
return block2, nil
},
}, nil
}
db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()
plans, err := db.compactor.Plan("")
require.NoError(t, err)
require.Equal(t, []string{block1.String(), block2.String()}, plans)
ulid, err := db.compactor.Compact("", nil, nil)
require.NoError(t, err)
require.Equal(t, block1, ulid)
ulid, err = db.compactor.Write("", nil, 0, 1, nil)
require.NoError(t, err)
require.Equal(t, block2, ulid)
}