db: Fixed validateBlockSequence.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartek Plotka 2018-03-28 18:33:41 +01:00
parent a9b28a6aa0
commit 51ce1cc7ff
2 changed files with 102 additions and 48 deletions

78
db.go
View file

@ -555,8 +555,9 @@ func (db *DB) reload(deleteable ...string) (err error) {
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
// ValidateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) == 0 {
if len(bs) <= 1 {
return nil
}
@ -565,31 +566,82 @@ func validateBlockSequence(bs []*Block) error {
metas = append(metas, b.meta)
}
overlappedBlocks := ValidateBlockSequence(metas)
if len(overlappedBlocks) == 0 {
overlaps := OverlappingBlocks(metas)
if len(overlaps) == 0 {
return nil
}
return errors.Errorf("block time ranges overlap (%v)", overlappedBlocks)
return errors.Errorf("block time ranges overlap (%v)", overlaps)
}
// ValidateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func ValidateBlockSequence(bm []BlockMeta) [][]BlockMeta {
if len(bm) == 0 {
// OverlappingBlocks returns all overlapping blocks from given meta files.
func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) {
if len(bm) <= 1 {
return nil
}
sort.Slice(bm, func(i, j int) bool {
return bm[i].MinTime < bm[j].MinTime
})
prev := bm[0]
for _, b := range bm[1:] {
if b.MinTime < prev.MaxTime {
return [][]BlockMeta{{b, prev}}
for i, b := range bm[1:] {
prev := bm[i]
if b.MinTime >= prev.MaxTime {
continue
}
//prev = b
// prev overlaps with b.
overlap := []BlockMeta{prev}
// Check if prev overlaps with something else.
for j, fb := range bm[i+1:] {
if fb.MinTime >= prev.MaxTime {
break
}
if fb.MinTime >= bm[i+j].MaxTime {
// fb overlaps with prev, but fb does not overlap with previous block. Pack in separate group.
overlaps = append(overlaps, overlap)
overlap = []BlockMeta{prev}
}
overlap = append(overlap, fb)
}
overlaps = append(overlaps, overlap)
}
return nil
if len(overlaps) < 2 {
return overlaps
}
// Deduplicate cases like {a, b, c} {b, c} into just {a, b, c}
newOverlaps := [][]BlockMeta{overlaps[0]}
for i, overlap := range overlaps[1:] {
prev := overlaps[i]
// Check if prev contains all overlap elements.
found := false
for _, o := range overlap {
found = false
for _, p := range prev {
if p.ULID.Compare(o.ULID) == 0 {
found = true
break
}
}
if !found {
break
}
}
if found {
// We can ignore this overlap.
continue
}
newOverlaps = append(newOverlaps, overlap)
}
return newOverlaps
}
func (db *DB) String() string {

View file

@ -21,6 +21,7 @@ import (
"sort"
"testing"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
@ -893,46 +894,47 @@ func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
return result, ss.Err()
}
func TestValidateBlockSequenceDetectsAllOverlaps(t *testing.T) {
var metas []BlockMeta
// Create 10 blocks that does not overlap (0-10, 10-20, ..., 90-100)
for i := 0; i < 10; i++ {
metas = append(metas, BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)})
func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
u := uint64(0)
newULID := func() ulid.ULID {
u++
return ulid.MustNew(u, nil)
}
overlappedBlocks := ValidateBlockSequence(metas)
testutil.Assert(t, len(overlappedBlocks) == 0, "we found unexpected overlaps")
// Create 10 blocks that does not overlap (0-10, 10-20, ..., 100-110) but in reverse order to ensure our algorithm
// will handle that.
var metas = make([]BlockMeta, 11)
for i := 10; i >= 0; i-- {
metas[i] = BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10), ULID: newULID()}
}
// Add overlaping blocks.
testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps")
// Add overlapping blocks.
// o1 overlaps with 10-20.
o1 := BlockMeta{MinTime: 15, MaxTime: 17}
o1 := BlockMeta{MinTime: 15, MaxTime: 17, ULID: newULID()}
testutil.Equals(t, [][]BlockMeta{{metas[1], o1}}, OverlappingBlocks(append(metas, o1)))
overlappedBlocks = ValidateBlockSequence(append(metas, o1))
expectedOverlaps := [][]BlockMeta{
{metas[1], o1},
}
testutil.Equals(t, expectedOverlaps, overlappedBlocks)
// o2 overlaps with 20-30 and 30-40.
o2 := BlockMeta{MinTime: 21, MaxTime: 31, ULID: newULID()}
testutil.Equals(t, [][]BlockMeta{{metas[2], o2}, {o2, metas[3]}}, OverlappingBlocks(append(metas, o2)))
//// o2 overlaps with 20-30 and 30-40.
//o2 := BlockMeta{MinTime: 21, MaxTime: 31}
//
//// o3a and o3b overlaps with 30-40 and each other.
//o3a := BlockMeta{MinTime: 33, MaxTime: 39}
//o3b := BlockMeta{MinTime: 34, MaxTime: 36}
//
//// o4 is 1:1 overlap with 50-60
//o4 := BlockMeta{MinTime: 50, MaxTime: 60}
//
//// o5 overlaps with 50-60, 60-70 and 70,80
//o5 := BlockMeta{MinTime: 60, MaxTime: 80}
//
// o3a and o3b overlaps with 30-40 and each other.
o3a := BlockMeta{MinTime: 33, MaxTime: 39, ULID: newULID()}
o3b := BlockMeta{MinTime: 34, MaxTime: 36, ULID: newULID()}
testutil.Equals(t, [][]BlockMeta{{metas[3], o3a, o3b}}, OverlappingBlocks(append(metas, o3a, o3b)))
//expectedOverlaps := [][]block.Meta{
// {metas[1], o1},
// {metas[2], o2},
// {metas[3], o2},
// {metas[3], o3, o3b},
//}
// o4 is 1:1 overlap with 50-60.
o4 := BlockMeta{MinTime: 50, MaxTime: 60, ULID: newULID()}
testutil.Equals(t, [][]BlockMeta{{metas[5], o4}}, OverlappingBlocks(append(metas, o4)))
// o5 overlaps with 60-70, 70-80 and 80-90.
o5 := BlockMeta{MinTime: 61, MaxTime: 85, ULID: newULID()}
testutil.Equals(t, [][]BlockMeta{{metas[6], o5}, {o5, metas[7]}, {o5, metas[8]}}, OverlappingBlocks(append(metas, o5)))
// o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a.
o6a := BlockMeta{MinTime: 92, MaxTime: 105, ULID: newULID()}
o6b := BlockMeta{MinTime: 94, MaxTime: 99, ULID: newULID()}
testutil.Equals(t, [][]BlockMeta{{metas[9], o6a, o6b}, {o6a, metas[10]}}, OverlappingBlocks(append(metas, o6a, o6b)))
}