Refactor and add tests for compactor

Fabian Reinartz 2017-09-01 11:46:46 +02:00
parent 5cf2662074
commit 4cc37eecab
4 changed files with 161 additions and 257 deletions

block.go

@@ -53,13 +53,6 @@ type BlockReader interface {
 	Tombstones() TombstoneReader
 }
 
-// // Block is an interface to a DiskBlock that can also be queried.
-// type Block interface {
-// 	DiskBlock
-// 	Queryable
-// 	Snapshottable
-// }
-
 // Snapshottable defines an entity that can be backedup online.
 type Snapshottable interface {
 	Snapshot(dir string) error

compact.go

@@ -59,10 +59,11 @@ type Compactor interface {
 
 // LeveledCompactor implements the Compactor interface.
 type LeveledCompactor struct {
-	dir     string
-	metrics *compactorMetrics
-	logger  log.Logger
-	opts    *LeveledCompactorOptions
+	dir       string
+	metrics   *compactorMetrics
+	logger    log.Logger
+	ranges    []int64
+	chunkPool chunks.Pool
 }
 
 type compactorMetrics struct {
@@ -97,30 +98,20 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 	return m
 }
 
-// LeveledCompactorOptions are the options for a LeveledCompactor.
-type LeveledCompactorOptions struct {
-	blockRanges []int64
-	chunkPool   chunks.Pool
-}
-
 // NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor {
-	if opts == nil {
-		opts = &LeveledCompactorOptions{
-			chunkPool: chunks.NewPool(),
-		}
+func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunks.Pool) (*LeveledCompactor, error) {
+	if len(ranges) == 0 {
+		return nil, errors.Errorf("at least one range must be provided")
+	}
+	if pool == nil {
+		pool = chunks.NewPool()
 	}
 	return &LeveledCompactor{
-		opts:    opts,
-		logger:  l,
-		metrics: newCompactorMetrics(r),
-	}
-}
-
-type compactionInfo struct {
-	seq        int
-	generation int
-	mint, maxt int64
+		ranges:    ranges,
+		chunkPool: pool,
+		logger:    l,
+		metrics:   newCompactorMetrics(r),
+	}, nil
 }
 
 type dirMeta struct {
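
Note on the new constructor contract: callers now pass the compaction ranges and chunk pool directly and must handle the returned error. A minimal sketch, not part of the commit (the wrapper function name is ours), of how a caller in package tsdb uses it — nil registerer, logger, and pool are all tolerated, only an empty range list fails:

	// newCompactorSketch shows the new calling convention.
	func newCompactorSketch() (*LeveledCompactor, error) {
		// A nil pool falls back to chunks.NewPool() inside the constructor.
		c, err := NewLeveledCompactor(nil, nil, []int64{20, 60, 240}, nil)
		if err != nil {
			return nil, err // only failure mode: len(ranges) == 0
		}
		return c, nil
	}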
@@ -142,21 +133,15 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
 		if err != nil {
 			return nil, err
 		}
-		if meta.Compaction.Level > 0 {
-			dms = append(dms, dirMeta{dir, meta})
-		}
+		dms = append(dms, dirMeta{dir, meta})
 	}
+	sort.Slice(dms, func(i, j int) bool {
+		return dms[i].meta.MinTime < dms[j].meta.MinTime
+	})
+
 	return c.plan(dms)
 }
 
 func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
-	if len(dms) <= 1 {
-		return nil, nil
-	}
-
-	sort.Slice(dms, func(i, j int) bool {
-		return dms[i].meta.MinTime < dms[j].meta.MinTime
-	})
-
 	var res []string
 	for _, dm := range c.selectDirs(dms) {
@@ -169,11 +154,11 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
 
 	// Compact any blocks that have >5% tombstones.
 	for i := len(dms) - 1; i >= 0; i-- {
 		meta := dms[i].meta
-		if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] {
+		if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
 			break
 		}
-		if meta.Stats.NumSeries/(meta.Stats.NumTombstones+1) <= 20 { // 5%
+		if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
 			return []string{dms[i].dir}, nil
 		}
 	}
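
Why the tombstone condition flips from an integer series-per-tombstone ratio to a floating-point tombstone fraction: the old form selected any block with a small series count even when it had no tombstones at all, and a block with zero series and zero tombstones satisfied 0 <= 20 unconditionally, producing the endless recompaction loop the regression test below guards against. Worked through on the stats used in the new tests:

	// Old: meta.Stats.NumSeries/(meta.Stats.NumTombstones+1) <= 20
	//   10 series, 3 tombstones -> 10/4 = 2 <= 20  -> selected (intended)
	//   10 series, 0 tombstones -> 10/1 = 10 <= 20 -> selected despite no tombstones
	//    0 series, 0 tombstones ->  0/1 = 0 <= 20  -> selected forever (compact loop)
	// New: float64(NumTombstones)/float64(NumSeries+1) > 0.05
	//   10 series, 3 tombstones -> 3/11 ~ 0.27 > 0.05 -> selected
	//    0 series, 0 tombstones ->  0/1 = 0.00        -> skipped, loop fixed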
@@ -184,13 +169,13 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
 
 // selectDirs returns the dir metas that should be compacted into a single new block.
 // If only a single block range is configured, the result is always nil.
 func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
-	if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
+	if len(c.ranges) < 2 || len(ds) < 1 {
 		return nil
 	}
 	highTime := ds[len(ds)-1].meta.MinTime
 
-	for _, iv := range c.opts.blockRanges[1:] {
+	for _, iv := range c.ranges[1:] {
 		parts := splitByRange(ds, iv)
 		if len(parts) == 0 {
 			continue
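
For orientation, how this selection behaves with the ranges used in the tests below ({20, 60, 240, 720, 2160}): splitByRange, which this commit leaves untouched and the diff does not show, buckets the time-sorted dir metas by the interval iv, while highTime, the MinTime of the newest block, keeps a bucket that could still grow from being picked early. Our reading of the test fixtures:

	// iv = 60, blocks [0,20) [20,40):
	//   bucket [0,60) can still receive a third block -> nothing planned yet
	// iv = 60, blocks [0,20) [20,40) [40,60):
	//   bucket [0,60) is full -> dirs 1, 2, 3 are planned together
	// iv = 60, blocks [0,20) [20,40) and a newer block [60,80):
	//   highTime = 60 lies beyond the bucket, so it cannot grow anymore
	//   -> dirs 1, 2 are planned as-is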
@@ -291,7 +276,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	var metas []*BlockMeta
 
 	for _, d := range dirs {
-		b, err := newPersistedBlock(d, c.opts.chunkPool)
+		b, err := newPersistedBlock(d, c.chunkPool)
 		if err != nil {
 			return err
 		}
@@ -491,7 +476,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 
 		for _, chk := range chks {
-			c.opts.chunkPool.Put(chk.Chunk)
+			c.chunkPool.Put(chk.Chunk)
 		}
 
 		for _, l := range lset {

compact_test.go

@@ -19,194 +19,6 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestLeveledCompactor_Select(t *testing.T) {
-	opts := &LeveledCompactorOptions{
-		blockRanges: []int64{
-			20,
-			60,
-			240,
-			720,
-			2160,
-		},
-	}
-
-	type dirMetaSimple struct {
-		dir string
-		tr  []int64
-	}
-
-	cases := []struct {
-		blocks  []dirMetaSimple
-		planned [][]string
-	}{
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-			},
-			planned: nil,
-		},
-		{
-			// We should wait for a third block of size 20 to appear before compacting
-			// the existing ones.
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-			},
-			planned: nil,
-		},
-		{
-			// Block to fill the entire parent range appeared should be compacted.
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "3",
-					tr:  []int64{40, 60},
-				},
-			},
-			planned: [][]string{{"1", "2", "3"}},
-		},
-		{
-			// Block for the next parent range appeared. Nothing will happen in the first one
-			// anymore and we should compact it.
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "3",
-					tr:  []int64{60, 80},
-				},
-			},
-			planned: [][]string{{"1", "2"}},
-		},
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 20},
-				},
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "3",
-					tr:  []int64{40, 60},
-				},
-				{
-					dir: "4",
-					tr:  []int64{60, 120},
-				},
-				{
-					dir: "5",
-					tr:  []int64{120, 180},
-				},
-			},
-			planned: [][]string{{"1", "2", "3"}}, // We still need 0-60 to compact 0-240
-		},
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "2",
-					tr:  []int64{20, 40},
-				},
-				{
-					dir: "4",
-					tr:  []int64{60, 120},
-				},
-				{
-					dir: "5",
-					tr:  []int64{120, 180},
-				},
-				{
-					dir: "6",
-					tr:  []int64{720, 960},
-				},
-				{
-					dir: "7",
-					tr:  []int64{1200, 1440},
-				},
-			},
-			planned: [][]string{{"2", "4", "5"}},
-		},
-		{
-			blocks: []dirMetaSimple{
-				{
-					dir: "1",
-					tr:  []int64{0, 60},
-				},
-				{
-					dir: "4",
-					tr:  []int64{60, 80},
-				},
-				{
-					dir: "5",
-					tr:  []int64{80, 100},
-				},
-				{
-					dir: "6",
-					tr:  []int64{100, 120},
-				},
-			},
-			planned: [][]string{{"4", "5", "6"}},
-		},
-	}
-
-	c := &LeveledCompactor{
-		opts: opts,
-	}
-
-	sliceDirs := func(dms []dirMeta) [][]string {
-		if len(dms) == 0 {
-			return nil
-		}
-		var res []string
-		for _, dm := range dms {
-			res = append(res, dm.dir)
-		}
-		return [][]string{res}
-	}
-
-	dmFromSimple := func(dms []dirMetaSimple) []dirMeta {
-		dirs := make([]dirMeta, 0, len(dms))
-		for _, dir := range dms {
-			dirs = append(dirs, dirMeta{
-				dir: dir.dir,
-				meta: &BlockMeta{
-					MinTime: dir.tr[0],
-					MaxTime: dir.tr[1],
-				},
-			})
-		}
-		return dirs
-	}
-
-	for _, tc := range cases {
-		require.Equal(t, tc.planned, sliceDirs(c.selectDirs(dmFromSimple(tc.blocks))))
-	}
-}
-
 func TestSplitByRange(t *testing.T) {
 	cases := []struct {
 		trange int64
@@ -329,8 +141,136 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}
-	c := NewLeveledCompactor(nil, nil, &LeveledCompactorOptions{
-		blockRanges: []int64{50},
-	})
+	c, err := NewLeveledCompactor(nil, nil, []int64{50}, nil)
+	require.NoError(t, err)
 
 	c.plan(metas)
 }
+
+func TestLeveledCompactor_plan(t *testing.T) {
+	compactor, err := NewLeveledCompactor(nil, nil, []int64{
+		20,
+		60,
+		240,
+		720,
+		2160,
+	}, nil)
+	require.NoError(t, err)
+
+	metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta {
+		meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
+		if stats != nil {
+			meta.Stats = *stats
+		}
+		return dirMeta{
+			dir:  name,
+			meta: meta,
+		}
+	}
+
+	cases := []struct {
+		metas    []dirMeta
+		expected []string
+	}{
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+			},
+			expected: nil,
+		},
+		// We should wait for a third block of size 20 to appear before compacting
+		// the existing ones.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+			},
+			expected: nil,
+		},
+		// Block to fill the entire parent range appeared should be compacted.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+			},
+			expected: []string{"1", "2", "3"},
+		},
+		// Block for the next parent range appeared. Nothing will happen in the first one
+		// anymore and we should compact it.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 60, 80, nil),
+			},
+			expected: []string{"1", "2"},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+			},
+			expected: []string{"1", "2", "3"},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("2", 20, 40, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+				metaRange("6", 720, 960, nil),
+			},
+			expected: []string{"2", "4", "5"},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 60, nil),
+				metaRange("4", 60, 80, nil),
+				metaRange("5", 80, 100, nil),
+				metaRange("6", 100, 120, nil),
+			},
+			expected: []string{"4", "5", "6"},
+		},
+		// Select large blocks that have many tombstones.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 720, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+			},
+			expected: []string{"1"},
+		},
+		// For small blocks, do not compact tombstones.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 30, &BlockStats{
+					NumSeries:     10,
+					NumTombstones: 3,
+				}),
+			},
+			expected: nil,
+		},
+		// Regression test: we were stuck in a compact loop where we always recompacted
+		// the same block when tombstones and series counts were zero.
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 720, &BlockStats{
+					NumSeries:     0,
+					NumTombstones: 0,
+				}),
+			},
+			expected: nil,
+		},
+	}
+
+	for i, c := range cases {
+		res, err := compactor.plan(c.metas)
+		require.NoError(t, err)
+
+		require.Equal(t, c.expected, res, "test case %d", i)
+	}
+}

db.go

@@ -199,30 +199,16 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
 		db.lockf = &lockf
 	}
 
-	copts := &LeveledCompactorOptions{
-		blockRanges: opts.BlockRanges,
-		chunkPool:   db.chunkPool,
+	db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
+	if err != nil {
+		return nil, errors.Wrap(err, "create leveled compactor")
 	}
-
-	if len(copts.blockRanges) == 0 {
-		return nil, errors.New("at least one block-range must exist")
-	}
-
-	for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
-		if len(copts.blockRanges) == 1 {
-			break
-		}
-
-		// Max overflow is restricted to 20%.
-		copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
-	}
-
-	db.compactor = NewLeveledCompactor(r, l, copts)
 
 	wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second)
 	if err != nil {
 		return nil, err
 	}
 
-	db.head, err = NewHead(r, l, wal, copts.blockRanges[0])
+	db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
 	if err != nil {
 		return nil, err
 	}
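
Behavioral note on this hunk: Open previously clamped opts.BlockRanges so the largest range stayed within 20% of the retention duration before constructing the compactor; that clamping is gone and BlockRanges now passes through unchanged. A caller relying on the old behavior could restore it with a helper along these lines (hypothetical, not part of the commit):

	// capRanges mirrors the clamping removed from Open: drop trailing
	// ranges while the largest exceeds 20% of retention, keeping at
	// least one range.
	func capRanges(ranges []int64, retention int64) []int64 {
		for len(ranges) > 1 && float64(ranges[len(ranges)-1])/float64(retention) > 0.2 {
			ranges = ranges[:len(ranges)-1]
		}
		return ranges
	}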