mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
commit
357e33bd1e
46
block.go
46
block.go
|
@ -1,4 +1,5 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
|
@ -52,6 +53,7 @@ type DiskBlock interface {
|
|||
type Block interface {
|
||||
DiskBlock
|
||||
Queryable
|
||||
Snapshottable
|
||||
}
|
||||
|
||||
// headBlock is a regular block that can still be appended to.
|
||||
|
@ -60,6 +62,11 @@ type headBlock interface {
|
|||
Appendable
|
||||
}
|
||||
|
||||
// Snapshottable defines an entity that can be backedup online.
|
||||
type Snapshottable interface {
|
||||
Snapshot(dir string) error
|
||||
}
|
||||
|
||||
// Appendable defines an entity to which data can be appended.
|
||||
type Appendable interface {
|
||||
// Appender returns a new Appender against an underlying store.
|
||||
|
@ -272,6 +279,45 @@ Outer:
|
|||
return writeMetaFile(pb.dir, &pb.meta)
|
||||
}
|
||||
|
||||
func (pb *persistedBlock) Snapshot(dir string) error {
|
||||
blockDir := filepath.Join(dir, pb.meta.ULID.String())
|
||||
if err := os.MkdirAll(blockDir, 0777); err != nil {
|
||||
return errors.Wrap(err, "create snapshot block dir")
|
||||
}
|
||||
|
||||
chunksDir := chunkDir(blockDir)
|
||||
if err := os.MkdirAll(chunksDir, 0777); err != nil {
|
||||
return errors.Wrap(err, "create snapshot chunk dir")
|
||||
}
|
||||
|
||||
// Hardlink meta, index and tombstones
|
||||
for _, fname := range []string{
|
||||
metaFilename,
|
||||
indexFilename,
|
||||
tombstoneFilename,
|
||||
} {
|
||||
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
|
||||
return errors.Wrapf(err, "create snapshot %s", fname)
|
||||
}
|
||||
}
|
||||
|
||||
// Hardlink the chunks
|
||||
curChunkDir := chunkDir(pb.dir)
|
||||
files, err := ioutil.ReadDir(curChunkDir)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "ReadDir the current chunk dir")
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
err := os.Link(filepath.Join(curChunkDir, f.Name()), filepath.Join(chunksDir, f.Name()))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "hardlink a chunk")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
|
||||
func walDir(dir string) string { return filepath.Join(dir, "wal") }
|
||||
|
||||
|
|
|
@ -246,7 +246,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
|||
return errors.Wrap(err, "open index writer")
|
||||
}
|
||||
|
||||
meta, err := c.populate(blocks, indexw, chunkw)
|
||||
meta, err := populateBlock(blocks, indexw, chunkw)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "write compaction")
|
||||
}
|
||||
|
@ -289,9 +289,9 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// populate fills the index and chunk writers with new data gathered as the union
|
||||
// populateBlock fills the index and chunk writers with new data gathered as the union
|
||||
// of the provided blocks. It returns meta information for the new block.
|
||||
func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
||||
func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
|
||||
var set compactionSet
|
||||
|
||||
for i, b := range blocks {
|
||||
|
|
53
db.go
53
db.go
|
@ -121,7 +121,8 @@ type DB struct {
|
|||
stopc chan struct{}
|
||||
|
||||
// cmtx is used to control compactions and deletions.
|
||||
cmtx sync.Mutex
|
||||
cmtx sync.Mutex
|
||||
compacting bool
|
||||
}
|
||||
|
||||
type dbMetrics struct {
|
||||
|
@ -200,12 +201,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
}
|
||||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
logger: l,
|
||||
opts: opts,
|
||||
compactc: make(chan struct{}, 1),
|
||||
donec: make(chan struct{}),
|
||||
stopc: make(chan struct{}),
|
||||
dir: dir,
|
||||
logger: l,
|
||||
opts: opts,
|
||||
compactc: make(chan struct{}, 1),
|
||||
donec: make(chan struct{}),
|
||||
stopc: make(chan struct{}),
|
||||
compacting: true,
|
||||
}
|
||||
db.metrics = newDBMetrics(db, r)
|
||||
|
||||
|
@ -528,6 +530,43 @@ func (db *DB) Close() error {
|
|||
return merr.Err()
|
||||
}
|
||||
|
||||
// DisableCompactions disables compactions.
|
||||
func (db *DB) DisableCompactions() {
|
||||
if db.compacting {
|
||||
db.cmtx.Lock()
|
||||
db.compacting = false
|
||||
db.logger.Log("msg", "compactions disabled")
|
||||
}
|
||||
}
|
||||
|
||||
// EnableCompactions enables compactions.
|
||||
func (db *DB) EnableCompactions() {
|
||||
if !db.compacting {
|
||||
db.cmtx.Unlock()
|
||||
db.compacting = true
|
||||
db.logger.Log("msg", "compactions enabled")
|
||||
}
|
||||
}
|
||||
|
||||
// Snapshot writes the current data to the directory.
|
||||
func (db *DB) Snapshot(dir string) error {
|
||||
db.mtx.Lock() // To block any appenders.
|
||||
defer db.mtx.Unlock()
|
||||
|
||||
db.cmtx.Lock()
|
||||
defer db.cmtx.Unlock()
|
||||
|
||||
blocks := db.blocks[:]
|
||||
for _, b := range blocks {
|
||||
db.logger.Log("msg", "snapshotting block", "block", b)
|
||||
if err := b.Snapshot(dir); err != nil {
|
||||
return errors.Wrap(err, "error snapshotting headblock")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Appender returns a new Appender on the database.
|
||||
func (db *DB) Appender() Appender {
|
||||
db.metrics.activeAppenders.Inc()
|
||||
|
|
64
head.go
64
head.go
|
@ -262,6 +262,70 @@ Outer:
|
|||
return nil
|
||||
}
|
||||
|
||||
// Snapshot persists the current state of the headblock to the given directory.
|
||||
// TODO(gouthamve): Snapshot must be called when there are no active appenders.
|
||||
// This has been ensured by acquiring a Lock on DB.mtx, but this limitation should
|
||||
// be removed in the future.
|
||||
func (h *HeadBlock) Snapshot(snapshotDir string) error {
|
||||
if h.meta.Stats.NumSeries == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
uid := ulid.MustNew(ulid.Now(), entropy)
|
||||
|
||||
dir := filepath.Join(snapshotDir, uid.String())
|
||||
tmp := dir + ".tmp"
|
||||
|
||||
if err := os.RemoveAll(tmp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(tmp, 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Populate chunk and index files into temporary directory with
|
||||
// data of all blocks.
|
||||
chunkw, err := newChunkWriter(chunkDir(tmp))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open chunk writer")
|
||||
}
|
||||
indexw, err := newIndexWriter(tmp)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "open index writer")
|
||||
}
|
||||
|
||||
meta, err := populateBlock([]Block{h}, indexw, chunkw)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "write snapshot")
|
||||
}
|
||||
meta.ULID = uid
|
||||
|
||||
if err = writeMetaFile(tmp, meta); err != nil {
|
||||
return errors.Wrap(err, "write merged meta")
|
||||
}
|
||||
|
||||
if err = chunkw.Close(); err != nil {
|
||||
return errors.Wrap(err, "close chunk writer")
|
||||
}
|
||||
if err = indexw.Close(); err != nil {
|
||||
return errors.Wrap(err, "close index writer")
|
||||
}
|
||||
|
||||
// Create an empty tombstones file.
|
||||
if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
|
||||
return errors.Wrap(err, "write new tombstones file")
|
||||
}
|
||||
|
||||
// Block successfully written, make visible
|
||||
if err := renameFile(tmp, dir); err != nil {
|
||||
return errors.Wrap(err, "rename block dir")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dir returns the directory of the block.
|
||||
func (h *HeadBlock) Dir() string { return h.dir }
|
||||
|
||||
|
|
4
index.go
4
index.go
|
@ -39,6 +39,8 @@ const (
|
|||
indexFormatV1 = 1
|
||||
)
|
||||
|
||||
const indexFilename = "index"
|
||||
|
||||
const compactionPageBytes = minSectorSize * 64
|
||||
|
||||
type indexWriterSeries struct {
|
||||
|
@ -138,7 +140,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666)
|
||||
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue