Merge pull request #213 from Gouthamve/close-retention-blocks

Close the retention blocks before deleting them.
This commit is contained in:
Goutham Veeramachaneni 2017-11-23 19:30:51 +05:30 committed by GitHub
commit 17dd4f8f6c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 16 deletions

36
db.go
View file

@ -277,16 +277,23 @@ func (db *DB) retentionCutoff() (bool, error) {
} }
db.mtx.RLock() db.mtx.RLock()
defer db.mtx.RUnlock() blocks := db.blocks[:]
db.mtx.RUnlock()
if len(db.blocks) == 0 { if len(blocks) == 0 {
return false, nil return false, nil
} }
last := db.blocks[len(db.blocks)-1] last := blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return retentionCutoff(db.dir, mint) mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
dirs, err := retentionCutoffDirs(db.dir, mint)
if err != nil {
return false, err
}
// This will close the dirs and then delete the dirs.
return len(dirs) > 0, db.reload(dirs...)
} }
// Appender opens a new appender against the database. // Appender opens a new appender against the database.
@ -388,40 +395,37 @@ func (db *DB) compact() (changes bool, err error) {
return changes, nil return changes, nil
} }
// retentionCutoff deletes all directories of blocks in dir that are strictly // retentionCutoffDirs returns all directories of blocks in dir that are strictly
// before mint. // before mint.
func retentionCutoff(dir string, mint int64) (bool, error) { func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
df, err := fileutil.OpenDir(dir) df, err := fileutil.OpenDir(dir)
if err != nil { if err != nil {
return false, errors.Wrapf(err, "open directory") return nil, errors.Wrapf(err, "open directory")
} }
defer df.Close() defer df.Close()
dirs, err := blockDirs(dir) dirs, err := blockDirs(dir)
if err != nil { if err != nil {
return false, errors.Wrapf(err, "list block dirs %s", dir) return nil, errors.Wrapf(err, "list block dirs %s", dir)
} }
changes := false delDirs := []string{}
for _, dir := range dirs { for _, dir := range dirs {
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
return changes, errors.Wrapf(err, "read block meta %s", dir) return nil, errors.Wrapf(err, "read block meta %s", dir)
} }
// The first block we encounter marks that we crossed the boundary // The first block we encounter marks that we crossed the boundary
// of deletable blocks. // of deletable blocks.
if meta.MaxTime >= mint { if meta.MaxTime >= mint {
break break
} }
changes = true
if err := os.RemoveAll(dir); err != nil { delDirs = append(delDirs, dir)
return changes, err
}
} }
return changes, fileutil.Fsync(df) return delDirs, nil
} }
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {

View file

@ -556,3 +556,60 @@ func TestWALFlushedOnDBClose(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, values, []string{"labelvalue"}) require.Equal(t, values, []string{"labelvalue"})
} }
func TestDB_Retention(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir)
db, err := Open(tmpdir, nil, nil, nil)
require.NoError(t, err)
lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
app := db.Appender()
_, err = app.Add(lbls, 0, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())
// create snapshot to make it create a block.
// TODO(gouthamve): Add a method to compact headblock.
snap, err := ioutil.TempDir("", "snap")
require.NoError(t, err)
require.NoError(t, db.Snapshot(snap))
require.NoError(t, db.Close())
defer os.RemoveAll(snap)
// reopen DB from snapshot
db, err = Open(snap, nil, nil, nil)
require.NoError(t, err)
Equals(t, 1, len(db.blocks))
app = db.Appender()
_, err = app.Add(lbls, 100, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())
// Snapshot again to create another block.
snap, err = ioutil.TempDir("", "snap")
require.NoError(t, err)
require.NoError(t, db.Snapshot(snap))
require.NoError(t, db.Close())
defer os.RemoveAll(snap)
// reopen DB from snapshot
db, err = Open(snap, nil, nil, &Options{
RetentionDuration: 10,
BlockRanges: []int64{50},
})
require.NoError(t, err)
Equals(t, 2, len(db.blocks))
// Now call rentention.
changes, err := db.retentionCutoff()
Ok(t, err)
Assert(t, changes, "there should be changes")
Equals(t, 1, len(db.blocks))
Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
}