selectOverlappingDirs selects wrong dirs where there are 2 disjoint sets of overlapping dirs

Signed-off-by: naivewong <867245430@qq.com>
This commit is contained in:
naivewong 2019-04-08 20:27:06 +08:00 committed by Krasi Georgiev
parent 8b33ee9e2b
commit 7ab060c864
4 changed files with 122 additions and 47 deletions

View file

@ -3,6 +3,7 @@
- [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere. - [REMOVED] `chunks.NewReader` is removed as it wasn't used anywhere.
- [REMOVED] `FromData` is considered unused so was removed. - [REMOVED] `FromData` is considered unused so was removed.
- [FEATURE] Added option WALSegmentSize -1 to disable the WAL. - [FEATURE] Added option WALSegmentSize -1 to disable the WAL.
- [BUGFIX] Bugfix in selectOverlappingDirs. Only return the first overlapping blocks.
- [BUGFIX] Fsync the meta file to persist it on disk to avoid data loss in case of a host crash. - [BUGFIX] Fsync the meta file to persist it on disk to avoid data loss in case of a host crash.
- [BUGFIX] Fix fd and vm_area leak on error path in chunks.NewDirReader. - [BUGFIX] Fix fd and vm_area leak on error path in chunks.NewDirReader.
- [BUGFIX] Fix fd and vm_area leak on error path in index.NewFileReader. - [BUGFIX] Fix fd and vm_area leak on error path in index.NewFileReader.

View file

@ -267,8 +267,8 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
return nil return nil
} }
// selectOverlappingDirs returns all dirs with overlaping time ranges. // selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint. // It expects sorted input by mint and returns the overlapping dirs in the same order as received.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if len(ds) < 2 { if len(ds) < 2 {
return nil return nil
@ -281,6 +281,8 @@ func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
overlappingDirs = append(overlappingDirs, ds[i].dir) overlappingDirs = append(overlappingDirs, ds[i].dir)
} }
overlappingDirs = append(overlappingDirs, d.dir) overlappingDirs = append(overlappingDirs, d.dir)
} else if len(overlappingDirs) > 0 {
break
} }
if d.meta.MaxTime > globalMaxt { if d.meta.MaxTime > globalMaxt {
globalMaxt = d.meta.MaxTime globalMaxt = d.meta.MaxTime

View file

@ -173,27 +173,25 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, nil) }, nil)
testutil.Ok(t, err) testutil.Ok(t, err)
cases := []struct { cases := map[string]struct {
metas []dirMeta metas []dirMeta
expected []string expected []string
}{ }{
{ "Outside Range": {
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
}, },
expected: nil, expected: nil,
}, },
// We should wait for four blocks of size 20 to appear before compacting. "We should wait for four blocks of size 20 to appear before compacting.": {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
}, },
expected: nil, expected: nil,
}, },
// We should wait for a next block of size 20 to appear before compacting `We should wait for a next block of size 20 to appear before compacting
// the existing ones. We have three, but we ignore the fresh one from WAl. the existing ones. We have three, but we ignore the fresh one from WAl`: {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
@ -201,8 +199,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: nil, expected: nil,
}, },
// Block to fill the entire parent range appeared should be compacted. "Block to fill the entire parent range appeared should be compacted": {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
@ -211,9 +208,8 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"1", "2", "3"}, expected: []string{"1", "2", "3"},
}, },
// Block for the next parent range appeared with gap with size 20. Nothing will happen in the first one `Block for the next parent range appeared with gap with size 20. Nothing will happen in the first one
// anymore but we ignore fresh one still, so no compaction. anymore but we ignore fresh one still, so no compaction`: {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
@ -221,9 +217,8 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: nil, expected: nil,
}, },
// Block for the next parent range appeared, and we have a gap with size 20 between second and third block. `Block for the next parent range appeared, and we have a gap with size 20 between second and third block.
// We will not get this missed gap anymore and we should compact just these two. We will not get this missed gap anymore and we should compact just these two.`: {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
@ -232,8 +227,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"1", "2"}, expected: []string{"1", "2"},
}, },
{ "We have 20, 20, 20, 60, 60 range blocks. '5' is marked as fresh one": {
// We have 20, 20, 20, 60, 60 range blocks. "5" is marked as fresh one.
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
@ -243,8 +237,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"1", "2", "3"}, expected: []string{"1", "2", "3"},
}, },
{ "We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60": {
// We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60.
metas: []dirMeta{ metas: []dirMeta{
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
metaRange("4", 60, 120, nil), metaRange("4", 60, 120, nil),
@ -254,8 +247,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"2", "4", "6"}, expected: []string{"2", "4", "6"},
}, },
// Do not select large blocks that have many tombstones when there is no fresh block. "Do not select large blocks that have many tombstones when there is no fresh block": {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 540, &BlockStats{ metaRange("1", 0, 540, &BlockStats{
NumSeries: 10, NumSeries: 10,
@ -264,8 +256,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: nil, expected: nil,
}, },
// Select large blocks that have many tombstones when fresh appears. "Select large blocks that have many tombstones when fresh appears": {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 540, &BlockStats{ metaRange("1", 0, 540, &BlockStats{
NumSeries: 10, NumSeries: 10,
@ -275,8 +266,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"1"}, expected: []string{"1"},
}, },
// For small blocks, do not compact tombstones, even when fresh appears. "For small blocks, do not compact tombstones, even when fresh appears.": {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 60, &BlockStats{ metaRange("1", 0, 60, &BlockStats{
NumSeries: 10, NumSeries: 10,
@ -286,9 +276,8 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: nil, expected: nil,
}, },
// Regression test: we were stuck in a compact loop where we always recompacted `Regression test: we were stuck in a compact loop where we always recompacted
// the same block when tombstones and series counts were zero. the same block when tombstones and series counts were zero`: {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 540, &BlockStats{ metaRange("1", 0, 540, &BlockStats{
NumSeries: 0, NumSeries: 0,
@ -298,12 +287,11 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: nil, expected: nil,
}, },
// Regression test: we were wrongly assuming that new block is fresh from WAL when its ULID is newest. `Regression test: we were wrongly assuming that new block is fresh from WAL when its ULID is newest.
// We need to actually look on max time instead. We need to actually look on max time instead.
//
// With previous, wrong approach "8" block was ignored, so we were wrongly compacting 5 and 7 and introducing With previous, wrong approach "8" block was ignored, so we were wrongly compacting 5 and 7 and introducing
// block overlaps. block overlaps`: {
{
metas: []dirMeta{ metas: []dirMeta{
metaRange("5", 0, 360, nil), metaRange("5", 0, 360, nil),
metaRange("6", 540, 560, nil), // Fresh one. metaRange("6", 540, 560, nil), // Fresh one.
@ -312,8 +300,10 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"7", "8"}, expected: []string{"7", "8"},
}, },
// For overlapping blocks. // |--------------|
{ // |----------------|
// |--------------|
"Overlapping blocks 1": {
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 19, 40, nil), metaRange("2", 19, 40, nil),
@ -321,7 +311,10 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"1", "2"}, expected: []string{"1", "2"},
}, },
{ // |--------------|
// |--------------|
// |--------------|
"Overlapping blocks 2": {
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 20, 40, nil), metaRange("2", 20, 40, nil),
@ -329,7 +322,10 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"2", "3"}, expected: []string{"2", "3"},
}, },
{ // |--------------|
// |---------------------|
// |--------------|
"Overlapping blocks 3": {
metas: []dirMeta{ metas: []dirMeta{
metaRange("1", 0, 20, nil), metaRange("1", 0, 20, nil),
metaRange("2", 10, 40, nil), metaRange("2", 10, 40, nil),
@ -337,7 +333,11 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"1", "2", "3"}, expected: []string{"1", "2", "3"},
}, },
{ // |--------------|
// |--------------------------------|
// |--------------|
// |--------------|
"Overlapping blocks 4": {
metas: []dirMeta{ metas: []dirMeta{
metaRange("5", 0, 360, nil), metaRange("5", 0, 360, nil),
metaRange("6", 340, 560, nil), metaRange("6", 340, 560, nil),
@ -346,10 +346,23 @@ func TestLeveledCompactor_plan(t *testing.T) {
}, },
expected: []string{"5", "6", "7", "8"}, expected: []string{"5", "6", "7", "8"},
}, },
// |--------------|
// |--------------|
// |--------------|
// |--------------|
"Overlapping blocks 5": {
metas: []dirMeta{
metaRange("1", 0, 10, nil),
metaRange("2", 9, 20, nil),
metaRange("3", 30, 40, nil),
metaRange("4", 39, 50, nil),
},
expected: []string{"1", "2"},
},
} }
for _, c := range cases { for title, c := range cases {
if !t.Run("", func(t *testing.T) { if !t.Run(title, func(t *testing.T) {
res, err := compactor.plan(c.metas) res, err := compactor.plan(c.metas)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, c.expected, res) testutil.Equals(t, c.expected, res)

View file

@ -1706,8 +1706,10 @@ func TestCorrectNumTombstones(t *testing.T) {
func TestVerticalCompaction(t *testing.T) { func TestVerticalCompaction(t *testing.T) {
cases := []struct { cases := []struct {
blockSeries [][]Series blockSeries [][]Series
expSeries map[string][]tsdbutil.Sample expSeries map[string][]tsdbutil.Sample
expBlockNum int
expOverlappingBlocks int
}{ }{
// Case 0 // Case 0
// |--------------| // |--------------|
@ -1734,6 +1736,8 @@ func TestVerticalCompaction(t *testing.T) {
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99}, sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99}, sample{12, 99}, sample{13, 99}, sample{14, 99},
}}, }},
expBlockNum: 1,
expOverlappingBlocks: 1,
}, },
// Case 1 // Case 1
// |-------------------------------| // |-------------------------------|
@ -1760,6 +1764,8 @@ func TestVerticalCompaction(t *testing.T) {
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0}, sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0},
sample{13, 0}, sample{17, 0}, sample{13, 0}, sample{17, 0},
}}, }},
expBlockNum: 1,
expOverlappingBlocks: 1,
}, },
// Case 2 // Case 2
// |-------------------------------| // |-------------------------------|
@ -1794,6 +1800,8 @@ func TestVerticalCompaction(t *testing.T) {
sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59}, sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59}, sample{21, 59}, sample{22, 59},
}}, }},
expBlockNum: 1,
expOverlappingBlocks: 1,
}, },
// Case 3 // Case 3
// |-------------------| // |-------------------|
@ -1828,6 +1836,8 @@ func TestVerticalCompaction(t *testing.T) {
sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59}, sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59},
sample{21, 59}, sample{22, 59}, sample{21, 59}, sample{22, 59},
}}, }},
expBlockNum: 1,
expOverlappingBlocks: 1,
}, },
// Case 4 // Case 4
// |-------------------------------------| // |-------------------------------------|
@ -1864,6 +1874,8 @@ func TestVerticalCompaction(t *testing.T) {
sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99}, sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
sample{20, 0}, sample{22, 0}, sample{20, 0}, sample{22, 0},
}}, }},
expBlockNum: 1,
expOverlappingBlocks: 1,
}, },
// Case 5: series are merged properly when there are multiple series. // Case 5: series are merged properly when there are multiple series.
// |-------------------------------------| // |-------------------------------------|
@ -1958,6 +1970,53 @@ func TestVerticalCompaction(t *testing.T) {
sample{20, 0}, sample{22, 0}, sample{20, 0}, sample{22, 0},
}, },
}, },
expBlockNum: 1,
expOverlappingBlocks: 1,
},
// Case 6
// |--------------|
// |----------------|
// |--------------|
// |----------------|
{
blockSeries: [][]Series{
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{24, 0},
sample{25, 0}, sample{27, 0}, sample{28, 0}, sample{29, 0},
}),
},
[]Series{
newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
sample{23, 99}, sample{25, 99}, sample{26, 99}, sample{27, 99},
sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
}),
},
},
expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
sample{12, 99}, sample{13, 99}, sample{14, 99},
sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{23, 99},
sample{24, 0}, sample{25, 99}, sample{26, 99}, sample{27, 99},
sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
}},
expBlockNum: 2,
expOverlappingBlocks: 2,
}, },
} }
@ -1995,9 +2054,9 @@ func TestVerticalCompaction(t *testing.T) {
testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here") testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here")
err = db.compact() err = db.compact()
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.Blocks()), "Wrong number of blocks [after compact]") testutil.Equals(t, c.expBlockNum, len(db.Blocks()), "Wrong number of blocks [after compact]")
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch") testutil.Equals(t, c.expOverlappingBlocks, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch")
// Query test after merging the overlapping blocks. // Query test after merging the overlapping blocks.
querier, err = db.Querier(0, 100) querier, err = db.Querier(0, 100)