Properly cleanup compacted dirs, fixes, docs

This commit is contained in:
Fabian Reinartz 2017-03-02 14:32:09 +01:00
parent 2c3e778d90
commit 92120448c2
3 changed files with 62 additions and 27 deletions

View file

@ -13,16 +13,24 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
// Compactor provides compaction against an underlying storage
// of time series data.
type Compactor interface {
Plan() ([][]string, error)
// Plan returns a set of non-overlapping directories that can
// be compacted concurrently.
// Results returned when compactions are in progress are undefined.
Plan(dir string) ([][]string, error)
Write(dir string, bs ...Block) error
// Write persists a Block into a directory.
Write(dir string, b Block) error
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
Compact(dirs ...string) error
}
// compactor implements the Compactor interface.
type compactor struct {
dir string
metrics *compactorMetrics
opts *compactorOptions
}
@ -63,9 +71,8 @@ type compactorOptions struct {
maxBlockRange uint64
}
func newCompactor(dir string, r prometheus.Registerer, opts *compactorOptions) *compactor {
func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
return &compactor{
dir: dir,
opts: opts,
metrics: newCompactorMetrics(r),
}
@ -79,8 +86,8 @@ type compactionInfo struct {
const compactionBlocksLen = 3
func (c *compactor) Plan() ([][]string, error) {
dirs, err := blockDirs(c.dir)
func (c *compactor) Plan(dir string) ([][]string, error) {
dirs, err := blockDirs(dir)
if err != nil {
return nil, err
}
@ -150,6 +157,7 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
func (c *compactor) Compact(dirs ...string) (err error) {
var blocks []Block
for _, d := range dirs {
b, err := newPersistedBlock(d)
if err != nil {
@ -158,10 +166,16 @@ func (c *compactor) Compact(dirs ...string) (err error) {
blocks = append(blocks, b)
}
return c.Write(dirs[0], blocks...)
return c.write(dirs[0], blocks...)
}
func (c *compactor) Write(dir string, blocks ...Block) (err error) {
func (c *compactor) Write(dir string, b Block) error {
return c.write(dir, b)
}
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *compactor) write(dir string, blocks ...Block) (err error) {
defer func(t time.Time) {
if err != nil {
c.metrics.failed.Inc()
@ -169,26 +183,32 @@ func (c *compactor) Write(dir string, blocks ...Block) (err error) {
c.metrics.duration.Observe(time.Since(t).Seconds())
}(time.Now())
if err = os.RemoveAll(dir); err != nil {
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil {
return err
}
if err = os.MkdirAll(dir, 0777); err != nil {
if err = os.MkdirAll(tmp, 0777); err != nil {
return err
}
chunkw, err := newChunkWriter(chunkDir(dir))
chunkw, err := newChunkWriter(chunkDir(tmp))
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
indexw, err := newIndexWriter(dir)
indexw, err := newIndexWriter(tmp)
if err != nil {
return errors.Wrap(err, "open index writer")
}
if err = c.write(dir, blocks, indexw, chunkw); err != nil {
meta, err := c.populate(blocks, indexw, chunkw)
if err != nil {
return errors.Wrap(err, "write compaction")
}
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}
if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
@ -196,16 +216,28 @@ func (c *compactor) Write(dir string, blocks ...Block) (err error) {
if err = indexw.Close(); err != nil {
return errors.Wrap(err, "close index writer")
}
if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
for _, b := range blocks[1:] {
if err := os.RemoveAll(b.Dir()); err != nil {
return err
}
}
return nil
}
func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error {
// populate fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
var set compactionSet
for i, b := range blocks {
all, err := b.Index().Postings("", "")
if err != nil {
return err
return nil, err
}
// TODO(fabxc): find more transparent way of handling this.
if hb, ok := b.(*headBlock); ok {
@ -219,7 +251,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
}
set, err = newCompactionMerger(set, s)
if err != nil {
return err
return nil, err
}
}
@ -234,7 +266,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
for set.Next() {
lset, chunks := set.At()
if err := chunkw.WriteChunks(chunks...); err != nil {
return err
return nil, err
}
indexw.AddSeries(i, lset, chunks...)
@ -255,7 +287,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
i++
}
if set.Err() != nil {
return set.Err()
return nil, set.Err()
}
s := make([]string, 0, 256)
@ -266,13 +298,13 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
s = append(s, x)
}
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
return err
return nil, err
}
}
for t := range postings.m {
if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil {
return err
return nil, err
}
}
// Write a postings list containing all series.
@ -281,10 +313,10 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw
all[i] = uint32(i)
}
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return err
return nil, err
}
return writeMetaFile(dir, &meta)
return &meta, nil
}
type compactionSet interface {

6
db.go
View file

@ -172,7 +172,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
donec: make(chan struct{}),
stopc: make(chan struct{}),
}
db.compactor = newCompactor(dir, r, &compactorOptions{
db.compactor = newCompactor(r, &compactorOptions{
maxBlockRange: opts.MaxBlockDuration,
})
@ -245,7 +245,7 @@ func (db *DB) compact() error {
// Check for compactions of multiple blocks.
for {
plans, err := db.compactor.Plan()
plans, err := db.compactor.Plan(db.dir)
if err != nil {
return errors.Wrap(err, "plan compaction")
}
@ -363,7 +363,7 @@ func (db *DB) reloadBlocks() error {
for seq, b := range db.seqBlocks {
if _, ok := seqBlocks[seq]; !ok {
if err := b.Close(); err != nil {
return err
return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence)
}
}
}

View file

@ -149,8 +149,11 @@ func (h *headBlock) Close() error {
return err
}
// Check whether the head block still exists in the underlying dir
// or has already been replaced with a compacted version
// or has already been replaced with a compacted version or removed.
meta, err := readMetaFile(h.dir)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return err
}