Fix compaction range selection

This commit is contained in:
Fabian Reinartz 2017-07-13 16:13:59 +02:00
parent 3c22488157
commit 5d7b5994d6
2 changed files with 79 additions and 31 deletions

View file

@ -229,11 +229,24 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
for i := 0; i < len(ds); {
var group []dirMeta
var t0 int64
m := ds[i].meta
// Compute start of aligned time range of size tr closest to the current block's start.
t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr)
if m.MinTime >= 0 {
t0 = tr * (m.MinTime / tr)
} else {
t0 = tr * ((m.MinTime - tr + 1) / tr)
}
// Skip blocks that don't fall into the range. This can happen via mis-alignment or
// by being the multiple of the intended range.
if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
i++
continue
}
// Add all dirs to the current group that are within [t0, t0+tr].
for ; i < len(ds); i++ {
// Either the block falls into the next range or doesn't fit at all (checked above).
if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
break
}

View file

@ -14,7 +14,6 @@
package tsdb
import (
"sort"
"testing"
"github.com/stretchr/testify/require"
@ -178,58 +177,84 @@ func TestCompactionSelect(t *testing.T) {
}
func TestSplitByRange(t *testing.T) {
splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
rMap := make(map[int64][]dirMeta)
for _, dir := range ds {
t0 := dir.meta.MinTime - dir.meta.MinTime%tr
if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
rMap[t0] = append(rMap[t0], dir)
}
}
res := make([][]dirMeta, 0, len(rMap))
for _, v := range rMap {
res = append(res, v)
}
sort.Slice(res, func(i, j int) bool {
return res[i][0].meta.MinTime < res[j][0].meta.MinTime
})
return res
}
cases := []struct {
trange int64
ranges [][]int64
output [][][]int64
ranges [][2]int64
output [][][2]int64
}{
{
trange: 60,
ranges: [][]int64{{0, 10}},
ranges: [][2]int64{{0, 10}},
output: [][][2]int64{
{{0, 10}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 60}},
ranges: [][2]int64{{0, 60}},
output: [][][2]int64{
{{0, 60}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {30, 60}},
ranges: [][2]int64{{0, 10}, {9, 15}, {30, 60}},
output: [][][2]int64{
{{0, 10}, {9, 15}, {30, 60}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {60, 90}},
ranges: [][2]int64{{70, 90}, {125, 130}, {130, 180}, {1000, 1001}},
output: [][][2]int64{
{{70, 90}},
{{125, 130}, {130, 180}},
{{1000, 1001}},
},
},
// Mis-aligned or too-large blocks are ignored.
{
trange: 60,
ranges: [][2]int64{{50, 70}, {70, 80}},
output: [][][2]int64{
{{70, 80}},
},
},
{
trange: 72,
ranges: [][2]int64{{0, 144}, {144, 216}, {216, 288}},
output: [][][2]int64{
{{144, 216}},
{{216, 288}},
},
},
// Various awkward edge cases easy to hit with negative numbers.
{
trange: 60,
ranges: [][2]int64{{-10, -5}},
output: [][][2]int64{
{{-10, -5}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
ranges: [][2]int64{{-60, -50}, {-10, -5}},
output: [][][2]int64{
{{-60, -50}, {-10, -5}},
},
},
{
trange: 60,
ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
ranges: [][2]int64{{-60, -50}, {-10, -5}, {0, 15}},
output: [][][2]int64{
{{-60, -50}, {-10, -5}},
{{0, 15}},
},
},
}
for _, c := range cases {
// Transform input range tuples into dirMetas.
blocks := make([]dirMeta, 0, len(c.ranges))
for _, r := range c.ranges {
blocks = append(blocks, dirMeta{
@ -240,6 +265,16 @@ func TestSplitByRange(t *testing.T) {
})
}
require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
// Transform output range tuples into dirMetas.
exp := make([][]dirMeta, len(c.output))
for i, group := range c.output {
for _, r := range group {
exp[i] = append(exp[i], dirMeta{
meta: &BlockMeta{MinTime: r[0], MaxTime: r[1]},
})
}
}
require.Equal(t, exp, splitByRange(blocks, c.trange))
}
}