diff --git a/compact.go b/compact.go index af6ab72df..fe34174a7 100644 --- a/compact.go +++ b/compact.go @@ -13,16 +13,24 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// Compactor provides compaction against an underlying storage +// of time series data. type Compactor interface { - Plan() ([][]string, error) + // Plan returns a set of non-overlapping directories that can + // be compacted concurrently. + // Results returned when compactions are in progress are undefined. + Plan(dir string) ([][]string, error) - Write(dir string, bs ...Block) error + // Write persists a Block into a directory. + Write(dir string, b Block) error + // Compact runs compaction against the provided directories. Must + // only be called concurrently with results of Plan(). Compact(dirs ...string) error } +// compactor implements the Compactor interface. type compactor struct { - dir string metrics *compactorMetrics opts *compactorOptions } @@ -63,9 +71,8 @@ type compactorOptions struct { maxBlockRange uint64 } -func newCompactor(dir string, r prometheus.Registerer, opts *compactorOptions) *compactor { +func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { return &compactor{ - dir: dir, opts: opts, metrics: newCompactorMetrics(r), } @@ -79,8 +86,8 @@ type compactionInfo struct { const compactionBlocksLen = 3 -func (c *compactor) Plan() ([][]string, error) { - dirs, err := blockDirs(c.dir) +func (c *compactor) Plan(dir string) ([][]string, error) { + dirs, err := blockDirs(dir) if err != nil { return nil, err } @@ -150,6 +157,7 @@ func mergeBlockMetas(blocks ...Block) (res BlockMeta) { func (c *compactor) Compact(dirs ...string) (err error) { var blocks []Block + for _, d := range dirs { b, err := newPersistedBlock(d) if err != nil { @@ -158,10 +166,16 @@ func (c *compactor) Compact(dirs ...string) (err error) { blocks = append(blocks, b) } - return c.Write(dirs[0], blocks...) + return c.write(dirs[0], blocks...) } -func (c *compactor) Write(dir string, blocks ...Block) (err error) { +func (c *compactor) Write(dir string, b Block) error { + return c.write(dir, b) +} + +// write creates a new block that is the union of the provided blocks into dir. +// It cleans up all files of the old blocks after completing successfully. +func (c *compactor) write(dir string, blocks ...Block) (err error) { defer func(t time.Time) { if err != nil { c.metrics.failed.Inc() @@ -169,26 +183,32 @@ func (c *compactor) Write(dir string, blocks ...Block) (err error) { c.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - if err = os.RemoveAll(dir); err != nil { + tmp := dir + ".tmp" + + if err = os.RemoveAll(tmp); err != nil { return err } - if err = os.MkdirAll(dir, 0777); err != nil { + if err = os.MkdirAll(tmp, 0777); err != nil { return err } - chunkw, err := newChunkWriter(chunkDir(dir)) + chunkw, err := newChunkWriter(chunkDir(tmp)) if err != nil { return errors.Wrap(err, "open chunk writer") } - indexw, err := newIndexWriter(dir) + indexw, err := newIndexWriter(tmp) if err != nil { return errors.Wrap(err, "open index writer") } - if err = c.write(dir, blocks, indexw, chunkw); err != nil { + meta, err := c.populate(blocks, indexw, chunkw) + if err != nil { return errors.Wrap(err, "write compaction") } + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") @@ -196,16 +216,28 @@ func (c *compactor) Write(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } + + if err := renameFile(tmp, dir); err != nil { + return errors.Wrap(err, "rename block dir") + } + for _, b := range blocks[1:] { + if err := os.RemoveAll(b.Dir()); err != nil { + return err + } + } + return nil } -func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw ChunkWriter) error { +// populate fills the index and chunk writers with new data gathered as the union +// of the provided blocks. It returns meta information for the new block. +func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var set compactionSet for i, b := range blocks { all, err := b.Index().Postings("", "") if err != nil { - return err + return nil, err } // TODO(fabxc): find more transparent way of handling this. if hb, ok := b.(*headBlock); ok { @@ -219,7 +251,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw } set, err = newCompactionMerger(set, s) if err != nil { - return err + return nil, err } } @@ -234,7 +266,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw for set.Next() { lset, chunks := set.At() if err := chunkw.WriteChunks(chunks...); err != nil { - return err + return nil, err } indexw.AddSeries(i, lset, chunks...) @@ -255,7 +287,7 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw i++ } if set.Err() != nil { - return set.Err() + return nil, set.Err() } s := make([]string, 0, 256) @@ -266,13 +298,13 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw s = append(s, x) } if err := indexw.WriteLabelIndex([]string{n}, s); err != nil { - return err + return nil, err } } for t := range postings.m { if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil { - return err + return nil, err } } // Write a postings list containing all series. @@ -281,10 +313,10 @@ func (c *compactor) write(dir string, blocks []Block, indexw IndexWriter, chunkw all[i] = uint32(i) } if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return err + return nil, err } - return writeMetaFile(dir, &meta) + return &meta, nil } type compactionSet interface { diff --git a/db.go b/db.go index 463433bc8..78d226833 100644 --- a/db.go +++ b/db.go @@ -172,7 +172,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), } - db.compactor = newCompactor(dir, r, &compactorOptions{ + db.compactor = newCompactor(r, &compactorOptions{ maxBlockRange: opts.MaxBlockDuration, }) @@ -245,7 +245,7 @@ func (db *DB) compact() error { // Check for compactions of multiple blocks. for { - plans, err := db.compactor.Plan() + plans, err := db.compactor.Plan(db.dir) if err != nil { return errors.Wrap(err, "plan compaction") } @@ -363,7 +363,7 @@ func (db *DB) reloadBlocks() error { for seq, b := range db.seqBlocks { if _, ok := seqBlocks[seq]; !ok { if err := b.Close(); err != nil { - return err + return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence) } } } diff --git a/head.go b/head.go index 4ddd1957c..969bf3416 100644 --- a/head.go +++ b/head.go @@ -149,8 +149,11 @@ func (h *headBlock) Close() error { return err } // Check whether the head block still exists in the underlying dir - // or has already been replaced with a compacted version + // or has already been replaced with a compacted version or removed. meta, err := readMetaFile(h.dir) + if os.IsNotExist(err) { + return nil + } if err != nil { return err }