From d0770302ed94ea6463fd97875f835f77ac0bfb0a Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 17 Mar 2017 15:30:05 +0100 Subject: [PATCH] Add retention deletion --- db.go | 88 +++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 55 insertions(+), 33 deletions(-) diff --git a/db.go b/db.go index 688605517..3f1d9c766 100644 --- a/db.go +++ b/db.go @@ -207,10 +207,13 @@ func (db *DB) run() { var merr MultiError - changes, err := db.compact() + changes1, err := db.retentionCutoff() merr.Add(err) - if changes { + changes2, err := db.compact() + merr.Add(err) + + if changes1 || changes2 { merr.Add(db.reloadBlocks()) } if err := merr.Err(); err != nil { @@ -223,11 +226,31 @@ func (db *DB) run() { } } -func (db *DB) compact() (changes bool, err error) { - // Check whether we have pending head blocks that are ready to be persisted. - // They have the highest priority. +func (db *DB) retentionCutoff() (bool, error) { db.headmtx.RLock() + if db.opts.RetentionDuration == 0 { + return false, nil + } + // We don't count the span covered by head blocks towards the + // retention time as it generally makes up a fraction of it. + if len(db.persisted) == 0 { + return false, nil + } + + last := db.persisted[len(db.persisted)-1] + mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) + + db.headmtx.RUnlock() + + return retentionCutoff(db.dir, mint) +} + +func (db *DB) compact() (changes bool, err error) { + db.headmtx.RLock() + + // Check whether we have pending head blocks that are ready to be persisted. + // They have the highest priority. var singles []*headBlock // Collect head blocks that are ready for compaction. Write them after @@ -297,36 +320,35 @@ Loop: return changes, nil } -// func (db *DB) retentionCutoff() error { -// if db.opts.RetentionDuration == 0 { -// return nil -// } -// h := db.heads[len(db.heads)-1] -// t := h.meta.MinTime - int64(db.opts.RetentionDuration) +// retentionCutoff deletes all directories of blocks in dir that are strictly +// before mint. +func retentionCutoff(dir string, mint int64) (bool, error) { + dirs, err := blockDirs(dir) + if err != nil { + return false, errors.Wrapf(err, "list block dirs %s", dir) + } -// var ( -// blocks = db.blocks() -// i int -// b Block -// ) -// for i, b = range blocks { -// if b.Meta().MinTime >= t { -// break -// } -// } -// if i <= 1 { -// return nil -// } -// db.logger.Log("msg", "retention cutoff", "idx", i-1) -// db.removeBlocks(0, i) + changes := false -// for _, b := range blocks[:i] { -// if err := os.RemoveAll(b.Dir()); err != nil { -// return errors.Wrap(err, "removing old block") -// } -// } -// return nil -// } + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + return changes, errors.Wrapf(err, "read block meta %s", dir) + } + // The first block we encounter marks that we crossed the boundary + // of deletable blocks. + if meta.MaxTime >= mint { + break + } + changes = true + + if err := os.RemoveAll(dir); err != nil { + return changes, err + } + } + + return changes, nil +} func (db *DB) reloadBlocks() error { var cs []io.Closer