mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Recompact blocks with large number of tombstones
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
7b9c536883
commit
1263a68875
30
compact.go
30
compact.go
|
@ -14,6 +14,7 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -160,7 +161,24 @@ func (c *compactor) Plan() ([][]string, error) {
|
|||
return [][]string{res}
|
||||
}
|
||||
|
||||
return sliceDirs(c.selectDirs(dms)), nil
|
||||
planDirs := sliceDirs(c.selectDirs(dms))
|
||||
if len(dirs) > 1 {
|
||||
return planDirs, nil
|
||||
}
|
||||
|
||||
// Compact any blocks that have >5% tombstones.
|
||||
for i := len(dms) - 1; i >= 0; i-- {
|
||||
meta := dms[i].meta
|
||||
if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] {
|
||||
break
|
||||
}
|
||||
|
||||
if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5%
|
||||
return [][]string{{dms[i].dir}}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
|
||||
|
@ -238,8 +256,6 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
|||
sources := map[ulid.ULID]struct{}{}
|
||||
|
||||
for _, b := range blocks {
|
||||
res.Stats.NumSamples += b.Stats.NumSamples
|
||||
|
||||
if b.Compaction.Generation > res.Compaction.Generation {
|
||||
res.Compaction.Generation = b.Compaction.Generation
|
||||
}
|
||||
|
@ -410,6 +426,11 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
|||
for set.Next() {
|
||||
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
||||
|
||||
// Skip the series with all deleted chunks.
|
||||
if len(chks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(dranges) > 0 {
|
||||
// Re-encode the chunk to not have deleted values.
|
||||
for _, chk := range chks {
|
||||
|
@ -439,6 +460,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
|||
|
||||
meta.Stats.NumChunks += uint64(len(chks))
|
||||
meta.Stats.NumSeries++
|
||||
for _, chk := range chks {
|
||||
meta.Stats.NumSamples += uint64(binary.BigEndian.Uint16(chk.Chunk.Bytes()))
|
||||
}
|
||||
|
||||
for _, l := range lset {
|
||||
valset, ok := values[l.Name]
|
||||
|
|
Loading…
Reference in a new issue