From 0eb828c1790b4514022dc4d3cb071a102553ded9 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 9 Jun 2022 12:11:42 -0700 Subject: [PATCH] Add an option to enable overlapping compaction separately with overlapping queries Signed-off-by: Ganesh Vernekar --- cmd/compact/main.go | 2 +- cmd/prometheus/main.go | 3 +- cmd/promtool/rules_test.go | 6 +- tsdb/block_test.go | 4 +- tsdb/blockwriter.go | 2 +- tsdb/compact.go | 43 +++---- tsdb/compact_test.go | 225 +++++++++++++++++++++++++++++++++++-- tsdb/db.go | 43 ++++--- 8 files changed, 276 insertions(+), 52 deletions(-) diff --git a/cmd/compact/main.go b/cmd/compact/main.go index b9083f8af..7cc4b7593 100644 --- a/cmd/compact/main.go +++ b/cmd/compact/main.go @@ -71,7 +71,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil) + c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true) if err != nil { log.Fatalln("creating compator", err) } diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index c0848c766..e9a990e65 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -1523,7 +1523,8 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options { RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond), MaxBytes: int64(opts.MaxBytes), NoLockfile: opts.NoLockfile, - AllowOverlappingBlocks: opts.AllowOverlappingBlocks, + AllowOverlappingCompaction: opts.AllowOverlappingBlocks, + AllowOverlappingQueries: opts.AllowOverlappingBlocks, WALCompression: opts.WALCompression, HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize, StripeSize: opts.StripeSize, diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index decda0fe3..2367ae31b 100644 --- a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -117,7 +117,8 @@ func TestBackfillRuleIntegration(t *testing.T) { } opts := tsdb.DefaultOptions() - opts.AllowOverlappingBlocks = true + opts.AllowOverlappingQueries = true + opts.AllowOverlappingCompaction = true db, err := tsdb.Open(tmpDir, nil, nil, opts, nil) require.NoError(t, err) @@ -248,7 +249,8 @@ func TestBackfillLabels(t *testing.T) { } opts := tsdb.DefaultOptions() - opts.AllowOverlappingBlocks = true + opts.AllowOverlappingQueries = true + opts.AllowOverlappingCompaction = true db, err := tsdb.Open(tmpDir, nil, nil, opts, nil) require.NoError(t, err) diff --git a/tsdb/block_test.go b/tsdb/block_test.go index b068cb824..4757e1977 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -299,7 +299,7 @@ func TestBlockSize(t *testing.T) { require.NoError(t, err) require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size") - c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true) require.NoError(t, err) blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil) require.NoError(t, err) @@ -475,7 +475,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string { } func createBlockFromHead(tb testing.TB, dir string, head *Head) string { - compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil) + compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, true) require.NoError(tb, err) require.NoError(tb, os.MkdirAll(dir, 0o777)) diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 064cd01c7..968569785 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) { nil, w.logger, []int64{w.blockSize}, - chunkenc.NewPool(), nil) + chunkenc.NewPool(), nil, true) if err != nil { return ulid.ULID{}, errors.Wrap(err, "create leveled compactor") } diff --git a/tsdb/compact.go b/tsdb/compact.go index 4732ac840..3e0bed049 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -79,13 +79,14 @@ type Compactor interface { // LeveledCompactor implements the Compactor interface. type LeveledCompactor struct { - metrics *compactorMetrics - logger log.Logger - ranges []int64 - chunkPool chunkenc.Pool - ctx context.Context - maxBlockChunkSegmentSize int64 - mergeFunc storage.VerticalChunkSeriesMergeFunc + metrics *compactorMetrics + logger log.Logger + ranges []int64 + chunkPool chunkenc.Pool + ctx context.Context + maxBlockChunkSegmentSize int64 + mergeFunc storage.VerticalChunkSeriesMergeFunc + enableOverlappingCompaction bool concurrencyOpts LeveledCompactorConcurrencyOptions } @@ -151,11 +152,11 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } // NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { - return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc) +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) { + return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction) } -func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { +func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } @@ -169,14 +170,15 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) } return &LeveledCompactor{ - ranges: ranges, - chunkPool: pool, - logger: l, - metrics: newCompactorMetrics(r), - ctx: ctx, - maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, - mergeFunc: mergeFunc, - concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(), + ranges: ranges, + chunkPool: pool, + logger: l, + metrics: newCompactorMetrics(r), + ctx: ctx, + maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + mergeFunc: mergeFunc, + concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(), + enableOverlappingCompaction: enableOverlappingCompaction, }, nil } @@ -234,7 +236,7 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { if len(res) > 0 { return res, nil } - // No overlapping blocks, do compaction the usual way. + // No overlapping blocks or overlapping block compaction not allowed, do compaction the usual way. // We do not include a recently created block with max(minTime), so the block which was just created from WAL. // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. dms = dms[:len(dms)-1] @@ -307,6 +309,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { // selectOverlappingDirs returns all dirs with overlapping time ranges. // It expects sorted input by mint and returns the overlapping dirs in the same order as received. func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { + if !c.enableOverlappingCompaction { + return nil + } if len(ds) < 2 { return nil } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 3a063c4bd..065dda9cf 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -161,7 +161,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) { }, } - c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, true) require.NoError(t, err) c.plan(metas) @@ -175,7 +175,7 @@ func TestLeveledCompactor_plan(t *testing.T) { 180, 540, 1620, - }, nil, nil) + }, nil, nil, true) require.NoError(t, err) cases := map[string]struct { @@ -384,7 +384,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { 240, 720, 2160, - }, nil, nil) + }, nil, nil, true) require.NoError(t, err) cases := []struct { @@ -434,7 +434,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) { 240, 720, 2160, - }, nil, nil) + }, nil, nil, true) require.NoError(t, err) tmpdir := t.TempDir() @@ -528,7 +528,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) { for _, shardCount := range shardCounts { t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) { - c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true) require.NoError(t, err) blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount) @@ -662,7 +662,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) { blockDirs = append(blockDirs, bdir) } - c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true) require.NoError(t, err) blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5) @@ -1137,7 +1137,7 @@ func TestCompaction_populateBlock(t *testing.T) { blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt}) } - c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil) + c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, true) require.NoError(t, err) meta := &BlockMeta{ @@ -1259,7 +1259,7 @@ func BenchmarkCompaction(b *testing.B) { blockDirs = append(blockDirs, block.Dir()) } - c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil) + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true) require.NoError(b, err) b.ResetTimer() @@ -1643,3 +1643,212 @@ func TestCompactBlockMetas(t *testing.T) { } require.Equal(t, expected, output) } + +func TestLeveledCompactor_plan_overlapping_disabled(t *testing.T) { + // This mimics our default ExponentialBlockRanges with min block size equals to 20. + compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{ + 20, + 60, + 180, + 540, + 1620, + }, nil, nil, false) + require.NoError(t, err) + + cases := map[string]struct { + metas []dirMeta + expected []string + }{ + "Outside Range": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + }, + expected: nil, + }, + "We should wait for four blocks of size 20 to appear before compacting.": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + }, + expected: nil, + }, + `We should wait for a next block of size 20 to appear before compacting + the existing ones. We have three, but we ignore the fresh one from WAl`: { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 40, 60, nil), + }, + expected: nil, + }, + "Block to fill the entire parent range appeared – should be compacted": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 40, 60, nil), + metaRange("4", 60, 80, nil), + }, + expected: []string{"1", "2", "3"}, + }, + `Block for the next parent range appeared with gap with size 20. Nothing will happen in the first one + anymore but we ignore fresh one still, so no compaction`: { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 60, 80, nil), + }, + expected: nil, + }, + `Block for the next parent range appeared, and we have a gap with size 20 between second and third block. + We will not get this missed gap anymore and we should compact just these two.`: { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 60, 80, nil), + metaRange("4", 80, 100, nil), + }, + expected: []string{"1", "2"}, + }, + "We have 20, 20, 20, 60, 60 range blocks. '5' is marked as fresh one": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 40, 60, nil), + metaRange("4", 60, 120, nil), + metaRange("5", 120, 180, nil), + }, + expected: []string{"1", "2", "3"}, + }, + "We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60": { + metas: []dirMeta{ + metaRange("2", 20, 40, nil), + metaRange("4", 60, 120, nil), + metaRange("5", 960, 980, nil), // Fresh one. + metaRange("6", 120, 180, nil), + metaRange("7", 720, 960, nil), + }, + expected: []string{"2", "4", "6"}, + }, + "Do not select large blocks that have many tombstones when there is no fresh block": { + metas: []dirMeta{ + metaRange("1", 0, 540, &BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }), + }, + expected: nil, + }, + "Select large blocks that have many tombstones when fresh appears": { + metas: []dirMeta{ + metaRange("1", 0, 540, &BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }), + metaRange("2", 540, 560, nil), + }, + expected: []string{"1"}, + }, + "For small blocks, do not compact tombstones, even when fresh appears.": { + metas: []dirMeta{ + metaRange("1", 0, 60, &BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }), + metaRange("2", 60, 80, nil), + }, + expected: nil, + }, + `Regression test: we were stuck in a compact loop where we always recompacted + the same block when tombstones and series counts were zero`: { + metas: []dirMeta{ + metaRange("1", 0, 540, &BlockStats{ + NumSeries: 0, + NumTombstones: 0, + }), + metaRange("2", 540, 560, nil), + }, + expected: nil, + }, + `Regression test: we were wrongly assuming that new block is fresh from WAL when its ULID is newest. + We need to actually look on max time instead. + With previous, wrong approach "8" block was ignored, so we were wrongly compacting 5 and 7 and introducing + block overlaps`: { + metas: []dirMeta{ + metaRange("5", 0, 360, nil), + metaRange("6", 540, 560, nil), // Fresh one. + metaRange("7", 360, 420, nil), + metaRange("8", 420, 540, nil), + }, + expected: []string{"7", "8"}, + }, + // |--------------| + // |----------------| + // |--------------| + "Overlapping blocks 1": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 19, 40, nil), + metaRange("3", 40, 60, nil), + }, + expected: nil, + }, + // |--------------| + // |--------------| + // |--------------| + "Overlapping blocks 2": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 30, 50, nil), + }, + expected: nil, + }, + // |--------------| + // |---------------------| + // |--------------| + "Overlapping blocks 3": { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 10, 40, nil), + metaRange("3", 30, 50, nil), + }, + expected: nil, + }, + // |--------------| + // |--------------------------------| + // |--------------| + // |--------------| + "Overlapping blocks 4": { + metas: []dirMeta{ + metaRange("5", 0, 360, nil), + metaRange("6", 340, 560, nil), + metaRange("7", 360, 420, nil), + metaRange("8", 420, 540, nil), + }, + expected: nil, + }, + // |--------------| + // |--------------| + // |--------------| + // |--------------| + "Overlapping blocks 5": { + metas: []dirMeta{ + metaRange("1", 0, 10, nil), + metaRange("2", 9, 20, nil), + metaRange("3", 30, 40, nil), + metaRange("4", 39, 50, nil), + }, + expected: nil, + }, + } + + for title, c := range cases { + if !t.Run(title, func(t *testing.T) { + res, err := compactor.plan(c.metas) + require.NoError(t, err) + require.Equal(t, c.expected, res) + }) { + return + } + } +} diff --git a/tsdb/db.go b/tsdb/db.go index e9e602988..497887dfa 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -71,19 +71,20 @@ var ErrNotReady = errors.New("TSDB not ready") // millisecond precision timestamps. func DefaultOptions() *Options { return &Options{ - WALSegmentSize: wal.DefaultSegmentSize, - MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, - RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), - MinBlockDuration: DefaultBlockDuration, - MaxBlockDuration: DefaultBlockDuration, - NoLockfile: false, - AllowOverlappingBlocks: false, - WALCompression: false, - StripeSize: DefaultStripeSize, - HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, - IsolationDisabled: defaultIsolationDisabled, - HeadChunksEndTimeVariance: 0, - HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, + WALSegmentSize: wal.DefaultSegmentSize, + MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize, + RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), + MinBlockDuration: DefaultBlockDuration, + MaxBlockDuration: DefaultBlockDuration, + NoLockfile: false, + AllowOverlappingCompaction: false, + AllowOverlappingQueries: false, + WALCompression: false, + StripeSize: DefaultStripeSize, + HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize, + IsolationDisabled: defaultIsolationDisabled, + HeadChunksEndTimeVariance: 0, + HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize, } } @@ -115,9 +116,14 @@ type Options struct { // NoLockfile disables creation and consideration of a lock file. NoLockfile bool - // Overlapping blocks are allowed if AllowOverlappingBlocks is true. - // This in-turn enables vertical compaction and vertical query merge. - AllowOverlappingBlocks bool + // Querying on overlapping blocks are allowed if AllowOverlappingQueries is true. + // Since querying is a required operation for TSDB, if there are going to be + // overlapping blocks, then this should be set to true. + AllowOverlappingQueries bool + + // Compaction of overlapping blocks are allowed if AllowOverlappingCompaction is true. + // This is an optional flag for overlapping blocks. + AllowOverlappingCompaction bool // WALCompression will turn on Snappy compression for records on the WAL. WALCompression bool @@ -413,6 +419,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5), chunkenc.NewPool(), nil, + false, ) if err != nil { return errors.Wrap(err, "create leveled compactor") @@ -704,7 +711,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } ctx, cancel := context.WithCancel(context.Background()) - db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil) + db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction) if err != nil { cancel() return nil, errors.Wrap(err, "create leveled compactor") @@ -1138,7 +1145,7 @@ func (db *DB) reloadBlocks() (err error) { sort.Slice(toLoad, func(i, j int) bool { return toLoad[i].Meta().MinTime < toLoad[j].Meta().MinTime }) - if !db.opts.AllowOverlappingBlocks { + if !db.opts.AllowOverlappingQueries { if err := validateBlockSequence(toLoad); err != nil { return errors.Wrap(err, "invalid block sequence") }