Add an option to enable overlapping compaction separately from overlapping queries
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
parent 55236be04a
commit 0eb828c179
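In short: this commit splits the single tsdb.Options field AllowOverlappingBlocks into AllowOverlappingQueries (merge overlapping blocks at query time) and AllowOverlappingCompaction (vertically compact overlapping blocks), and threads an explicit enableOverlappingCompaction flag through the LeveledCompactor constructors. A minimal usage sketch of the new surface, with overlapping queries on but vertical compaction off; the data directory is illustrative, while the option names and the tsdb.Open signature are taken from the diff below:

package main

import (
	"log"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	opts := tsdb.DefaultOptions()
	// Serve queries that merge data from overlapping blocks...
	opts.AllowOverlappingQueries = true
	// ...without also letting the compactor vertically merge those blocks.
	opts.AllowOverlappingCompaction = false

	// Arguments: dir, logger, registerer, options, stats (nil values as in the tests below).
	db, err := tsdb.Open("./data", nil, nil, opts, nil)
	if err != nil {
		log.Fatalln("opening TSDB:", err)
	}
	defer db.Close()
}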
@@ -71,7 +71,7 @@ func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
 	defer cancel()
 
-	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil)
+	c, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, []int64{0}, nil, segmentSizeMB*1024*1024, nil, true)
 	if err != nil {
 		log.Fatalln("creating compator", err)
 	}
@@ -1523,7 +1523,8 @@ func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
 		RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
 		MaxBytes: int64(opts.MaxBytes),
 		NoLockfile: opts.NoLockfile,
-		AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
+		AllowOverlappingCompaction: opts.AllowOverlappingBlocks,
+		AllowOverlappingQueries: opts.AllowOverlappingBlocks,
 		WALCompression: opts.WALCompression,
 		HeadChunksWriteQueueSize: opts.HeadChunksWriteQueueSize,
 		StripeSize: opts.StripeSize,
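Note the backward-compatible mapping above: the existing single AllowOverlappingBlocks setting fans out to both new options, so callers of ToTSDBOptions keep the old behavior, where enabling overlap enables both overlapping queries and overlapping compaction.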
@@ -117,7 +117,8 @@ func TestBackfillRuleIntegration(t *testing.T) {
 			}
 
 			opts := tsdb.DefaultOptions()
-			opts.AllowOverlappingBlocks = true
+			opts.AllowOverlappingQueries = true
+			opts.AllowOverlappingCompaction = true
 			db, err := tsdb.Open(tmpDir, nil, nil, opts, nil)
 			require.NoError(t, err)
 
@@ -248,7 +249,8 @@ func TestBackfillLabels(t *testing.T) {
 	}
 
 	opts := tsdb.DefaultOptions()
-	opts.AllowOverlappingBlocks = true
+	opts.AllowOverlappingQueries = true
+	opts.AllowOverlappingCompaction = true
 	db, err := tsdb.Open(tmpDir, nil, nil, opts, nil)
 	require.NoError(t, err)
 
@@ -299,7 +299,7 @@ func TestBlockSize(t *testing.T) {
 		require.NoError(t, err)
 		require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
 
-		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
 		require.NoError(t, err)
 		blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
 		require.NoError(t, err)
@@ -475,7 +475,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
 }
 
 func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, true)
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
 		nil,
 		w.logger,
 		[]int64{w.blockSize},
-		chunkenc.NewPool(), nil)
+		chunkenc.NewPool(), nil, true)
 	if err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
 	}
@@ -79,13 +79,14 @@ type Compactor interface {
 
 // LeveledCompactor implements the Compactor interface.
 type LeveledCompactor struct {
 	metrics *compactorMetrics
 	logger log.Logger
 	ranges []int64
 	chunkPool chunkenc.Pool
 	ctx context.Context
 	maxBlockChunkSegmentSize int64
 	mergeFunc storage.VerticalChunkSeriesMergeFunc
+	enableOverlappingCompaction bool
 
 	concurrencyOpts LeveledCompactorConcurrencyOptions
 }
@@ -151,11 +152,11 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 }
 
 // NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, enableOverlappingCompaction)
 }
 
-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, enableOverlappingCompaction bool) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, errors.Errorf("at least one range must be provided")
 	}
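Both constructors now take the flag explicitly, so every call site in this commit appends a boolean argument. A hedged sketch of constructing a standalone compactor with overlapping compaction disabled; the go-kit logger import is an assumption, the registerer and merge function are left nil as in several call sites above, and a nil mergeFunc falls back to the default compacting merger per the next hunk:

import (
	"context"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func newCompactor() (*tsdb.LeveledCompactor, error) {
	return tsdb.NewLeveledCompactor(
		context.Background(),
		nil,                // no prometheus.Registerer; metrics are dropped
		log.NewNopLogger(), // no-op logger
		tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 3, 5),
		chunkenc.NewPool(),
		nil,   // nil mergeFunc selects the default compacting series merger
		false, // enableOverlappingCompaction: vertical compaction stays off
	)
}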
@@ -169,14 +170,15 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 		mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
 	}
 	return &LeveledCompactor{
 		ranges: ranges,
 		chunkPool: pool,
 		logger: l,
 		metrics: newCompactorMetrics(r),
 		ctx: ctx,
 		maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
 		mergeFunc: mergeFunc,
 		concurrencyOpts: DefaultLeveledCompactorConcurrencyOptions(),
+		enableOverlappingCompaction: enableOverlappingCompaction,
 	}, nil
 }
 
@@ -234,7 +236,7 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
 	if len(res) > 0 {
 		return res, nil
 	}
-	// No overlapping blocks, do compaction the usual way.
+	// No overlapping blocks or overlapping block compaction not allowed, do compaction the usual way.
 	// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
 	// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
 	dms = dms[:len(dms)-1]
@@ -307,6 +309,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
 // selectOverlappingDirs returns all dirs with overlapping time ranges.
 // It expects sorted input by mint and returns the overlapping dirs in the same order as received.
 func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
+	if !c.enableOverlappingCompaction {
+		return nil
+	}
 	if len(ds) < 2 {
 		return nil
 	}
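This early return is the heart of the new option: when enableOverlappingCompaction is false, selectOverlappingDirs never reports overlapping dirs, so plan() finds no vertical-compaction candidates and falls through to the usual horizontal planning path, as the updated comment in the previous hunk says. The new TestLeveledCompactor_plan_overlapping_disabled test below verifies that overlapping blocks then produce no compaction plan at all, while non-overlapping blocks are still planned normally.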
@@ -161,7 +161,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, true)
 	require.NoError(t, err)
 
 	c.plan(metas)
@@ -175,7 +175,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 		180,
 		540,
 		1620,
-	}, nil, nil)
+	}, nil, nil, true)
 	require.NoError(t, err)
 
 	cases := map[string]struct {
@@ -384,7 +384,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, nil)
+	}, nil, nil, true)
 	require.NoError(t, err)
 
 	cases := []struct {
@@ -434,7 +434,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, nil)
+	}, nil, nil, true)
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -528,7 +528,7 @@ func TestCompaction_CompactWithSplitting(t *testing.T) {
 
 	for _, shardCount := range shardCounts {
 		t.Run(fmt.Sprintf("series=%d, shards=%d", series, shardCount), func(t *testing.T) {
-			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
 			require.NoError(t, err)
 
 			blockIDs, err := c.CompactWithSplitting(dir, blockDirs, openBlocks, shardCount)
@@ -662,7 +662,7 @@ func TestCompaction_CompactEmptyBlocks(t *testing.T) {
 		blockDirs = append(blockDirs, bdir)
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
 	require.NoError(t, err)
 
 	blockIDs, err := c.CompactWithSplitting(dir, blockDirs, nil, 5)
@@ -1137,7 +1137,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 			}
 
-			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
+			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, true)
 			require.NoError(t, err)
 
 			meta := &BlockMeta{
@@ -1259,7 +1259,7 @@ func BenchmarkCompaction(b *testing.B) {
 			blockDirs = append(blockDirs, block.Dir())
 		}
 
-		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, true)
 		require.NoError(b, err)
 
 		b.ResetTimer()
@@ -1643,3 +1643,212 @@ func TestCompactBlockMetas(t *testing.T) {
 	}
 	require.Equal(t, expected, output)
 }
+
+func TestLeveledCompactor_plan_overlapping_disabled(t *testing.T) {
+	// This mimics our default ExponentialBlockRanges with min block size equals to 20.
+	compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{
+		20,
+		60,
+		180,
+		540,
+		1620,
+	}, nil, nil, false)
+	require.NoError(t, err)
+
+	cases := map[string]struct {
+		metas    []dirMeta
+		expected []string
+	}{
+		"Outside Range": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+			},
+			expected: nil,
+		},
+		"We should wait for four blocks of size 20 to appear before compacting.": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+			},
+			expected: nil,
+		},
+		`We should wait for a next block of size 20 to appear before compacting
+		the existing ones. We have three, but we ignore the fresh one from WAl`: {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+			},
+			expected: nil,
+		},
+		"Block to fill the entire parent range appeared – should be compacted": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+				metaRange("4", 60, 80, nil),
+			},
+			expected: []string{"1", "2", "3"},
+		},
+		`Block for the next parent range appeared with gap with size 20. Nothing will happen in the first one
+		anymore but we ignore fresh one still, so no compaction`: {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 60, 80, nil),
+			},
+			expected: nil,
+		},
+		`Block for the next parent range appeared, and we have a gap with size 20 between second and third block.
+		We will not get this missed gap anymore and we should compact just these two.`: {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 60, 80, nil),
+				metaRange("4", 80, 100, nil),
+			},
+			expected: []string{"1", "2"},
+		},
+		"We have 20, 20, 20, 60, 60 range blocks. '5' is marked as fresh one": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+			},
+			expected: []string{"1", "2", "3"},
+		},
+		"We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60": {
+			metas: []dirMeta{
+				metaRange("2", 20, 40, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 960, 980, nil), // Fresh one.
+				metaRange("6", 120, 180, nil),
+				metaRange("7", 720, 960, nil),
+			},
+			expected: []string{"2", "4", "6"},
+		},
+		"Do not select large blocks that have many tombstones when there is no fresh block": {
+			metas: []dirMeta{
+				metaRange("1", 0, 540, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+			},
+			expected: nil,
+		},
+		"Select large blocks that have many tombstones when fresh appears": {
+			metas: []dirMeta{
+				metaRange("1", 0, 540, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+				metaRange("2", 540, 560, nil),
+			},
+			expected: []string{"1"},
+		},
+		"For small blocks, do not compact tombstones, even when fresh appears.": {
+			metas: []dirMeta{
+				metaRange("1", 0, 60, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+				metaRange("2", 60, 80, nil),
+			},
+			expected: nil,
+		},
+		`Regression test: we were stuck in a compact loop where we always recompacted
+		the same block when tombstones and series counts were zero`: {
+			metas: []dirMeta{
+				metaRange("1", 0, 540, &BlockStats{
+					NumSeries:     0,
+					NumTombstones: 0,
+				}),
+				metaRange("2", 540, 560, nil),
+			},
+			expected: nil,
+		},
+		`Regression test: we were wrongly assuming that new block is fresh from WAL when its ULID is newest.
+		We need to actually look on max time instead.
+
+		With previous, wrong approach "8" block was ignored, so we were wrongly compacting 5 and 7 and introducing
+		block overlaps`: {
+			metas: []dirMeta{
+				metaRange("5", 0, 360, nil),
+				metaRange("6", 540, 560, nil), // Fresh one.
+				metaRange("7", 360, 420, nil),
+				metaRange("8", 420, 540, nil),
+			},
+			expected: []string{"7", "8"},
+		},
+		// |--------------|
+		//               |----------------|
+		//                                |--------------|
+		"Overlapping blocks 1": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 19, 40, nil),
+				metaRange("3", 40, 60, nil),
+			},
+			expected: nil,
+		},
+		// |--------------|
+		//                |--------------|
+		//                        |--------------|
+		"Overlapping blocks 2": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 30, 50, nil),
+			},
+			expected: nil,
+		},
+		// |--------------|
+		//        |---------------------|
+		//                       |--------------|
+		"Overlapping blocks 3": {
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 10, 40, nil),
+				metaRange("3", 30, 50, nil),
+			},
+			expected: nil,
+		},
+		// |--------------|
+		//       |--------------------------------|
+		//                |--------------|
+		//                               |--------------|
+		"Overlapping blocks 4": {
+			metas: []dirMeta{
+				metaRange("5", 0, 360, nil),
+				metaRange("6", 340, 560, nil),
+				metaRange("7", 360, 420, nil),
+				metaRange("8", 420, 540, nil),
+			},
+			expected: nil,
+		},
+		// |--------------|
+		//              |--------------|
+		//                                  |--------------|
+		//                                               |--------------|
+		"Overlapping blocks 5": {
+			metas: []dirMeta{
+				metaRange("1", 0, 10, nil),
+				metaRange("2", 9, 20, nil),
+				metaRange("3", 30, 40, nil),
+				metaRange("4", 39, 50, nil),
+			},
+			expected: nil,
+		},
+	}
+
+	for title, c := range cases {
+		if !t.Run(title, func(t *testing.T) {
+			res, err := compactor.plan(c.metas)
+			require.NoError(t, err)
+			require.Equal(t, c.expected, res)
+		}) {
+			return
+		}
+	}
+}
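The new test drives plan() directly with enableOverlappingCompaction set to false. To run just this test with standard Go tooling (assuming a checkout of the repository root, where the tsdb package lives in ./tsdb):

go test ./tsdb -run TestLeveledCompactor_plan_overlapping_disabled -v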
tsdb/db.go (43 changed lines)
@@ -71,19 +71,20 @@ var ErrNotReady = errors.New("TSDB not ready")
 // millisecond precision timestamps.
 func DefaultOptions() *Options {
 	return &Options{
 		WALSegmentSize: wal.DefaultSegmentSize,
 		MaxBlockChunkSegmentSize: chunks.DefaultChunkSegmentSize,
 		RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
 		MinBlockDuration: DefaultBlockDuration,
 		MaxBlockDuration: DefaultBlockDuration,
 		NoLockfile: false,
-		AllowOverlappingBlocks: false,
+		AllowOverlappingCompaction: false,
+		AllowOverlappingQueries: false,
 		WALCompression: false,
 		StripeSize: DefaultStripeSize,
 		HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
 		IsolationDisabled: defaultIsolationDisabled,
 		HeadChunksEndTimeVariance: 0,
 		HeadChunksWriteQueueSize: chunks.DefaultWriteQueueSize,
 	}
 }
 
@@ -115,9 +116,14 @@ type Options struct {
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
 
-	// Overlapping blocks are allowed if AllowOverlappingBlocks is true.
-	// This in-turn enables vertical compaction and vertical query merge.
-	AllowOverlappingBlocks bool
+	// Querying on overlapping blocks are allowed if AllowOverlappingQueries is true.
+	// Since querying is a required operation for TSDB, if there are going to be
+	// overlapping blocks, then this should be set to true.
+	AllowOverlappingQueries bool
+
+	// Compaction of overlapping blocks are allowed if AllowOverlappingCompaction is true.
+	// This is an optional flag for overlapping blocks.
+	AllowOverlappingCompaction bool
 
 	// WALCompression will turn on Snappy compression for records on the WAL.
 	WALCompression bool
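As the comments indicate, the two flags are not symmetric: if overlapping blocks can exist at all, AllowOverlappingQueries must be true (block loading rejects overlapping sequences otherwise, see the reloadBlocks hunk below), whereas AllowOverlappingCompaction is optional and merely controls whether such blocks are ever vertically merged.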
@@ -413,6 +419,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
 		chunkenc.NewPool(),
 		nil,
+		false,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create leveled compactor")
@@ -704,7 +711,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, opts.AllowOverlappingCompaction)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
@@ -1138,7 +1145,7 @@ func (db *DB) reloadBlocks() (err error) {
 	sort.Slice(toLoad, func(i, j int) bool {
 		return toLoad[i].Meta().MinTime < toLoad[j].Meta().MinTime
 	})
-	if !db.opts.AllowOverlappingBlocks {
+	if !db.opts.AllowOverlappingQueries {
 		if err := validateBlockSequence(toLoad); err != nil {
 			return errors.Wrap(err, "invalid block sequence")
 		}
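Note that block-sequence validation is now keyed on the query flag rather than the old combined flag: a database that allows overlapping queries loads overlapping blocks without error even when overlapping compaction is disabled, which is precisely the combination this commit makes possible.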