diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 97280ffe4..dbe92f3df 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -113,8 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
 	st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
 		WALFlushInterval:  200 * time.Millisecond,
 		RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
-		MinBlockDuration:  3 * 60 * 60 * 1000,       // 3 hours in milliseconds
-		MaxBlockDuration:  27 * 60 * 60 * 1000,      // 1 days in milliseconds
+		BlockRanges:       tsdb.ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
 	})
 	if err != nil {
 		exitWithError(err)
diff --git a/compact.go b/compact.go
index dd66bfd96..6158f2d67 100644
--- a/compact.go
+++ b/compact.go
@@ -30,6 +30,18 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )
 
+// ExponentialBlockRanges returns the time ranges based on the stepSize
+func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
+	ranges := make([]int64, 0, steps)
+	curRange := minSize
+	for i := 0; i < steps; i++ {
+		ranges = append(ranges, curRange)
+		curRange = curRange * int64(stepSize)
+	}
+
+	return ranges
+}
+
 // Compactor provides compaction against an underlying storage
 // of time series data.
 type Compactor interface {
@@ -91,15 +103,6 @@ type compactorOptions struct {
 }
 
 func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
-	if opts.blockRanges == nil {
-		opts.blockRanges = []int64{
-			int64(2 * time.Hour),
-			int64(6 * time.Hour),
-			int64(24 * time.Hour),
-			int64(72 * time.Hour),  // 3d
-			int64(216 * time.Hour), // 9d
-		}
-	}
 	return &compactor{
 		dir:  dir,
 		opts: opts,
@@ -161,6 +164,11 @@ func (c *compactor) Plan() ([][]string, error) {
 }
 
 func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
+	// The way to skip compaction is to not have blockRanges.
+	if len(c.opts.blockRanges) == 1 {
+		return nil
+	}
+
 	return selectRecurse(ds, c.opts.blockRanges)
 }
 
@@ -184,6 +192,8 @@ func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
 	// If there are too many blocks, see if a smaller interval will catch them.
 	// i.e, if we have 0-20, 60-80, 80-100; all fall under 0-240, but we'd rather compact 60-100
 	// than all at once.
+	// Again if we have 0-1d, 1d-2d, 3-6d we compact 0-1d, 1d-2d to compact it into the 0-3d block instead of compacting all three
+	// This is to honor the boundaries as much as possible.
 	if len(dirs) > 2 {
 		smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1])
 		if len(smallerDirs) > 1 {
@@ -194,29 +204,28 @@
 	return dirs
 }
 
+// splitByRange splits the directories by the time range.
+// for example if we have blocks 0-10, 10-20, 50-60, 90-100 and want to split them into 30 interval ranges
+// splitByRange returns [0-10, 10-20], [50-60], [90-100].
 func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
-	splitDirs := [][]dirMeta{}
-	t0 := ds[0].meta.MinTime - ds[0].meta.MinTime%tr
-	dirs := []dirMeta{}
+	var splitDirs [][]dirMeta
 
-	for _, dir := range ds {
-		if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
-			dirs = append(dirs, dir)
-			continue
-		}
+	for i := 0; i < len(ds); {
+		var group []dirMeta
+		// Compute start of aligned time range of size tr closest to the current block's start.
+		t0 := ds[i].meta.MinTime - (ds[i].meta.MinTime % tr)
 
-		if dir.meta.MinTime >= t0+tr {
-			splitDirs = append(splitDirs, dirs)
-			dirs = []dirMeta{}
-			t0 = dir.meta.MinTime - dir.meta.MinTime%tr
-			if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
-				dirs = append(dirs, dir)
+		// Add all dirs to the current group that are within [t0, t0+tr].
+		for ; i < len(ds); i++ {
+			if ds[i].meta.MinTime < t0 || ds[i].meta.MaxTime > t0+tr {
+				break
 			}
+			group = append(group, ds[i])
 		}
-	}
 
-	if len(dirs) > 0 {
-		splitDirs = append(splitDirs, dirs)
+		if len(group) > 0 {
+			splitDirs = append(splitDirs, group)
+		}
 	}
 
 	return splitDirs
diff --git a/compact_test.go b/compact_test.go
index cfe2494b0..07d3f6d63 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -1,3 +1,16 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package tsdb
 
 import (
diff --git a/db.go b/db.go
index 9bcd688c8..0f12e86f7 100644
--- a/db.go
+++ b/db.go
@@ -45,7 +45,7 @@ import (
 var DefaultOptions = &Options{
 	WALFlushInterval:  5 * time.Second,
 	RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
-	MinBlockDuration:  3 * 60 * 60 * 1000,       // 2 hours in milliseconds
+	BlockRanges:       ExponentialBlockRanges(int64(2*time.Hour), 3, 5),
 	NoLockfile:        false,
 }
@@ -57,9 +57,8 @@ type Options struct {
 	// Duration of persisted data to keep.
 	RetentionDuration uint64
 
-	// The timestamp range of head blocks after which they get persisted.
-	// It's the minimum duration of any persisted block.
-	MinBlockDuration uint64
+	// The sizes of the Blocks.
+	BlockRanges []int64
 
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
@@ -223,7 +222,24 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}
 
-	db.compactor = newCompactor(dir, r, l, &compactorOptions{})
+	copts := &compactorOptions{
+		blockRanges: opts.BlockRanges,
+	}
+
+	if len(copts.blockRanges) == 0 {
+		return nil, errors.New("at least one block-range must exist")
+	}
+
+	for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
+		if len(copts.blockRanges) == 1 {
+			break
+		}
+
+		// Max overflow is restricted to 20%.
+		copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
+	}
+
+	db.compactor = newCompactor(dir, r, l, copts)
 
 	if err := db.reloadBlocks(); err != nil {
 		return nil, err
@@ -693,20 +709,20 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
 // it is within or after the currently appendable window.
 func (db *DB) ensureHead(t int64) error {
 	var (
-		mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration))
+		mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0]))
 		addBuffer  = len(db.blocks) == 0
 		last       BlockMeta
 	)
 
 	if !addBuffer {
 		last = db.blocks[len(db.blocks)-1].Meta()
-		addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration)
+		addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0])
 	}
 	// Create another block of buffer in front if the DB is initialized or retrieving
 	// new data after a long gap.
 	// This ensures we always have a full block width if append window.
 	if addBuffer {
-		if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil {
+		if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil {
 			return err
 		}
 		// If the previous block reaches into our new window, make it smaller.