mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Initial compaction implementation for block-ranges
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
bc3fee4820
commit
ce23adb579
94
compact.go
94
compact.go
|
@ -87,10 +87,19 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactorOptions struct {
|
type compactorOptions struct {
|
||||||
maxBlockRange uint64
|
blockRanges []int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
|
||||||
|
if opts.blockRanges == nil {
|
||||||
|
opts.blockRanges = []int64{
|
||||||
|
int64(2 * time.Hour),
|
||||||
|
int64(6 * time.Hour),
|
||||||
|
int64(24 * time.Hour),
|
||||||
|
int64(72 * time.Hour), // 3d
|
||||||
|
int64(216 * time.Hour), // 9d
|
||||||
|
}
|
||||||
|
}
|
||||||
return &compactor{
|
return &compactor{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
|
@ -133,37 +142,84 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
return dms[i].meta.MinTime < dms[j].meta.MinTime
|
return dms[i].meta.MinTime < dms[j].meta.MinTime
|
||||||
})
|
})
|
||||||
|
|
||||||
if len(dms) == 0 {
|
if len(dms) <= 1 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
sliceDirs := func(i, j int) [][]string {
|
sliceDirs := func(dms []dirMeta) [][]string {
|
||||||
|
if len(dms) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
var res []string
|
var res []string
|
||||||
for k := i; k < j; k++ {
|
for _, dm := range dms {
|
||||||
res = append(res, dms[k].dir)
|
res = append(res, dm.dir)
|
||||||
}
|
}
|
||||||
return [][]string{res}
|
return [][]string{res}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then we care about compacting multiple blocks, starting with the oldest.
|
return sliceDirs(c.selectDirs(dms)), nil
|
||||||
for i := 0; i < len(dms)-compactionBlocksLen+1; i++ {
|
|
||||||
if c.match(dms[i : i+3]) {
|
|
||||||
return sliceDirs(i, i+compactionBlocksLen), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) match(dirs []dirMeta) bool {
|
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
|
||||||
g := dirs[0].meta.Compaction.Generation
|
return selectRecurse(ds, c.opts.blockRanges)
|
||||||
|
}
|
||||||
|
|
||||||
for _, d := range dirs {
|
func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
|
||||||
if d.meta.Compaction.Generation != g {
|
if len(intervals) == 0 {
|
||||||
return false
|
return dms
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the blocks by the max interval
|
||||||
|
blocks := splitByRange(dms, intervals[len(intervals)-1])
|
||||||
|
dirs := []dirMeta{}
|
||||||
|
for i := len(blocks) - 1; i >= 0; i-- {
|
||||||
|
// We need to choose the oldest blocks to compact. If there are a couple of blocks in
|
||||||
|
// the largest interval, we should compact those first.
|
||||||
|
if len(blocks[i]) > 1 {
|
||||||
|
dirs = blocks[i]
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
|
|
||||||
|
// If there are too many blocks, see if a smaller interval will catch them.
|
||||||
|
// i.e, if we have 0-20, 60-80, 80-100; all fall under 0-240, but we'd rather compact 60-100
|
||||||
|
// than all at once.
|
||||||
|
if len(dirs) > 2 {
|
||||||
|
smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1])
|
||||||
|
if len(smallerDirs) > 1 {
|
||||||
|
return smallerDirs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return dirs
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
|
||||||
|
splitDirs := [][]dirMeta{}
|
||||||
|
t0 := ds[0].meta.MinTime - ds[0].meta.MinTime%tr
|
||||||
|
dirs := []dirMeta{}
|
||||||
|
|
||||||
|
for _, dir := range ds {
|
||||||
|
if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
|
||||||
|
dirs = append(dirs, dir)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if dir.meta.MinTime >= t0+tr {
|
||||||
|
splitDirs = append(splitDirs, dirs)
|
||||||
|
dirs = []dirMeta{}
|
||||||
|
t0 = dir.meta.MinTime - dir.meta.MinTime%tr
|
||||||
|
if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
|
||||||
|
dirs = append(dirs, dir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(dirs) > 0 {
|
||||||
|
splitDirs = append(splitDirs, dirs)
|
||||||
|
}
|
||||||
|
|
||||||
|
return splitDirs
|
||||||
}
|
}
|
||||||
|
|
||||||
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
|
|
232
compact_test.go
Normal file
232
compact_test.go
Normal file
|
@ -0,0 +1,232 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCompactionSelect(t *testing.T) {
|
||||||
|
opts := &compactorOptions{
|
||||||
|
blockRanges: []int64{
|
||||||
|
20,
|
||||||
|
60,
|
||||||
|
240,
|
||||||
|
720,
|
||||||
|
2160,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
type dirMetaSimple struct {
|
||||||
|
dir string
|
||||||
|
tr []int64
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
blocks []dirMetaSimple
|
||||||
|
planned [][]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
blocks: []dirMetaSimple{
|
||||||
|
{
|
||||||
|
dir: "1",
|
||||||
|
tr: []int64{0, 20},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
planned: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
blocks: []dirMetaSimple{
|
||||||
|
{
|
||||||
|
dir: "1",
|
||||||
|
tr: []int64{0, 20},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "2",
|
||||||
|
tr: []int64{20, 40},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "3",
|
||||||
|
tr: []int64{40, 60},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
planned: [][]string{{"1", "2", "3"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
blocks: []dirMetaSimple{
|
||||||
|
{
|
||||||
|
dir: "1",
|
||||||
|
tr: []int64{0, 20},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "2",
|
||||||
|
tr: []int64{20, 40},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "3",
|
||||||
|
tr: []int64{40, 60},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "4",
|
||||||
|
tr: []int64{60, 120},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "5",
|
||||||
|
tr: []int64{120, 180},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
planned: [][]string{{"1", "2", "3"}}, // We still need 0-60 to compact 0-240
|
||||||
|
},
|
||||||
|
{
|
||||||
|
blocks: []dirMetaSimple{
|
||||||
|
{
|
||||||
|
dir: "1",
|
||||||
|
tr: []int64{0, 20},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "2",
|
||||||
|
tr: []int64{20, 40},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "3",
|
||||||
|
tr: []int64{40, 60},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "4",
|
||||||
|
tr: []int64{60, 120},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "5",
|
||||||
|
tr: []int64{120, 180},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "6",
|
||||||
|
tr: []int64{720, 960},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "7",
|
||||||
|
tr: []int64{1200, 1440},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
planned: [][]string{{"6", "7"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
blocks: []dirMetaSimple{
|
||||||
|
{
|
||||||
|
dir: "1",
|
||||||
|
tr: []int64{0, 20},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "2",
|
||||||
|
tr: []int64{60, 80},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
dir: "3",
|
||||||
|
tr: []int64{80, 100},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
planned: [][]string{{"2", "3"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
c := &compactor{
|
||||||
|
opts: opts,
|
||||||
|
}
|
||||||
|
sliceDirs := func(dms []dirMeta) [][]string {
|
||||||
|
if len(dms) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var res []string
|
||||||
|
for _, dm := range dms {
|
||||||
|
res = append(res, dm.dir)
|
||||||
|
}
|
||||||
|
return [][]string{res}
|
||||||
|
}
|
||||||
|
|
||||||
|
dmFromSimple := func(dms []dirMetaSimple) []dirMeta {
|
||||||
|
dirs := make([]dirMeta, 0, len(dms))
|
||||||
|
for _, dir := range dms {
|
||||||
|
dirs = append(dirs, dirMeta{
|
||||||
|
dir: dir.dir,
|
||||||
|
meta: &BlockMeta{
|
||||||
|
MinTime: dir.tr[0],
|
||||||
|
MaxTime: dir.tr[1],
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return dirs
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
require.Equal(t, tc.planned, sliceDirs(c.selectDirs(dmFromSimple(tc.blocks))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSplitByRange(t *testing.T) {
|
||||||
|
splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
|
||||||
|
rMap := make(map[int64][]dirMeta)
|
||||||
|
for _, dir := range ds {
|
||||||
|
t0 := dir.meta.MinTime - dir.meta.MinTime%tr
|
||||||
|
if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
|
||||||
|
rMap[t0] = append(rMap[t0], dir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
res := make([][]dirMeta, 0, len(rMap))
|
||||||
|
for _, v := range rMap {
|
||||||
|
res = append(res, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(res, func(i, j int) bool {
|
||||||
|
return res[i][0].meta.MinTime < res[j][0].meta.MinTime
|
||||||
|
})
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
trange int64
|
||||||
|
ranges [][]int64
|
||||||
|
output [][][]int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
trange: 60,
|
||||||
|
ranges: [][]int64{{0, 10}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trange: 60,
|
||||||
|
ranges: [][]int64{{0, 60}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trange: 60,
|
||||||
|
ranges: [][]int64{{0, 10}, {30, 60}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trange: 60,
|
||||||
|
ranges: [][]int64{{0, 10}, {60, 90}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trange: 60,
|
||||||
|
ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
trange: 60,
|
||||||
|
ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
blocks := make([]dirMeta, 0, len(c.ranges))
|
||||||
|
for _, r := range c.ranges {
|
||||||
|
blocks = append(blocks, dirMeta{
|
||||||
|
meta: &BlockMeta{
|
||||||
|
MinTime: r[0],
|
||||||
|
MaxTime: r[1],
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
|
||||||
|
}
|
||||||
|
}
|
8
db.go
8
db.go
|
@ -46,7 +46,6 @@ var DefaultOptions = &Options{
|
||||||
WALFlushInterval: 5 * time.Second,
|
WALFlushInterval: 5 * time.Second,
|
||||||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||||
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,9 +61,6 @@ type Options struct {
|
||||||
// It's the minimum duration of any persisted block.
|
// It's the minimum duration of any persisted block.
|
||||||
MinBlockDuration uint64
|
MinBlockDuration uint64
|
||||||
|
|
||||||
// The maximum timestamp range of compacted blocks.
|
|
||||||
MaxBlockDuration uint64
|
|
||||||
|
|
||||||
// NoLockfile disables creation and consideration of a lock file.
|
// NoLockfile disables creation and consideration of a lock file.
|
||||||
NoLockfile bool
|
NoLockfile bool
|
||||||
}
|
}
|
||||||
|
@ -227,9 +223,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
||||||
db.lockf = &lockf
|
db.lockf = &lockf
|
||||||
}
|
}
|
||||||
|
|
||||||
db.compactor = newCompactor(dir, r, l, &compactorOptions{
|
db.compactor = newCompactor(dir, r, l, &compactorOptions{})
|
||||||
maxBlockRange: opts.MaxBlockDuration,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := db.reloadBlocks(); err != nil {
|
if err := db.reloadBlocks(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in a new issue