From 9c76624df24e5f5286fc01f8c74e284af9ce2a9f Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 9 Feb 2017 17:54:26 -0800 Subject: [PATCH] Add initial retention cutoff --- cmd/tsdb/main.go | 10 ++++++++-- db.go | 50 +++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 49 insertions(+), 11 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 4a838664b..03ec0f789 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -90,7 +90,13 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dir := filepath.Join(b.outPath, "storage") - st, err := tsdb.OpenPartitioned(dir, 1, nil, nil) + st, err := tsdb.OpenPartitioned(dir, 1, nil, &tsdb.Options{ + WALFlushInterval: 5 * time.Second, + RetentionDuration: 1 * 24 * 60 * 60 * 1000, // 1 days in milliseconds + MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds + MaxBlockDuration: 16 * 60 * 60 * 1000, // 1 days in milliseconds + AppendableBlocks: 2, + }) if err != nil { exitWithError(err) } @@ -122,7 +128,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 3000) + total, err = b.ingestScrapes(metrics, 4000) if err != nil { exitWithError(err) } diff --git a/db.go b/db.go index 72b82c778..261a79629 100644 --- a/db.go +++ b/db.go @@ -28,10 +28,11 @@ import ( // DefaultOptions used for the DB. They are sane for setups using // millisecond precision timestampdb. var DefaultOptions = &Options{ - WALFlushInterval: 5 * time.Second, - MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds - MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds - AppendableBlocks: 2, + WALFlushInterval: 5 * time.Second, + RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds + MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds + MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds + AppendableBlocks: 2, } // Options of the DB storage. @@ -39,6 +40,9 @@ type Options struct { // The interval at which the write ahead log is flushed to disc. WALFlushInterval time.Duration + // Duration of persisted data to keep. + RetentionDuration uint64 + // The timestamp range of head blocks after which they get persisted. // It's the minimum duration of any persisted block. MinBlockDuration uint64 @@ -284,6 +288,34 @@ func (db *DB) compact(i, j int) error { return errors.Wrap(err, "removing old block") } } + return db.retentionCutoff() +} + +func (db *DB) retentionCutoff() error { + h := db.heads[len(db.heads)-1] + t := h.meta.MinTime - int64(db.opts.RetentionDuration) + + var ( + blocks = db.blocks() + i int + b Block + ) + for i, b = range blocks { + if b.Meta().MinTime >= t { + break + } + } + if i <= 1 { + return nil + } + db.logger.Log("msg", "retention cutoff", "idx", i-1) + db.removeBlocks(0, i) + + for _, b := range blocks[:i] { + if err := os.RemoveAll(b.Dir()); err != nil { + return errors.Wrap(err, "removing old block") + } + } return nil } @@ -566,7 +598,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { func (db *DB) cut(mint int64) (*headBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) - dir, seq, err := nextBlockDir(db.dir) + dir, seq, err := nextSequenceDir(db.dir, "b-") if err != nil { return nil, err } @@ -616,7 +648,7 @@ func blockDirs(dir string) ([]string, error) { return dirs, nil } -func nextBlockDir(dir string) (string, int, error) { +func nextSequenceDir(dir, prefix string) (string, int, error) { names, err := fileutil.ReadDir(dir) if err != nil { return "", 0, err @@ -624,16 +656,16 @@ func nextBlockDir(dir string) (string, int, error) { i := uint64(0) for _, n := range names { - if !strings.HasPrefix(n, "b-") { + if !strings.HasPrefix(n, prefix) { continue } - j, err := strconv.ParseUint(n[2:], 10, 32) + j, err := strconv.ParseUint(n[len(prefix):], 10, 32) if err != nil { continue } i = j } - return filepath.Join(dir, fmt.Sprintf("b-%0.6d", i+1)), int(i + 1), nil + return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil } // PartitionedDB is a time series storage.