mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Merge pull request #108 from Gouthamve/compact-large-dels-2
Recompact blocks with large number of tombstones
This commit is contained in:
commit
3c22488157
|
@ -43,6 +43,7 @@ type Chunk interface {
|
||||||
Encoding() Encoding
|
Encoding() Encoding
|
||||||
Appender() (Appender, error)
|
Appender() (Appender, error)
|
||||||
Iterator() Iterator
|
Iterator() Iterator
|
||||||
|
NumSamples() int
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromData returns a chunk from a byte slice of chunk data.
|
// FromData returns a chunk from a byte slice of chunk data.
|
||||||
|
|
|
@ -72,6 +72,11 @@ func (c *XORChunk) Bytes() []byte {
|
||||||
return c.b.bytes()
|
return c.b.bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NumSamples returns the number of samples in the chunk.
|
||||||
|
func (c *XORChunk) NumSamples() int {
|
||||||
|
return int(binary.BigEndian.Uint16(c.Bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
// Appender implements the Chunk interface.
|
// Appender implements the Chunk interface.
|
||||||
func (c *XORChunk) Appender() (Appender, error) {
|
func (c *XORChunk) Appender() (Appender, error) {
|
||||||
it := c.iterator()
|
it := c.iterator()
|
||||||
|
|
29
compact.go
29
compact.go
|
@ -160,7 +160,24 @@ func (c *compactor) Plan() ([][]string, error) {
|
||||||
return [][]string{res}
|
return [][]string{res}
|
||||||
}
|
}
|
||||||
|
|
||||||
return sliceDirs(c.selectDirs(dms)), nil
|
planDirs := sliceDirs(c.selectDirs(dms))
|
||||||
|
if len(dirs) > 1 {
|
||||||
|
return planDirs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compact any blocks that have >5% tombstones.
|
||||||
|
for i := len(dms) - 1; i >= 0; i-- {
|
||||||
|
meta := dms[i].meta
|
||||||
|
if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if meta.Stats.NumSeries/meta.Stats.NumTombstones <= 20 { // 5%
|
||||||
|
return [][]string{{dms[i].dir}}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
|
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
|
||||||
|
@ -238,8 +255,6 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
|
||||||
sources := map[ulid.ULID]struct{}{}
|
sources := map[ulid.ULID]struct{}{}
|
||||||
|
|
||||||
for _, b := range blocks {
|
for _, b := range blocks {
|
||||||
res.Stats.NumSamples += b.Stats.NumSamples
|
|
||||||
|
|
||||||
if b.Compaction.Generation > res.Compaction.Generation {
|
if b.Compaction.Generation > res.Compaction.Generation {
|
||||||
res.Compaction.Generation = b.Compaction.Generation
|
res.Compaction.Generation = b.Compaction.Generation
|
||||||
}
|
}
|
||||||
|
@ -410,6 +425,11 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
|
||||||
|
|
||||||
|
// Skip the series with all deleted chunks.
|
||||||
|
if len(chks) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
// Re-encode the chunk to not have deleted values.
|
// Re-encode the chunk to not have deleted values.
|
||||||
for _, chk := range chks {
|
for _, chk := range chks {
|
||||||
|
@ -439,6 +459,9 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
|
||||||
|
|
||||||
meta.Stats.NumChunks += uint64(len(chks))
|
meta.Stats.NumChunks += uint64(len(chks))
|
||||||
meta.Stats.NumSeries++
|
meta.Stats.NumSeries++
|
||||||
|
for _, chk := range chks {
|
||||||
|
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
|
||||||
|
}
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
valset, ok := values[l.Name]
|
valset, ok := values[l.Name]
|
||||||
|
|
Loading…
Reference in a new issue