Merge pull request #13393 from vinted/disable_overlapping_compaction

This commit is contained in:
gotjosh 2024-01-15 08:37:24 +00:00 committed by GitHub
commit 4fbd406a36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -75,14 +75,15 @@ type Compactor interface {
// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
metrics *CompactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
metrics *CompactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
enableOverlappingCompaction bool
}
type CompactorMetrics struct {
@ -153,18 +154,23 @@ type LeveledCompactorOptions struct {
MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
MergeFunc storage.VerticalChunkSeriesMergeFunc
// EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled.
// It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction.
EnableOverlappingCompaction bool
}
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
MergeFunc: mergeFunc,
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
})
}
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MergeFunc: mergeFunc,
MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
})
}
@ -191,14 +197,15 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
pe = index.EncodePostingsRaw
}
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
enableOverlappingCompaction: opts.EnableOverlappingCompaction,
}, nil
}
@ -317,6 +324,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if !c.enableOverlappingCompaction {
return nil
}
if len(ds) < 2 {
return nil
}