From 4cc37eecab2bb29a75c55e5a90c3dcb8017731ea Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Fri, 1 Sep 2017 11:46:46 +0200
Subject: [PATCH] Refactor and add tests for compactor

---
 block.go        |   7 --
 compact.go      |  67 ++++------
 compact_test.go | 322 ++++++++++++++++++++----------------------
 db.go           |  22 +---
 4 files changed, 161 insertions(+), 257 deletions(-)

diff --git a/block.go b/block.go
index a03a79dce6..84f0060373 100644
--- a/block.go
+++ b/block.go
@@ -53,13 +53,6 @@ type BlockReader interface {
 	Tombstones() TombstoneReader
 }
 
-// // Block is an interface to a DiskBlock that can also be queried.
-// type Block interface {
-// 	DiskBlock
-// 	Queryable
-// 	Snapshottable
-// }
-
 // Snapshottable defines an entity that can be backedup online.
 type Snapshottable interface {
 	Snapshot(dir string) error
diff --git a/compact.go b/compact.go
index 1ffd90a07f..f5a2045eb8 100644
--- a/compact.go
+++ b/compact.go
@@ -59,10 +59,11 @@ type Compactor interface {
 
 // LeveledCompactor implements the Compactor interface.
 type LeveledCompactor struct {
-	dir     string
-	metrics *compactorMetrics
-	logger  log.Logger
-	opts    *LeveledCompactorOptions
+	dir       string
+	metrics   *compactorMetrics
+	logger    log.Logger
+	ranges    []int64
+	chunkPool chunks.Pool
 }
 
 type compactorMetrics struct {
@@ -97,30 +98,20 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 	return m
 }
 
-// LeveledCompactorOptions are the options for a LeveledCompactor.
-type LeveledCompactorOptions struct {
-	blockRanges []int64
-	chunkPool   chunks.Pool
-}
-
 // NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor {
-	if opts == nil {
-		opts = &LeveledCompactorOptions{
-			chunkPool: chunks.NewPool(),
-		}
+func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunks.Pool) (*LeveledCompactor, error) {
+	if len(ranges) == 0 {
+		return nil, errors.Errorf("at least one range must be provided")
+	}
+	if pool == nil {
+		pool = chunks.NewPool()
 	}
 	return &LeveledCompactor{
-		opts:    opts,
-		logger:  l,
-		metrics: newCompactorMetrics(r),
-	}
-}
-
-type compactionInfo struct {
-	seq        int
-	generation int
-	mint, maxt int64
+		ranges:    ranges,
+		chunkPool: pool,
+		logger:    l,
+		metrics:   newCompactorMetrics(r),
+	}, nil
 }
 
 type dirMeta struct {
@@ -142,21 +133,15 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
 		if err != nil {
 			return nil, err
 		}
-		if meta.Compaction.Level > 0 {
-			dms = append(dms, dirMeta{dir, meta})
-		}
+		dms = append(dms, dirMeta{dir, meta})
 	}
-	sort.Slice(dms, func(i, j int) bool {
-		return dms[i].meta.MinTime < dms[j].meta.MinTime
-	})
-
 	return c.plan(dms)
 }
 
 func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
-	if len(dms) <= 1 {
-		return nil, nil
-	}
+	sort.Slice(dms, func(i, j int) bool {
+		return dms[i].meta.MinTime < dms[j].meta.MinTime
+	})
 
 	var res []string
 	for _, dm := range c.selectDirs(dms) {
@@ -169,12 +154,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
 	// Compact any blocks that have >5% tombstones.
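+	// For example, a block with NumSeries=10 and NumTombstones=3 yields
+	// 3/(10+1) ≈ 0.27 > 0.05 and is selected, while an empty block yields
+	// 0/1 = 0 and is skipped; the +1 both avoids division by zero and
+	// keeps empty blocks from being selected over and over.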
 	for i := len(dms) - 1; i >= 0; i-- {
 		meta := dms[i].meta
-		if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] {
+		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
 			break
 		}
-		if meta.Stats.NumSeries/(meta.Stats.NumTombstones+1) <= 20 { // 5%
+		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
 			return []string{dms[i].dir}, nil
 		}
 	}
 	return nil, nil
 }
@@ -184,13 +169,13 @@
 // selectDirs returns the dir metas that should be compacted into a single new block.
 // If only a single block range is configured, the result is always nil.
 func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
-	if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
+	if len(c.ranges) < 2 || len(ds) < 1 {
 		return nil
 	}
 
 	highTime := ds[len(ds)-1].meta.MinTime
 
-	for _, iv := range c.opts.blockRanges[1:] {
+	for _, iv := range c.ranges[1:] {
 		parts := splitByRange(ds, iv)
 		if len(parts) == 0 {
 			continue
@@ -291,7 +276,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	var metas []*BlockMeta
 
 	for _, d := range dirs {
-		b, err := newPersistedBlock(d, c.opts.chunkPool)
+		b, err := newPersistedBlock(d, c.chunkPool)
 		if err != nil {
 			return err
 		}
@@ -491,7 +476,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 
 		for _, chk := range chks {
-			c.opts.chunkPool.Put(chk.Chunk)
+			c.chunkPool.Put(chk.Chunk)
 		}
 
 		for _, l := range lset {
diff --git a/compact_test.go b/compact_test.go
index 81cd23b497..d1650cd16a 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -19,194 +19,6 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestLeveledCompactor_Select(t *testing.T) {
-	opts := &LeveledCompactorOptions{
-		blockRanges: []int64{
-			20,
-			60,
-			240,
-			720,
-			2160,
-		},
-	}
-
-	type dirMetaSimple struct {
-		dir string
-		tr  []int64
-	}
-
-	cases := []struct {
-		blocks  []dirMetaSimple
-		planned [][]string
-	}{
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-			},
-			planned: nil,
-		},
-		{
-			// We should wait for a third block of size 20 to appear before compacting
-			// the existing ones.
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-			},
-			planned: nil,
-		},
-		{
-			// Block to fill the entire parent range appeared – should be compacted.
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "3",
-					tr:  []int64{40, 60},
-				},
-			},
-			planned: [][]string{{"1", "2", "3"}},
-		},
-		{
-			// Block for the next parent range appeared. Nothing will happen in the first one
-			// anymore and we should compact it.
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "3",
-					tr:  []int64{60, 80},
-				},
-			},
-			planned: [][]string{{"1", "2"}},
-		},
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "3",
-					tr:  []int64{40, 60},
-				},
-				{
-					dir: "4",
-					tr:  []int64{60, 120},
-				},
-				{
-					dir: "5",
-					tr:  []int64{120, 180},
-				},
-			},
-			planned: [][]string{{"1", "2", "3"}}, // We still need 0-60 to compact 0-240
-		},
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "4",
-					tr:  []int64{60, 120},
-				},
-				{
-					dir: "5",
-					tr:  []int64{120, 180},
-				},
-				{
-					dir: "6",
-					tr:  []int64{720, 960},
-				},
-				{
-					dir: "7",
-					tr:  []int64{1200, 1440},
-				},
-			},
-			planned: [][]string{{"2", "4", "5"}},
-		},
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 60},
-				},
-				{
-					dir: "4",
-					tr:  []int64{60, 80},
-				},
-				{
-					dir: "5",
-					tr:  []int64{80, 100},
-				},
-				{
-					dir: "6",
-					tr:  []int64{100, 120},
-				},
-			},
-			planned: [][]string{{"4", "5", "6"}},
-		},
-	}
-
-	c := &LeveledCompactor{
-		opts: opts,
-	}
-	sliceDirs := func(dms []dirMeta) [][]string {
-		if len(dms) == 0 {
-			return nil
-		}
-		var res []string
-		for _, dm := range dms {
-			res = append(res, dm.dir)
-		}
-		return [][]string{res}
-	}
-
-	dmFromSimple := func(dms []dirMetaSimple) []dirMeta {
-		dirs := make([]dirMeta, 0, len(dms))
-		for _, dir := range dms {
-			dirs = append(dirs, dirMeta{
-				dir: dir.dir,
-				meta: &BlockMeta{
-					MinTime: dir.tr[0],
-					MaxTime: dir.tr[1],
-				},
-			})
-		}
-
-		return dirs
-	}
-
-	for _, tc := range cases {
-		require.Equal(t, tc.planned, sliceDirs(c.selectDirs(dmFromSimple(tc.blocks))))
-	}
-}
-
 func TestSplitByRange(t *testing.T) {
 	cases := []struct {
 		trange int64
@@ -329,8 +141,136 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}
 
-	c := NewLeveledCompactor(nil, nil, &LeveledCompactorOptions{
-		blockRanges: []int64{50},
-	})
+	c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil)
+	require.NoError(t, err)
+
 	c.plan(metas)
 }
+
+func TestLeveledCompactor_plan(t *testing.T) {
+	compactor, err := NewLeveledCompactor(nil, nil, []int64{
+		20,
+		60,
+		240,
+		720,
+		2160,
+	}, nil)
+	require.NoError(t, err)
+
+	metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta {
+		meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
+		if stats != nil {
+			meta.Stats = *stats
+		}
+		return dirMeta{
+			dir:  name,
+			meta: meta,
+		}
+	}
+
+	cases := []struct {
+		metas    []dirMeta
+		expected []string
+	}{
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+			},
+			expected: nil,
+		},
+		// We should wait for a third block of size 20 to appear before compacting
+		// the existing ones.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+			},
+			expected: nil,
+		},
+		// Block to fill the entire parent range appeared – should be compacted.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+			},
+			expected: []string{"1", "2", "3"},
+		},
+		// Block for the next parent range appeared. Nothing will happen in the first one
+		// anymore and we should compact it.
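+		// (Blocks 1 and 2 fill only [0, 40) of the 60-wide parent range, but a
+		// block starting at 60 means that range can no longer be completed.)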
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 60, 80, nil),
+			},
+			expected: []string{"1", "2"},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+			},
+			expected: []string{"1", "2", "3"},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("2", 20, 40, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+				metaRange("6", 720, 960, nil),
+			},
+			expected: []string{"2", "4", "5"},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 60, nil),
+				metaRange("4", 60, 80, nil),
+				metaRange("5", 80, 100, nil),
+				metaRange("6", 100, 120, nil),
+			},
+			expected: []string{"4", "5", "6"},
+		},
+		// Select large blocks that have many tombstones.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 720, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+			},
+			expected: []string{"1"},
+		},
+		// For small blocks, do not compact tombstones.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 30, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+			},
+			expected: nil,
+		},
+		// Regression test: we were stuck in a compact loop where we always recompacted
+		// the same block when tombstones and series counts were zero.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 720, &BlockStats{
+					NumSeries:     0,
+					NumTombstones: 0,
+				}),
+			},
+			expected: nil,
+		},
+	}
+
+	for i, c := range cases {
+		res, err := compactor.plan(c.metas)
+		require.NoError(t, err)
+
+		require.Equal(t, c.expected, res, "test case %d", i)
+	}
+}
diff --git a/db.go b/db.go
index 8d4c569326..4a629c1f3e 100644
--- a/db.go
+++ b/db.go
@@ -199,30 +199,16 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}
 
-	copts := &LeveledCompactorOptions{
-		blockRanges: opts.BlockRanges,
-		chunkPool:   db.chunkPool,
+	db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
+	if err != nil {
+		return nil, errors.Wrap(err, "create leveled compactor")
 	}
-	if len(copts.blockRanges) == 0 {
-		return nil, errors.New("at least one block-range must exist")
-	}
-
-	for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
-		if len(copts.blockRanges) == 1 {
-			break
-		}
-		// Max overflow is restricted to 20%.
-		copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
-	}
-
-	db.compactor = NewLeveledCompactor(r, l, copts)
-
 	wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second)
 	if err != nil {
 		return nil, err
 	}
 
-	db.head, err = NewHead(r, l, wal, copts.blockRanges[0])
+	db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
 	if err != nil {
 		return nil, err
 	}
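
Usage sketch: a minimal example of the constructor change above, assuming the
github.com/prometheus/tsdb import path and a placeholder "data" directory. The
nil registerer, logger, and pool mirror how the new tests call the constructor
(a nil pool makes the compactor fall back to chunks.NewPool()).

	package main

	import (
		"fmt"

		"github.com/prometheus/tsdb"
	)

	func main() {
		// The constructor now fails fast on an empty set of ranges instead
		// of silently building an unusable compactor.
		c, err := tsdb.NewLeveledCompactor(nil, nil, []int64{20, 60, 240, 720, 2160}, nil)
		if err != nil {
			panic(err) // "at least one range must be provided"
		}

		// Plan reads the block metas under a database directory and returns
		// the block dirs that should be compacted next, or nil if none.
		dirs, err := c.Plan("data")
		if err != nil {
			panic(err)
		}
		fmt.Println(dirs)
	}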