diff --git a/db.go b/db.go index 975d7717e3..ea5b19c83d 100644 --- a/db.go +++ b/db.go @@ -575,6 +575,10 @@ func validateBlockSequence(bs []*Block) error { } // OverlappingBlocks returns all overlapping blocks from given meta files. +// We sort blocks by minTime. Then we iterate over each block minTime and treat it as our "current" timestamp. +// We check all the pending blocks (blocks that we have seen their minTimes, but their maxTime was still ahead current +// timestamp) if they did not finish. If not, it means they overlap with our current b. In the same time b is assumed as +// pending. func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) { if len(bm) <= 1 { return nil @@ -583,65 +587,37 @@ func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) { return bm[i].MinTime < bm[j].MinTime }) - for i, b := range bm[1:] { - prev := bm[i] - if b.MinTime >= prev.MaxTime { - continue + pending := []BlockMeta{bm[0]} + for _, b := range bm[1:] { + var ( + newPending []BlockMeta + samePendings = true + ) + + for _, p := range pending { + if b.MinTime >= p.MaxTime { + samePendings = false + continue + } + + // "p" overlaps with "b" and "p" is still pending. + newPending = append(newPending, p) } - // prev overlaps with b. + // Our block "b" is now pending. + pending = append(newPending, b) - overlap := []BlockMeta{prev} - - // Check if prev overlaps with something else. - for j, fb := range bm[i+1:] { - if fb.MinTime >= prev.MaxTime { - break - } - - if fb.MinTime >= bm[i+j].MaxTime { - // fb overlaps with prev, but fb does not overlap with previous block. Pack in separate group. - overlaps = append(overlaps, overlap) - overlap = []BlockMeta{prev} - } - - overlap = append(overlap, fb) - } - overlaps = append(overlaps, overlap) - } - - if len(overlaps) < 2 { - return overlaps - } - - // Deduplicate cases like {a, b, c} {b, c} into just {a, b, c} - newOverlaps := [][]BlockMeta{overlaps[0]} - for i, overlap := range overlaps[1:] { - prev := overlaps[i] - - // Check if prev contains all overlap elements. - found := false - for _, o := range overlap { - found = false - for _, p := range prev { - if p.ULID.Compare(o.ULID) == 0 { - found = true - break - } - } - if !found { - break - } - } - - if found { - // We can ignore this overlap. + if len(newPending) == 0 { continue } - newOverlaps = append(newOverlaps, overlap) - } + if samePendings && len(overlaps) > 0 { + overlaps[len(overlaps)-1] = append(overlaps[len(overlaps)-1], b) + continue + } + overlaps = append(overlaps, append(newPending, b)) - return newOverlaps + } + return overlaps } func (db *DB) String() string {