tsdb: add enable overlapping compaction

This functionality is needed in downstream projects because they have a
separate component that does compaction.

Upstreaming
7c8e9a2a76/tsdb/compact.go (L323-L325).

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
This commit is contained in:
Giedrius Statkevičius 2024-01-12 09:56:44 +02:00
parent 6150e1ca0e
commit 3a48adc54f

View file

@ -75,14 +75,15 @@ type Compactor interface {
// LeveledCompactor implements the Compactor interface. // LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct { type LeveledCompactor struct {
metrics *CompactorMetrics metrics *CompactorMetrics
logger log.Logger logger log.Logger
ranges []int64 ranges []int64
chunkPool chunkenc.Pool chunkPool chunkenc.Pool
ctx context.Context ctx context.Context
maxBlockChunkSegmentSize int64 maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder postingsEncoder index.PostingsEncoder
enableOverlappingCompaction bool
} }
type CompactorMetrics struct { type CompactorMetrics struct {
@ -153,18 +154,23 @@ type LeveledCompactorOptions struct {
MaxBlockChunkSegmentSize int64 MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
MergeFunc storage.VerticalChunkSeriesMergeFunc MergeFunc storage.VerticalChunkSeriesMergeFunc
// EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled.
// It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction.
EnableOverlappingCompaction bool
} }
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
MergeFunc: mergeFunc, MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
}) })
} }
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MergeFunc: mergeFunc, MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
}) })
} }
@ -191,14 +197,15 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
pe = index.EncodePostingsRaw pe = index.EncodePostingsRaw
} }
return &LeveledCompactor{ return &LeveledCompactor{
ranges: ranges, ranges: ranges,
chunkPool: pool, chunkPool: pool,
logger: l, logger: l,
metrics: newCompactorMetrics(r), metrics: newCompactorMetrics(r),
ctx: ctx, ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc, mergeFunc: mergeFunc,
postingsEncoder: pe, postingsEncoder: pe,
enableOverlappingCompaction: opts.EnableOverlappingCompaction,
}, nil }, nil
} }
@ -317,6 +324,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
// selectOverlappingDirs returns all dirs with overlapping time ranges. // selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received. // It expects sorted input by mint and returns the overlapping dirs in the same order as received.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if !c.enableOverlappingCompaction {
return nil
}
if len(ds) < 2 { if len(ds) < 2 {
return nil return nil
} }