mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Misc compaction fixes
This commit is contained in:
parent
b281e4e39b
commit
db5c88ea9a
|
@ -92,9 +92,9 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
|
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
|
||||||
WALFlushInterval: 200 * time.Millisecond,
|
WALFlushInterval: 200 * time.Millisecond,
|
||||||
RetentionDuration: 1 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||||
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||||
AppendableBlocks: 2,
|
AppendableBlocks: 2,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -62,6 +62,7 @@ func newCompactor(r prometheus.Registerer, opts *compactorOptions) *compactor {
|
||||||
}
|
}
|
||||||
|
|
||||||
type compactionInfo struct {
|
type compactionInfo struct {
|
||||||
|
seq int
|
||||||
generation int
|
generation int
|
||||||
mint, maxt int64
|
mint, maxt int64
|
||||||
}
|
}
|
||||||
|
@ -105,8 +106,8 @@ func (c *compactor) pick(bs []compactionInfo) (i, j int, ok bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then we care about compacting multiple blocks, starting with the oldest.
|
// Then we care about compacting multiple blocks, starting with the oldest.
|
||||||
for i := 0; i < len(bs)-compactionBlocksLen; i += compactionBlocksLen {
|
for i := 0; i < len(bs)-compactionBlocksLen+1; i += compactionBlocksLen {
|
||||||
if c.match(bs[i : i+2]) {
|
if c.match(bs[i : i+3]) {
|
||||||
return i, i + compactionBlocksLen, true
|
return i, i + compactionBlocksLen, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,14 +119,10 @@ func (c *compactor) match(bs []compactionInfo) bool {
|
||||||
g := bs[0].generation
|
g := bs[0].generation
|
||||||
|
|
||||||
for _, b := range bs {
|
for _, b := range bs {
|
||||||
if b.generation == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if b.generation != g {
|
if b.generation != g {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
|
return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
9
db.go
9
db.go
|
@ -192,6 +192,7 @@ func (db *DB) run() {
|
||||||
case <-db.compactc:
|
case <-db.compactc:
|
||||||
db.metrics.compactionsTriggered.Inc()
|
db.metrics.compactionsTriggered.Inc()
|
||||||
|
|
||||||
|
var seqs []int
|
||||||
var infos []compactionInfo
|
var infos []compactionInfo
|
||||||
for _, b := range db.compactable() {
|
for _, b := range db.compactable() {
|
||||||
m := b.Meta()
|
m := b.Meta()
|
||||||
|
@ -200,17 +201,16 @@ func (db *DB) run() {
|
||||||
generation: m.Compaction.Generation,
|
generation: m.Compaction.Generation,
|
||||||
mint: m.MinTime,
|
mint: m.MinTime,
|
||||||
maxt: m.MaxTime,
|
maxt: m.MaxTime,
|
||||||
|
seq: m.Sequence,
|
||||||
})
|
})
|
||||||
|
seqs = append(seqs, m.Sequence)
|
||||||
}
|
}
|
||||||
|
|
||||||
i, j, ok := db.compactor.pick(infos)
|
i, j, ok := db.compactor.pick(infos)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
db.logger.Log("msg", "picked", "i", i, "j", j)
|
db.logger.Log("msg", "compact", "seqs", fmt.Sprintf("%v", seqs[i:j]))
|
||||||
for k := i; k < j; k++ {
|
|
||||||
db.logger.Log("k", k, "generation", infos[k].generation)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := db.compact(i, j); err != nil {
|
if err := db.compact(i, j); err != nil {
|
||||||
db.logger.Log("msg", "compaction failed", "err", err)
|
db.logger.Log("msg", "compaction failed", "err", err)
|
||||||
|
@ -301,6 +301,7 @@ func (db *DB) compact(i, j int) error {
|
||||||
db.persisted = append(db.persisted, pb)
|
db.persisted = append(db.persisted, pb)
|
||||||
|
|
||||||
for _, b := range blocks[1:] {
|
for _, b := range blocks[1:] {
|
||||||
|
db.logger.Log("msg", "remove old dir", "dir", b.Dir())
|
||||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
if err := os.RemoveAll(b.Dir()); err != nil {
|
||||||
return errors.Wrap(err, "removing old block")
|
return errors.Wrap(err, "removing old block")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue