Added grouping by overlap range.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
Bartek Plotka 2018-04-05 13:51:33 +01:00
parent 7412e2b44b
commit cc306ef0d5
2 changed files with 69 additions and 39 deletions

48
db.go
View file

@ -19,18 +19,16 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sort" "sort"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"golang.org/x/sync/errgroup"
"strings"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/nightlyone/lockfile" "github.com/nightlyone/lockfile"
@ -40,6 +38,7 @@ import (
"github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
) )
// DefaultOptions used for the DB. They are sane for setups using // DefaultOptions used for the DB. They are sane for setups using
@ -570,21 +569,22 @@ func validateBlockSequence(bs []*Block) error {
overlaps := OverlappingBlocks(metas) overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 { if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", PrintOverlappedBlocks(overlaps)) return errors.Errorf("block time ranges overlap: %s", SprintOverlappedBlocks(overlaps))
} }
return nil return nil
} }
type TimeRange struct { type TimeRange struct {
MaxTime, MinTime int64 Min, Max int64
} }
// OverlappingBlocks returns all overlapping blocks from given meta files.
// OverlappingBlocks returns all overlapping blocks from given meta files aggregated by overlaping range.
// We sort blocks by minTime. Then we iterate over each block minTime and treat it as our "current" timestamp. // We sort blocks by minTime. Then we iterate over each block minTime and treat it as our "current" timestamp.
// We check all the pending blocks (blocks that we have seen their minTimes, but their maxTime was still ahead current // We check all the pending blocks (blocks that we have seen their minTimes, but their maxTime was still ahead current
// timestamp) if they did not finish. If not, it means they overlap with our current b. In the same time b is assumed as // timestamp) if they finish. If not, it means they overlap with our current b. In the same time b is assumed as
// pending. // pending.
func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) { func OverlappingBlocks(bm []BlockMeta) map[TimeRange][]BlockMeta {
if len(bm) <= 1 { if len(bm) <= 1 {
return nil return nil
} }
@ -593,6 +593,8 @@ func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) {
}) })
var ( var (
overlaps [][]BlockMeta
// pending contains not ended blocks in regards to "current" timestamp. // pending contains not ended blocks in regards to "current" timestamp.
pending = []BlockMeta{bm[0]} pending = []BlockMeta{bm[0]}
// continuousPending helps to aggregate same overlaps to single group. // continuousPending helps to aggregate same overlaps to single group.
@ -627,13 +629,31 @@ func OverlappingBlocks(bm []BlockMeta) (overlaps [][]BlockMeta) {
// Start new pendings. // Start new pendings.
continuousPending = true continuousPending = true
} }
return overlaps
// Fetch the critical overlapped time range foreach overlap groups.
overlapGroups := map[TimeRange][]BlockMeta{}
for _, overlap := range overlaps {
minRange := TimeRange{Min: 0, Max: math.MaxInt64}
for _, b := range overlap {
if minRange.Max > b.MaxTime {
minRange.Max = b.MaxTime
}
if minRange.Min < b.MinTime {
minRange.Min = b.MinTime
}
}
overlapGroups[minRange] = overlap
}
return overlapGroups
} }
// PrintOverlappedBlocks returns human readable string form of overlapped blocks. // SprintOverlappedBlocks returns human readable string form of overlapped blocks.
func PrintOverlappedBlocks(overlaps [][]BlockMeta) string { func SprintOverlappedBlocks(overlaps map[TimeRange][]BlockMeta) string {
var res []string var res []string
for _, o := range overlaps { for r, o := range overlaps {
var groups []string var groups []string
for _, m := range o { for _, m := range o {
groups = append(groups, fmt.Sprintf( groups = append(groups, fmt.Sprintf(
@ -644,7 +664,7 @@ func PrintOverlappedBlocks(overlaps [][]BlockMeta) string {
(time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(), (time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(),
)) ))
} }
res = append(res, fmt.Sprintf("<%s>", strings.Join(groups, ""))) res = append(res, fmt.Sprintf("[%d %d]: <%s> ", r.Min, r.Max, strings.Join(groups, "")))
} }
return strings.Join(res, "") return strings.Join(res, "")
} }

View file

@ -907,47 +907,57 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
// o1 overlaps with 10-20. // o1 overlaps with 10-20.
o1 := BlockMeta{MinTime: 15, MaxTime: 17} o1 := BlockMeta{MinTime: 15, MaxTime: 17}
testutil.Equals(t, [][]BlockMeta{{metas[1], o1}}, OverlappingBlocks(append(metas, o1))) testutil.Equals(t, map[TimeRange][]BlockMeta{
{Min: 15, Max: 17}: {metas[1], o1},
}, OverlappingBlocks(append(metas, o1)))
// o2 overlaps with 20-30 and 30-40. // o2 overlaps with 20-30 and 30-40.
o2 := BlockMeta{MinTime: 21, MaxTime: 31} o2 := BlockMeta{MinTime: 21, MaxTime: 31}
testutil.Equals(t, [][]BlockMeta{{metas[2], o2}, {o2, metas[3]}}, OverlappingBlocks(append(metas, o2))) testutil.Equals(t, map[TimeRange][]BlockMeta{
{Min: 21, Max: 30}: {metas[2], o2},
{Min: 30, Max: 31}: {o2, metas[3]},
}, OverlappingBlocks(append(metas, o2)))
// o3a and o3b overlaps with 30-40 and each other. // o3a and o3b overlaps with 30-40 and each other.
o3a := BlockMeta{MinTime: 33, MaxTime: 39} o3a := BlockMeta{MinTime: 33, MaxTime: 39}
o3b := BlockMeta{MinTime: 34, MaxTime: 36} o3b := BlockMeta{MinTime: 34, MaxTime: 36}
testutil.Equals(t, [][]BlockMeta{{metas[3], o3a, o3b}}, OverlappingBlocks(append(metas, o3a, o3b))) testutil.Equals(t, map[TimeRange][]BlockMeta{
{Min: 34, Max: 36}: {metas[3], o3a, o3b},
}, OverlappingBlocks(append(metas, o3a, o3b)))
// o4 is 1:1 overlap with 50-60. // o4 is 1:1 overlap with 50-60.
o4 := BlockMeta{MinTime: 50, MaxTime: 60} o4 := BlockMeta{MinTime: 50, MaxTime: 60}
testutil.Equals(t, [][]BlockMeta{{metas[5], o4}}, OverlappingBlocks(append(metas, o4))) testutil.Equals(t, map[TimeRange][]BlockMeta{
{Min: 50, Max: 60}: {metas[5], o4},
}, OverlappingBlocks(append(metas, o4)))
// o5 overlaps with 60-70, 70-80 and 80-90. // o5 overlaps with 60-70, 70-80 and 80-90.
o5 := BlockMeta{MinTime: 61, MaxTime: 85} o5 := BlockMeta{MinTime: 61, MaxTime: 85}
testutil.Equals(t, [][]BlockMeta{{metas[6], o5}, {o5, metas[7]}, {o5, metas[8]}}, OverlappingBlocks(append(metas, o5))) testutil.Equals(t, map[TimeRange][]BlockMeta{
{Min: 61, Max: 70}: {metas[6], o5},
{Min: 70, Max: 80}: {o5, metas[7]},
{Min: 80, Max: 85}: {o5, metas[8]},
}, OverlappingBlocks(append(metas, o5)))
// o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a. // o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a.
o6a := BlockMeta{MinTime: 92, MaxTime: 105} o6a := BlockMeta{MinTime: 92, MaxTime: 105}
o6b := BlockMeta{MinTime: 94, MaxTime: 99} o6b := BlockMeta{MinTime: 94, MaxTime: 99}
testutil.Equals(t, [][]BlockMeta{{metas[9], o6a, o6b}, {o6a, metas[10]}}, OverlappingBlocks(append(metas, o6a, o6b))) testutil.Equals(t, map[TimeRange][]BlockMeta{
{Min: 94, Max: 99}: {metas[9], o6a, o6b},
{Min: 100, Max: 105}: {o6a, metas[10]},
}, OverlappingBlocks(append(metas, o6a, o6b)))
// All together. // All together.
testutil.Equals(t, [][]BlockMeta{ testutil.Equals(t, map[TimeRange][]BlockMeta{
{metas[1], o1}, {Min: 15, Max: 17}: {metas[1], o1},
{metas[2], o2}, {o2, metas[3]}, {Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]},
{metas[3], o3a, o3b}, {Min: 34, Max: 36}: {metas[3], o3a, o3b},
{metas[5], o4}, {Min: 50, Max: 60}: {metas[5], o4},
{metas[6], o5}, {o5, metas[7]}, {o5, metas[8]}, {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]},
{metas[9], o6a, o6b}, {o6a, metas[10]}, {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]},
}, OverlappingBlocks(append(metas, o1, o2, o3a, o3b, o4, o5, o6a, o6b))) }, OverlappingBlocks(append(metas, o1, o2, o3a, o3b, o4, o5, o6a, o6b)))
// Additional cases. // Additional case.
a1 := BlockMeta{MinTime: 1, MaxTime: 5}
a2 := BlockMeta{MinTime: 1, MaxTime: 2}
a3 := BlockMeta{MinTime: 3, MaxTime: 4}
a4 := BlockMeta{MinTime: 3, MaxTime: 4}
testutil.Equals(t, [][]BlockMeta{{a1, a2}, {a1, a3, a4}}, OverlappingBlocks(append([]BlockMeta{a1}, a2, a3, a4)))
var nc1 []BlockMeta var nc1 []BlockMeta
nc1 = append(nc1, BlockMeta{MinTime: 1, MaxTime: 5}) nc1 = append(nc1, BlockMeta{MinTime: 1, MaxTime: 5})
nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3}) nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
@ -959,10 +969,10 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7}) nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7})
nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10}) nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10})
nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9}) nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9})
testutil.Equals(t, [][]BlockMeta{ testutil.Equals(t, map[TimeRange][]BlockMeta{
{nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2,6 {Min: 2, Max: 3}: {nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2,6
{nc1[0], nc1[5], nc1[6]}, // 1-5, 2-6, 3-5 {Min: 3, Max: 5}: {nc1[0], nc1[5], nc1[6]}, // 1-5, 2-6, 3-5
{nc1[5], nc1[7]}, // 2-6, 5-7 {Min: 5, Max: 6}: {nc1[5], nc1[7]}, // 2-6, 5-7
{nc1[8], nc1[9]}, // 7-10, 8-9 {Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9
}, OverlappingBlocks(nc1)) }, OverlappingBlocks(nc1))
} }