db: Simplified algorithm.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartek Plotka 2018-03-28 23:18:24 +01:00
parent 51ce1cc7ff
commit c8b4a7b839

74
db.go
View file

@ -575,6 +575,10 @@ func validateBlockSequence(bs []*Block) error {
}
// OverlappingBlocks returns all overlapping blocks from given meta files.
// We sort blocks by minTime. Then we iterate over each block minTime and treat it as our "current" timestamp.
// We check all the pending blocks (blocks that we have seen their minTimes, but their maxTime was still ahead current
// timestamp) if they did not finish. If not, it means they overlap with our current b. In the same time b is assumed as
// pending.
func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) {
if len(bm) <= 1 {
return nil
@ -583,65 +587,37 @@ func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) {
return bm[i].MinTime < bm[j].MinTime
})
for i, b := range bm[1:] {
prev := bm[i]
if b.MinTime >= prev.MaxTime {
pending := []BlockMeta{bm[0]}
for _, b := range bm[1:] {
var (
newPending []BlockMeta
samePendings = true
)
for _, p := range pending {
if b.MinTime >= p.MaxTime {
samePendings = false
continue
}
// prev overlaps with b.
overlap := []BlockMeta{prev}
// "p" overlaps with "b" and "p" is still pending.
newPending = append(newPending, p)
}
// Our block "b" is now pending.
pending = append(newPending, b)
// Check if prev overlaps with something else.
for j, fb := range bm[i+1:] {
if fb.MinTime >= prev.MaxTime {
break
if len(newPending) == 0 {
continue
}
if fb.MinTime >= bm[i+j].MaxTime {
// fb overlaps with prev, but fb does not overlap with previous block. Pack in separate group.
overlaps = append(overlaps, overlap)
overlap = []BlockMeta{prev}
if samePendings && len(overlaps) > 0 {
overlaps[len(overlaps)-1] = append(overlaps[len(overlaps)-1], b)
continue
}
overlaps = append(overlaps, append(newPending, b))
overlap = append(overlap, fb)
}
overlaps = append(overlaps, overlap)
}
if len(overlaps) < 2 {
return overlaps
}
// Deduplicate cases like {a, b, c} {b, c} into just {a, b, c}
newOverlaps := [][]BlockMeta{overlaps[0]}
for i, overlap := range overlaps[1:] {
prev := overlaps[i]
// Check if prev contains all overlap elements.
found := false
for _, o := range overlap {
found = false
for _, p := range prev {
if p.ULID.Compare(o.ULID) == 0 {
found = true
break
}
}
if !found {
break
}
}
if found {
// We can ignore this overlap.
continue
}
newOverlaps = append(newOverlaps, overlap)
}
return newOverlaps
}
func (db *DB) String() string {