diff --git a/block.go b/block.go index 1351efae7..c4880c258 100644 --- a/block.go +++ b/block.go @@ -85,17 +85,27 @@ type BlockMeta struct { MaxTime int64 `json:"maxTime"` // Stats about the contents of the block. - Stats struct { - NumSamples uint64 `json:"numSamples,omitempty"` - NumSeries uint64 `json:"numSeries,omitempty"` - NumChunks uint64 `json:"numChunks,omitempty"` - NumTombstones uint64 `json:"numTombstones,omitempty"` - } `json:"stats,omitempty"` + Stats BlockStats `json:"stats,omitempty"` // Information on compactions the block was created from. - Compaction struct { - Generation int `json:"generation"` - } `json:"compaction"` + Compaction BlockMetaCompaction `json:"compaction"` +} + +// BlockStats contains stats about contents of a block. +type BlockStats struct { + NumSamples uint64 `json:"numSamples,omitempty"` + NumSeries uint64 `json:"numSeries,omitempty"` + NumChunks uint64 `json:"numChunks,omitempty"` + NumTombstones uint64 `json:"numTombstones,omitempty"` +} + +// BlockMetaCompaction holds information about compactions a block went through. +type BlockMetaCompaction struct { + // Maximum number of compaction cycles any source block has + // gone through. + Generation int `json:"generation"` + // ULIDs of all source head blocks that went into the block. + Sources []ulid.ULID `json:"sources,omitempty"` } const ( @@ -144,7 +154,7 @@ func writeMetaFile(dir string, meta *BlockMeta) error { var merr MultiError if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil { merr.Add(f.Close()) - return merr + return merr.Err() } if err := f.Close(); err != nil { return err diff --git a/compact.go b/compact.go index ee54bc9ec..2250c80b3 100644 --- a/compact.go +++ b/compact.go @@ -166,17 +166,35 @@ func (c *compactor) match(dirs []dirMeta) bool { return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange } -func mergeBlockMetas(blocks ...Block) (res BlockMeta) { - m0 := blocks[0].Meta() +func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { + res.MinTime = blocks[0].MinTime + res.MaxTime = blocks[len(blocks)-1].MaxTime - res.MinTime = m0.MinTime - res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime - - res.Compaction.Generation = m0.Compaction.Generation + 1 + sources := map[ulid.ULID]struct{}{} for _, b := range blocks { - res.Stats.NumSamples += b.Meta().Stats.NumSamples + res.Stats.NumSamples += b.Stats.NumSamples + + if b.Compaction.Generation > res.Compaction.Generation { + res.Compaction.Generation = b.Compaction.Generation + } + for _, s := range b.Compaction.Sources { + sources[s] = struct{}{} + } + // If it's an in memory block, its ULID goes into the sources. + if b.Compaction.Generation == 0 { + sources[b.ULID] = struct{}{} + } } + res.Compaction.Generation++ + + for s := range sources { + res.Compaction.Sources = append(res.Compaction.Sources, s) + } + sort.Slice(res.Compaction.Sources, func(i, j int) bool { + return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0 + }) + return res } @@ -293,6 +311,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { // of the provided blocks. It returns meta information for the new block. func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { var set compactionSet + var metas []BlockMeta for i, b := range blocks { all, err := b.Index().Postings("", "") @@ -309,6 +328,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri if err != nil { return nil, err } + metas = append(metas, b.Meta()) } // We fully rebuild the postings list index from merged series. @@ -316,7 +336,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri postings = &memPostings{m: make(map[term][]uint32, 512)} values = map[string]stringset{} i = uint32(0) - meta = mergeBlockMetas(blocks...) + meta = compactBlockMetas(metas...) ) for set.Next() {