Choose ranges by retention and min-block-duration
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
parent ce23adb579
commit 9a2ab732b5
@@ -113,8 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 	st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
 		WALFlushInterval:  200 * time.Millisecond,
 		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
-		MinBlockDuration:  3 * 60 * 60 * 1000,  // 3 hours in milliseconds
-		MaxBlockDuration:  27 * 60 * 60 * 1000, // 1 days in milliseconds
+		BlockRanges:       tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
 	})
 	if err != nil {
 		exitWithError(err)
compact.go (61 changed lines)
@@ -30,6 +30,18 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )
 
+// ExponentialBlockRanges returns the time ranges based on the stepSize
+func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
+	ranges := make([]int64, 0, steps)
+	curRange := minSize
+	for i := 0; i < steps; i++ {
+		ranges = append(ranges, curRange)
+		curRange = curRange * int64(stepSize)
+	}
+
+	return ranges
+}
+
 // Compactor provides compaction against an underlying storage
 // of time series data.
 type Compactor interface {
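Note (not part of the commit): the helper multiplies the previous range by stepSize at each step, so the default call used later in this diff, ExponentialBlockRanges(int64(2*time.Hour), 3, 5), yields the ranges 2h, 10h and 50h. A minimal standalone sketch:

package main

import (
	"fmt"
	"time"
)

// exponentialBlockRanges mirrors the helper added in compact.go above:
// start at minSize and multiply by stepSize for each of the steps entries.
func exponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, 0, steps)
	curRange := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, curRange)
		curRange = curRange * int64(stepSize)
	}
	return ranges
}

func main() {
	// The defaults used elsewhere in this commit: 2h base, 3 steps, factor 5.
	for _, r := range exponentialBlockRanges(int64(2*time.Hour), 3, 5) {
		fmt.Println(time.Duration(r)) // 2h0m0s, 10h0m0s, 50h0m0s
	}
}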
@@ -91,15 +103,6 @@ type compactorOptions struct {
 }
 
 func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
-	if opts.blockRanges == nil {
-		opts.blockRanges = []int64{
-			int64(2 * time.Hour),
-			int64(6 * time.Hour),
-			int64(24 * time.Hour),
-			int64(72 * time.Hour),  // 3d
-			int64(216 * time.Hour), // 9d
-		}
-	}
 	return &compactor{
 		dir:  dir,
 		opts: opts,
@@ -161,6 +164,11 @@ func (c *compactor) Plan() ([][]string, error) {
 }
 
 func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
+	// The way to skip compaction is to not have blockRanges.
+	if len(c.opts.blockRanges) == 1 {
+		return nil
+	}
+
 	return selectRecurse(ds, c.opts.blockRanges)
 }
 
@@ -184,6 +192,8 @@ func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
 	// If there are too many blocks, see if a smaller interval will catch them.
 	// i.e, if we have 0-20, 60-80, 80-100; all fall under 0-240, but we'd rather compact 60-100
 	// than all at once.
+	// Again if have 0-1d, 1d-2d, 3-6d we compact 0-1d, 1d-2d to compact it into the 0-3d block instead of compacting all three
+	// This is to honor the boundaries as much as possible.
 	if len(dirs) > 2 {
 		smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1])
 		if len(smallerDirs) > 1 {
@@ -194,29 +204,28 @@ func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
 	return dirs
 }
 
+// splitByRange splits the directories by the time range.
+// for example if we have blocks 0-10, 10-20, 50-60, 90-100 and want to split them into 30 interval ranges
+// splitByRange returns [0-10, 10-20], [50-60], [90-100].
 func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
-	splitDirs := [][]dirMeta{}
-	t0 := ds[0].meta.MinTime - ds[0].meta.MinTime%tr
-	dirs := []dirMeta{}
+	var splitDirs [][]dirMeta
 
-	for _, dir := range ds {
-		if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
-			dirs = append(dirs, dir)
-			continue
-		}
+	for i := 0; i < len(ds); {
+		var group []dirMeta
+		// Compute start of aligned time range of size tr closest to the current block's start.
+		t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr)
 
-		if dir.meta.MinTime >= t0+tr {
-			splitDirs = append(splitDirs, dirs)
-			dirs = []dirMeta{}
-			t0 = dir.meta.MinTime - dir.meta.MinTime%tr
-			if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
-				dirs = append(dirs, dir)
-			}
+		// Add all dirs to the current group that are within [t0, t0+tr].
+		for ; i < len(ds); i++ {
+			if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
+				break
+			}
+			group = append(group, ds[i])
 		}
-	}
 
-	if len(dirs) > 0 {
-		splitDirs = append(splitDirs, dirs)
+		if len(group) > 0 {
+			splitDirs = append(splitDirs, group)
+		}
 	}
 
 	return splitDirs
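Note (not part of the commit): the rewritten splitByRange groups consecutive blocks into aligned windows of size tr. A self-contained sketch that reproduces the example from the new doc comment, with a hypothetical block struct standing in for dirMeta:

package main

import "fmt"

// block stands in for dirMeta's meta.MinTime/MaxTime in this sketch.
type block struct{ minT, maxT int64 }

// splitByRange groups consecutive blocks into aligned windows of size tr,
// following the rewritten loop in this commit.
func splitByRange(ds []block, tr int64) [][]block {
	var splitDirs [][]block
	for i := 0; i < len(ds); {
		var group []block
		// Start of the aligned window containing the current block's start.
		t0 := ds[i].minT - (ds[i].minT % tr)
		// Collect all blocks that lie fully inside [t0, t0+tr].
		for ; i < len(ds); i++ {
			if ds[i].minT < t0 || ds[i].maxT > t0+tr {
				break
			}
			group = append(group, ds[i])
		}
		if len(group) > 0 {
			splitDirs = append(splitDirs, group)
		}
	}
	return splitDirs
}

func main() {
	blocks := []block{{0, 10}, {10, 20}, {50, 60}, {90, 100}}
	fmt.Println(splitByRange(blocks, 30)) // [[{0 10} {10 20}] [{50 60}] [{90 100}]]
}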
@@ -1,3 +1,16 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package tsdb
 
 import (
db.go (32 changed lines)
@@ -45,7 +45,7 @@ import (
 var DefaultOptions = &Options{
 	WALFlushInterval:  5 * time.Second,
 	RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
-	MinBlockDuration:  3 * 60 * 60 * 1000, // 2 hours in milliseconds
+	BlockRanges:       ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
 	NoLockfile:        false,
 }
@@ -57,9 +57,8 @@ type Options struct {
 	// Duration of persisted data to keep.
 	RetentionDuration uint64
 
-	// The timestamp range of head blocks after which they get persisted.
-	// It's the minimum duration of any persisted block.
-	MinBlockDuration uint64
+	// The sizes of the Blocks.
+	BlockRanges []int64
 
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
@@ -223,7 +222,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}
 
-	db.compactor = newCompactor(dir, r, l, &compactorOptions{})
+	copts := &compactorOptions{
+		blockRanges: opts.BlockRanges,
+	}
+
+	if len(copts.blockRanges) == 0 {
+		return nil, errors.New("at least one block-range must exist")
+	}
+
+	for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
+		if len(copts.blockRanges) == 1 {
+			break
+		}
+
+		// Max overflow is restricted to 20%.
+		copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
+	}
+
+	db.compactor = newCompactor(dir, r, l, copts)
 
 	if err := db.reloadBlocks(); err != nil {
 		return nil, err
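Note (not part of the commit): the loop added to Open trims the configured block ranges against the retention window. Assuming ranges and retention are expressed in the same unit, it drops the largest ranges while the biggest one exceeds 20% of retention, but always keeps at least one range. A sketch of that behaviour:

package main

import (
	"fmt"
	"time"
)

// trimRanges sketches the loop added to Open above: drop the largest block
// ranges while the biggest one exceeds 20% of the retention duration,
// but never drop the last remaining range.
func trimRanges(ranges []int64, retention uint64) []int64 {
	for float64(ranges[len(ranges)-1])/float64(retention) > 0.2 {
		if len(ranges) == 1 {
			break
		}
		ranges = ranges[:len(ranges)-1]
	}
	return ranges
}

func main() {
	ranges := []int64{int64(2 * time.Hour), int64(10 * time.Hour), int64(50 * time.Hour)}

	// With 15 days of retention, 50h is about 14% of it, so nothing is dropped.
	fmt.Println(len(trimRanges(ranges, uint64(15*24*time.Hour)))) // 3

	// With 2 days of retention, 50h and 10h both exceed 20% and are dropped.
	fmt.Println(len(trimRanges(ranges, uint64(2*24*time.Hour)))) // 1
}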
@@ -693,20 +709,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
 // it is within or after the currently appendable window.
 func (db *DB) ensureHead(t int64) error {
 	var (
-		mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
+		mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0]))
 		addBuffer  = len(db.blocks) == 0
 		last       BlockMeta
 	)
 
 	if !addBuffer {
 		last = db.blocks[len(db.blocks)-1].Meta()
-		addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
+		addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0])
 	}
 	// Create another block of buffer in front if the DB is initialized or retrieving
 	// new data after a long gap.
 	// This ensures we always have a full block width if append window.
 	if addBuffer {
-		if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
+		if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil {
 			return err
 		}
 		// If the previous block reaches into our new window, make it smaller.
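Note (not part of the commit): after this change the smallest configured range, BlockRanges[0], sets the width of newly created head blocks. rangeForTimestamp itself is not shown in this diff; the sketch below assumes it aligns the timestamp down to a multiple of the width, which is the usual behaviour for non-negative timestamps:

package main

import (
	"fmt"
	"time"
)

// rangeForTimestamp is assumed to align t down to a multiple of width and
// return that window; this is a sketch, not the function from db.go.
func rangeForTimestamp(t, width int64) (mint, maxt int64) {
	mint = t - t%width
	return mint, mint + width
}

func main() {
	width := int64(2 * time.Hour) // BlockRanges[0] with the defaults
	t := int64(5 * time.Hour)     // an incoming sample timestamp

	mint, maxt := rangeForTimestamp(t, width)
	fmt.Println(time.Duration(mint), time.Duration(maxt)) // 4h0m0s 6h0m0s
}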