Merge pull request #103 from Gouthamve/block-ranges

Compaction implementation for block-ranges

Commit: 7b9c536883
@@ -113,8 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 	st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
 		WALFlushInterval:  200 * time.Millisecond,
 		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 2 days in milliseconds
-		MinBlockDuration:  3 * 60 * 60 * 1000,      // 3 hours in milliseconds
-		MaxBlockDuration:  27 * 60 * 60 * 1000,     // 27 hours in milliseconds
+		BlockRanges:       tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
 	})
 	if err != nil {
 		exitWithError(err)
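For intuition, here is a small standalone sketch (not part of the diff) that copies the ExponentialBlockRanges helper introduced in compact.go below and prints what the arguments used above expand to:

package main

import (
	"fmt"
	"time"
)

// Copied from the new compact.go below: starting at minSize, each of the
// `steps` ranges is stepSize times larger than the previous one.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, 0, steps)
	curRange := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, curRange)
		curRange = curRange * int64(stepSize)
	}
	return ranges
}

func main() {
	for _, r := range ExponentialBlockRanges(int64(2*time.Hour), 3, 5) {
		fmt.Println(time.Duration(r)) // 2h0m0s, 10h0m0s, 50h0m0s
	}
}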
compact.go (103 lines changed)
@@ -30,6 +30,18 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )

+// ExponentialBlockRanges returns the time ranges based on the stepSize.
+func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
+	ranges := make([]int64, 0, steps)
+	curRange := minSize
+	for i := 0; i < steps; i++ {
+		ranges = append(ranges, curRange)
+		curRange = curRange * int64(stepSize)
+	}
+
+	return ranges
+}
+
 // Compactor provides compaction against an underlying storage
 // of time series data.
 type Compactor interface {
@@ -87,7 +99,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 }

 type compactorOptions struct {
-	maxBlockRange uint64
+	blockRanges []int64
 }

 func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
@@ -133,37 +145,90 @@ func (c *compactor) Plan() ([][]string, error) {
 		return dms[i].meta.MinTime < dms[j].meta.MinTime
 	})

-	if len(dms) == 0 {
+	if len(dms) <= 1 {
 		return nil, nil
 	}

-	sliceDirs := func(i, j int) [][]string {
+	sliceDirs := func(dms []dirMeta) [][]string {
+		if len(dms) == 0 {
+			return nil
+		}
 		var res []string
-		for k := i; k < j; k++ {
-			res = append(res, dms[k].dir)
+		for _, dm := range dms {
+			res = append(res, dm.dir)
 		}
 		return [][]string{res}
 	}

-	// Then we care about compacting multiple blocks, starting with the oldest.
-	for i := 0; i < len(dms)-compactionBlocksLen+1; i++ {
-		if c.match(dms[i : i+3]) {
-			return sliceDirs(i, i+compactionBlocksLen), nil
-		}
-	}
-
-	return nil, nil
+	return sliceDirs(c.selectDirs(dms)), nil
 }

-func (c *compactor) match(dirs []dirMeta) bool {
-	g := dirs[0].meta.Compaction.Generation
-
-	for _, d := range dirs {
-		if d.meta.Compaction.Generation != g {
-			return false
-		}
-	}
-
-	return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
-}
+func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
+	// The way to skip compaction is to not have blockRanges.
+	if len(c.opts.blockRanges) == 1 {
+		return nil
+	}
+
+	return selectRecurse(ds, c.opts.blockRanges)
+}
+
+func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
+	if len(intervals) == 0 {
+		return dms
+	}
+
+	// Get the blocks by the max interval.
+	blocks := splitByRange(dms, intervals[len(intervals)-1])
+	dirs := []dirMeta{}
+	for i := len(blocks) - 1; i >= 0; i-- {
+		// We need to choose the oldest blocks to compact. If there are a couple of blocks in
+		// the largest interval, we should compact those first.
+		if len(blocks[i]) > 1 {
+			dirs = blocks[i]
+			break
+		}
+	}
+
+	// If there are too many blocks, see if a smaller interval will catch them.
+	// I.e., if we have 0-20, 60-80, 80-100, they all fall under 0-240, but we'd rather
+	// compact 60-100 than all of them at once.
+	// Likewise, if we have 0-1d, 1d-2d, 3d-6d, we compact 0-1d and 1d-2d into a 0-3d
+	// block instead of compacting all three at once.
+	// This is to honor the block boundaries as much as possible.
+	if len(dirs) > 2 {
+		smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1])
+		if len(smallerDirs) > 1 {
+			return smallerDirs
+		}
+	}
+
+	return dirs
+}
+
+// splitByRange splits the directories by the time range.
+// For example, if we have blocks 0-10, 10-20, 50-60, 90-100 and want to split them
+// into 30-unit interval ranges, splitByRange returns [0-10, 10-20], [50-60], [90-100].
+func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
+	var splitDirs [][]dirMeta
+
+	for i := 0; i < len(ds); {
+		var group []dirMeta
+		// Compute start of aligned time range of size tr closest to the current block's start.
+		t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr)
+
+		// Add all dirs to the current group that are within [t0, t0+tr].
+		for ; i < len(ds); i++ {
+			if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
+				break
+			}
+			group = append(group, ds[i])
+		}
+
+		if len(group) > 0 {
+			splitDirs = append(splitDirs, group)
+		}
+	}
+
+	return splitDirs
+}
+
 func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
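To see the bucketing concretely, here is a self-contained sketch (simplified types, not the PR's code; Block is a stand-in for the dirMeta/BlockMeta pair) that runs the same splitByRange algorithm on the example from its doc comment:

package main

import "fmt"

type Block struct{ MinTime, MaxTime int64 }

// Same algorithm as the new splitByRange above, on the simplified Block type.
func splitByRange(ds []Block, tr int64) [][]Block {
	var splitDirs [][]Block
	for i := 0; i < len(ds); {
		var group []Block
		// Align the window start to a multiple of tr at or before this block.
		t0 := ds[i].MinTime - (ds[i].MinTime % tr)
		// Collect consecutive blocks fully contained in [t0, t0+tr].
		for ; i < len(ds); i++ {
			if ds[i].MinTime < t0 || ds[i].MaxTime > t0+tr {
				break
			}
			group = append(group, ds[i])
		}
		if len(group) > 0 {
			splitDirs = append(splitDirs, group)
		}
	}
	return splitDirs
}

func main() {
	blocks := []Block{{0, 10}, {10, 20}, {50, 60}, {90, 100}}
	// With tr=30 the aligned windows are [0,30], [30,60], [90,120]; prints
	// [[{0 10} {10 20}] [{50 60}] [{90 100}]], matching the doc comment.
	fmt.Println(splitByRange(blocks, 30))
}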
compact_test.go (new file, 245 lines)
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"sort"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestCompactionSelect(t *testing.T) {
	opts := &compactorOptions{
		blockRanges: []int64{
			20,
			60,
			240,
			720,
			2160,
		},
	}

	type dirMetaSimple struct {
		dir string
		tr  []int64
	}

	cases := []struct {
		blocks  []dirMetaSimple
		planned [][]string
	}{
		{
			blocks: []dirMetaSimple{
				{
					dir: "1",
					tr:  []int64{0, 20},
				},
			},
			planned: nil,
		},
		{
			blocks: []dirMetaSimple{
				{
					dir: "1",
					tr:  []int64{0, 20},
				},
				{
					dir: "2",
					tr:  []int64{20, 40},
				},
				{
					dir: "3",
					tr:  []int64{40, 60},
				},
			},
			planned: [][]string{{"1", "2", "3"}},
		},
		{
			blocks: []dirMetaSimple{
				{
					dir: "1",
					tr:  []int64{0, 20},
				},
				{
					dir: "2",
					tr:  []int64{20, 40},
				},
				{
					dir: "3",
					tr:  []int64{40, 60},
				},
				{
					dir: "4",
					tr:  []int64{60, 120},
				},
				{
					dir: "5",
					tr:  []int64{120, 180},
				},
			},
			planned: [][]string{{"1", "2", "3"}}, // We still need 0-60 to compact 0-240.
		},
		{
			blocks: []dirMetaSimple{
				{
					dir: "1",
					tr:  []int64{0, 20},
				},
				{
					dir: "2",
					tr:  []int64{20, 40},
				},
				{
					dir: "3",
					tr:  []int64{40, 60},
				},
				{
					dir: "4",
					tr:  []int64{60, 120},
				},
				{
					dir: "5",
					tr:  []int64{120, 180},
				},
				{
					dir: "6",
					tr:  []int64{720, 960},
				},
				{
					dir: "7",
					tr:  []int64{1200, 1440},
				},
			},
			planned: [][]string{{"6", "7"}},
		},
		{
			blocks: []dirMetaSimple{
				{
					dir: "1",
					tr:  []int64{0, 20},
				},
				{
					dir: "2",
					tr:  []int64{60, 80},
				},
				{
					dir: "3",
					tr:  []int64{80, 100},
				},
			},
			planned: [][]string{{"2", "3"}},
		},
	}

	c := &compactor{
		opts: opts,
	}
	sliceDirs := func(dms []dirMeta) [][]string {
		if len(dms) == 0 {
			return nil
		}
		var res []string
		for _, dm := range dms {
			res = append(res, dm.dir)
		}
		return [][]string{res}
	}

	dmFromSimple := func(dms []dirMetaSimple) []dirMeta {
		dirs := make([]dirMeta, 0, len(dms))
		for _, dir := range dms {
			dirs = append(dirs, dirMeta{
				dir: dir.dir,
				meta: &BlockMeta{
					MinTime: dir.tr[0],
					MaxTime: dir.tr[1],
				},
			})
		}

		return dirs
	}

	for _, tc := range cases {
		require.Equal(t, tc.planned, sliceDirs(c.selectDirs(dmFromSimple(tc.blocks))))
	}
}

func TestSplitByRange(t *testing.T) {
	splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
		rMap := make(map[int64][]dirMeta)
		for _, dir := range ds {
			t0 := dir.meta.MinTime - dir.meta.MinTime%tr
			if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
				rMap[t0] = append(rMap[t0], dir)
			}
		}
		res := make([][]dirMeta, 0, len(rMap))
		for _, v := range rMap {
			res = append(res, v)
		}

		sort.Slice(res, func(i, j int) bool {
			return res[i][0].meta.MinTime < res[j][0].meta.MinTime
		})

		return res
	}

	cases := []struct {
		trange int64
		ranges [][]int64
		output [][][]int64
	}{
		{
			trange: 60,
			ranges: [][]int64{{0, 10}},
		},
		{
			trange: 60,
			ranges: [][]int64{{0, 60}},
		},
		{
			trange: 60,
			ranges: [][]int64{{0, 10}, {30, 60}},
		},
		{
			trange: 60,
			ranges: [][]int64{{0, 10}, {60, 90}},
		},
		{
			trange: 60,
			ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
		},
		{
			trange: 60,
			ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
		},
	}

	for _, c := range cases {
		blocks := make([]dirMeta, 0, len(c.ranges))
		for _, r := range c.ranges {
			blocks = append(blocks, dirMeta{
				meta: &BlockMeta{
					MinTime: r[0],
					MaxTime: r[1],
				},
			})
		}

		require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
	}
}
db.go (38 lines changed)
@@ -45,8 +45,7 @@ import (
 var DefaultOptions = &Options{
 	WALFlushInterval:  5 * time.Second,
 	RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
-	MinBlockDuration:  3 * 60 * 60 * 1000,       // 3 hours in milliseconds
-	MaxBlockDuration:  24 * 60 * 60 * 1000,      // 1 day in milliseconds
+	BlockRanges:       ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
 	NoLockfile:        false,
 }
@@ -58,12 +57,8 @@ type Options struct {
 	// Duration of persisted data to keep.
 	RetentionDuration uint64

-	// The timestamp range of head blocks after which they get persisted.
-	// It's the minimum duration of any persisted block.
-	MinBlockDuration uint64
-
-	// The maximum timestamp range of compacted blocks.
-	MaxBlockDuration uint64
+	// The sizes of the Blocks.
+	BlockRanges []int64

 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
@@ -227,9 +222,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}

-	db.compactor = newCompactor(dir, r, l, &compactorOptions{
-		maxBlockRange: opts.MaxBlockDuration,
-	})
+	copts := &compactorOptions{
+		blockRanges: opts.BlockRanges,
+	}
+
+	if len(copts.blockRanges) == 0 {
+		return nil, errors.New("at least one block-range must exist")
+	}
+
+	for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
+		if len(copts.blockRanges) == 1 {
+			break
+		}
+
+		// Max overflow is restricted to 20%.
+		copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
+	}
+
+	db.compactor = newCompactor(dir, r, l, copts)

 	if err := db.reloadBlocks(); err != nil {
 		return nil, err
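To make the trimming loop concrete: the largest configured block range is dropped until it is at most 20% of the retention duration, always keeping at least one range. A standalone sketch with hypothetical numbers (trimBlockRanges is a name invented for this example, not part of the diff):

package main

import "fmt"

// Same loop as in Open above, extracted for illustration.
func trimBlockRanges(blockRanges []int64, retentionDuration uint64) []int64 {
	for float64(blockRanges[len(blockRanges)-1])/float64(retentionDuration) > 0.2 {
		if len(blockRanges) == 1 {
			break
		}
		// Max overflow is restricted to 20%.
		blockRanges = blockRanges[:len(blockRanges)-1]
	}
	return blockRanges
}

func main() {
	// With the test file's ranges and a hypothetical retention of 1000 units,
	// 2160, 720, and 240 all exceed 20% of 1000 and are dropped in turn:
	fmt.Println(trimBlockRanges([]int64{20, 60, 240, 720, 2160}, 1000)) // [20 60]
}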
@@ -699,20 +709,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
 // it is within or after the currently appendable window.
 func (db *DB) ensureHead(t int64) error {
 	var (
-		mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
+		mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0]))
 		addBuffer  = len(db.blocks) == 0
 		last       BlockMeta
 	)

 	if !addBuffer {
 		last = db.blocks[len(db.blocks)-1].Meta()
-		addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
+		addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0])
 	}
 	// Create another block of buffer in front if the DB is initialized or retrieving
 	// new data after a long gap.
 	// This ensures we always have a full block width of append window.
 	if addBuffer {
-		if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
+		if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil {
 			return err
 		}
 	}
 	// If the previous block reaches into our new window, make it smaller.
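The head window math above relies on rangeForTimestamp, whose body is not shown in this diff. The sketch below assumes it floors t to a multiple of width and returns the enclosing window (inferred from its name and call sites; this is an assumption, not the repository's implementation):

package main

import "fmt"

// Assumed semantics: align t down to a multiple of width.
func rangeForTimestamp(t, width int64) (mint, maxt int64) {
	mint = (t / width) * width
	return mint, mint + width
}

func main() {
	const width = 2 * 60 * 60 * 1000 // smallest block range: 2h in milliseconds
	mint, maxt := rangeForTimestamp(5*60*60*1000, width)
	fmt.Println(mint, maxt) // 14400000 21600000, i.e. the [4h, 6h) window
}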