Initial implementation of HeadBlock Snapshots

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-06-05 13:48:31 +05:30
parent 3a5ae6b1a4
commit a1c8425357
No known key found for this signature in database
GPG key ID: F1C217E8E9023CAD
4 changed files with 102 additions and 3 deletions

View file

@ -58,6 +58,12 @@ type Block interface {
type headBlock interface {
Block
Appendable
Snapshottable
}
// Snapshottable defines an entity that can be backedup online.
type Snapshottable interface {
Snapshot(dir string) error
}
// Appendable defines an entity to which data can be appended.

View file

@ -246,7 +246,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
return errors.Wrap(err, "open index writer")
}
meta, err := c.populate(blocks, indexw, chunkw)
meta, err := populateBlock(blocks, indexw, chunkw)
if err != nil {
return errors.Wrap(err, "write compaction")
}
@ -289,9 +289,9 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
return nil
}
// populate fills the index and chunk writers with new data gathered as the union
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
var set compactionSet
for i, b := range blocks {

28
db.go
View file

@ -528,6 +528,34 @@ func (db *DB) Close() error {
return merr.Err()
}
// DisableCompactions disables compactions.
func (db *DB) DisableCompactions() error {
db.stopc <- struct{}{} // TODO: Can this block?
db.cmtx.Lock()
return nil
}
// EnableCompactions enables compactions.
func (db *DB) EnableCompactions() error {
db.cmtx.Unlock()
return nil
}
// Snapshot writes the current headBlock snapshots to snapshots directory.
func (db *DB) Snapshot(dir string) error {
db.headmtx.RLock()
heads := db.heads[:]
db.headmtx.RUnlock()
for _, h := range heads {
if err := h.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
}
}
return nil
}
// Appender returns a new Appender on the database.
func (db *DB) Appender() Appender {
db.metrics.activeAppenders.Inc()

65
head.go
View file

@ -262,6 +262,71 @@ Outer:
return nil
}
// Snapshot persists the current state of the headblock to the given directory.
func (h *HeadBlock) Snapshot(snapshotDir string) error {
// Needed to stop any appenders.
h.mtx.Lock()
defer h.mtx.Unlock()
if h.meta.Stats.NumSeries == 0 {
return nil
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
dir := filepath.Join(snapshotDir, uid.String())
tmp := dir + ".tmp"
if err := os.RemoveAll(tmp); err != nil {
return err
}
if err := os.MkdirAll(tmp, 0777); err != nil {
return err
}
// Populate chunk and index files into temporary directory with
// data of all blocks.
chunkw, err := newChunkWriter(chunkDir(tmp))
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
indexw, err := newIndexWriter(tmp)
if err != nil {
return errors.Wrap(err, "open index writer")
}
meta, err := populateBlock([]Block{h}, indexw, chunkw)
if err != nil {
return errors.Wrap(err, "write snapshot")
}
meta.ULID = uid
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
}
if err = indexw.Close(); err != nil {
return errors.Wrap(err, "close index writer")
}
// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}
// Block successfully written, make visible
if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
return nil
}
// Dir returns the directory of the block.
func (h *HeadBlock) Dir() string { return h.dir }