Compact based on compaction generation

This commit is contained in:
Fabian Reinartz 2017-01-19 19:45:52 +01:00
parent 472c618c39
commit 67d185ceb9
4 changed files with 35 additions and 13 deletions

View file

@ -46,6 +46,10 @@ type BlockMeta struct {
NumSeries uint64 `json:"numSeries,omitempty"` NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"`
} `json:"stats,omitempty"` } `json:"stats,omitempty"`
Compaction struct {
Generation int `json:"generation"`
} `json:"compaction"`
} }
const ( const (
@ -108,9 +112,11 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
} }
func newPersistedBlock(dir string) (*persistedBlock, error) { func newPersistedBlock(dir string) (*persistedBlock, error) {
// TODO(fabxc): validate match of name and stats time, validate magic. meta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
// mmap files belonging to the block.
chunksf, err := openMmapFile(chunksFileName(dir)) chunksf, err := openMmapFile(chunksFileName(dir))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "open chunk file") return nil, errors.Wrap(err, "open chunk file")
@ -129,11 +135,6 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
return nil, errors.Wrap(err, "create index reader") return nil, errors.Wrap(err, "create index reader")
} }
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
}
pb := &persistedBlock{ pb := &persistedBlock{
dir: dir, dir: dir,
meta: *meta, meta: *meta,
@ -142,7 +143,6 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
chunkr: sr, chunkr: sr,
indexr: ir, indexr: ir,
} }
return pb, nil return pb, nil
} }

View file

@ -64,29 +64,45 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
// pick returns a range [i, j] in the blocks that are suitable to be compacted // pick returns a range [i, j] in the blocks that are suitable to be compacted
// into a single block at position i. // into a single block at position i.
func (c *compactor) pick(bs []Block) (i, j int, ok bool) { func (c *compactor) pick(bs []Block) (i, j int, ok bool) {
last := len(bs) - 1 last := len(bs) - 1
if len(bs) == 0 { if len(bs) == 0 {
return 0, 0, false return 0, 0, false
} }
// Make sure we always compact the last block if unpersisted. // Make sure we always compact the last block if unpersisted.
if !bs[last].Persisted() { if bs[last].Meta().Compaction.Generation == 0 {
if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) { if len(bs) >= 3 && compactionMatch(bs[last-2:last+1]) {
return last - 2, last, true return last - 2, last, true
} }
return last, last, true return last, last, true
} }
for i := 0; i+2 < len(bs); i += 3 { for i := len(bs); i-3 >= 0; i -= 3 {
tpl := bs[i : i+3] tpl := bs[i-3 : i]
if compactionMatch(tpl) { if compactionMatch(tpl) {
return i, i + 2, true return i - 3, i - 1, true
} }
} }
return 0, 0, false return 0, 0, false
} }
func compactionMatch(blocks []Block) bool { func compactionMatch(blocks []Block) bool {
g := blocks[0].Meta().Compaction.Generation
if g >= 5 {
return false
}
for _, b := range blocks[1:] {
if b.Meta().Compaction.Generation == 0 {
continue
}
if b.Meta().Compaction.Generation != g {
return false
}
}
return true
// TODO(fabxc): check whether combined size is below maxCompactionSize. // TODO(fabxc): check whether combined size is below maxCompactionSize.
// Apply maximum time range? or number of series? might already be covered by size implicitly. // Apply maximum time range? or number of series? might already be covered by size implicitly.
@ -112,6 +128,7 @@ func compactionMatch(blocks []Block) bool {
func mergeBlockMetas(blocks ...Block) (res BlockMeta) { func mergeBlockMetas(blocks ...Block) (res BlockMeta) {
res.MinTime = blocks[0].Meta().MinTime res.MinTime = blocks[0].Meta().MinTime
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime
res.Compaction.Generation = blocks[0].Meta().Compaction.Generation + 1
for _, b := range blocks { for _, b := range blocks {
res.Stats.NumSamples += b.Meta().Stats.NumSamples res.Stats.NumSamples += b.Meta().Stats.NumSamples

db.go (2 additions)
View file

@ -222,6 +222,8 @@ func (db *DB) blocks() (bs []Block) {
return bs return bs
} }
// compact block in range [i, j] into a temporary directory and atomically
// swap the blocks out on successful completion.
func (db *DB) compact(i, j int) error { func (db *DB) compact(i, j int) error {
if j < i { if j < i {
return errors.New("invalid compaction block range") return errors.New("invalid compaction block range")

View file

@ -24,6 +24,9 @@ type seriesReader struct {
} }
func newSeriesReader(b []byte) (*seriesReader, error) { func newSeriesReader(b []byte) (*seriesReader, error) {
if len(b) < 4 {
return nil, errors.Wrap(errInvalidSize, "index header")
}
// Verify magic number. // Verify magic number.
if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries { if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries {
return nil, fmt.Errorf("invalid magic number %x", m) return nil, fmt.Errorf("invalid magic number %x", m)
@ -91,7 +94,7 @@ var (
) )
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) { func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
if len(b) < 16 { if len(b) < 4 {
return nil, errors.Wrap(errInvalidSize, "index header") return nil, errors.Wrap(errInvalidSize, "index header")
} }
r := &indexReader{ r := &indexReader{