From 67d185ceb92f9c332d7338554d0d0d9788e09a39 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Thu, 19 Jan 2017 19:45:52 +0100 Subject: [PATCH] Compact based on compaction generation --- block.go | 16 ++++++++-------- compact.go | 25 +++++++++++++++++++++---- db.go | 2 ++ reader.go | 5 ++++- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/block.go b/block.go index 13e176718..8e9d0c2e7 100644 --- a/block.go +++ b/block.go @@ -46,6 +46,10 @@ type BlockMeta struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` } `json:"stats,omitempty"` + + Compaction struct { + Generation int `json:"generation"` + } `json:"compaction"` } const ( @@ -108,9 +112,11 @@ func writeMetaFile(dir string, meta *BlockMeta) error { } func newPersistedBlock(dir string) (*persistedBlock, error) { - // TODO(fabxc): validate match of name and stats time, validate magic. + meta, err := readMetaFile(dir) + if err != nil { + return nil, err + } - // mmap files belonging to the block. chunksf, err := openMmapFile(chunksFileName(dir)) if err != nil { return nil, errors.Wrap(err, "open chunk file") @@ -129,11 +135,6 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, errors.Wrap(err, "create index reader") } - meta, err := readMetaFile(dir) - if err != nil { - return nil, err - } - pb := &persistedBlock{ dir: dir, meta: *meta, @@ -142,7 +143,6 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { chunkr: sr, indexr: ir, } - return pb, nil } diff --git a/compact.go b/compact.go index 948cfb12d..9267dd5ef 100644 --- a/compact.go +++ b/compact.go @@ -64,29 +64,45 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor { // pick returns a range [i, j] in the blocks that are suitable to be compacted // into a single block at position i. func (c *compactor) pick(bs []Block) (i, j int, ok bool) { + last := len(bs) - 1 if len(bs) == 0 { return 0, 0, false } // Make sure we always compact the last block if unpersisted. - if !bs[last].Persisted() { + if bs[last].Meta().Compaction.Generation == 0 { if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) { return last - 2, last, true } return last, last, true } - for i := 0; i+2 < len(bs); i += 3 { - tpl := bs[i : i+3] + for i := len(bs); i-3 >= 0; i -= 3 { + tpl := bs[i-3 : i] if compactionMatch(tpl) { - return i, i + 2, true + return i - 3, i - 1, true } } return 0, 0, false } func compactionMatch(blocks []Block) bool { + g := blocks[0].Meta().Compaction.Generation + if g >= 5 { + return false + } + + for _, b := range blocks[1:] { + if b.Meta().Compaction.Generation == 0 { + continue + } + if b.Meta().Compaction.Generation != g { + return false + } + } + return true + // TODO(fabxc): check whether combined size is below maxCompactionSize. // Apply maximum time range? or number of series? – might already be covered by size implicitly. @@ -112,6 +128,7 @@ func compactionMatch(blocks []Block) bool { func mergeBlockMetas(blocks ...Block) (res BlockMeta) { res.MinTime = blocks[0].Meta().MinTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime + res.Compaction.Generation = blocks[0].Meta().Compaction.Generation + 1 for _, b := range blocks { res.Stats.NumSamples += b.Meta().Stats.NumSamples diff --git a/db.go b/db.go index 8cd1a51c4..99aa16810 100644 --- a/db.go +++ b/db.go @@ -222,6 +222,8 @@ func (db *DB) blocks() (bs []Block) { return bs } +// compact block in range [i, j] into a temporary directory and atomically +// swap the blocks out on successful completion. func (db *DB) compact(i, j int) error { if j < i { return errors.New("invalid compaction block range") diff --git a/reader.go b/reader.go index e79d64337..a49fb794b 100644 --- a/reader.go +++ b/reader.go @@ -24,6 +24,9 @@ type seriesReader struct { } func newSeriesReader(b []byte) (*seriesReader, error) { + if len(b) < 4 { + return nil, errors.Wrap(errInvalidSize, "index header") + } // Verify magic number. if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { return nil, fmt.Errorf("invalid magic number %x", m) @@ -91,7 +94,7 @@ var ( ) func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { - if len(b) < 16 { + if len(b) < 4 { return nil, errors.Wrap(errInvalidSize, "index header") } r := &indexReader{