mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
db: delete old blocks during reload
Windows requires blocks to be closed before deleting their directories. This adds a set of deleteable blocks to reload(), which then deletes them without causing disruption to querying.
This commit is contained in:
parent
296faccf2f
commit
d226411a2a
|
@ -130,7 +130,7 @@ func (b *writeBenchmark) run() {
|
|||
|
||||
dur := measureTime("ingestScrapes", func() {
|
||||
b.startProfiling()
|
||||
total, err = b.ingestScrapes(metrics, 2000)
|
||||
total, err = b.ingestScrapes(metrics, 3000)
|
||||
if err != nil {
|
||||
exitWithError(err)
|
||||
}
|
||||
|
|
56
db.go
56
db.go
|
@ -378,36 +378,9 @@ func (db *DB) compact() (changes bool, err error) {
|
|||
return changes, errors.Wrapf(err, "compact %s", plan)
|
||||
}
|
||||
changes = true
|
||||
|
||||
// close blocks in plan so that we can remove files.
|
||||
var blocks []*Block
|
||||
db.mtx.Lock()
|
||||
oldBlocks := db.blocks
|
||||
for _, b := range oldBlocks {
|
||||
keep := true
|
||||
for _, pd := range plan {
|
||||
if pd == b.Dir() {
|
||||
keep = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if keep {
|
||||
blocks = append(blocks, b)
|
||||
} else {
|
||||
b.Close()
|
||||
}
|
||||
}
|
||||
db.blocks = blocks
|
||||
db.mtx.Unlock()
|
||||
|
||||
for _, pd := range plan {
|
||||
if err := os.RemoveAll(pd); err != nil {
|
||||
return changes, errors.Wrap(err, "delete compacted block")
|
||||
}
|
||||
}
|
||||
runtime.GC()
|
||||
|
||||
if err := db.reload(); err != nil {
|
||||
if err := db.reload(plan...); err != nil {
|
||||
return changes, errors.Wrap(err, "reload blocks")
|
||||
}
|
||||
runtime.GC()
|
||||
|
@ -461,7 +434,18 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
func (db *DB) reload() (err error) {
|
||||
func stringsContain(set []string, elem string) bool {
|
||||
for _, e := range set {
|
||||
if elem == e {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
|
||||
// a list of block directories which should be deleted during reload.
|
||||
func (db *DB) reload(deleteable ...string) (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
db.metrics.reloadsFailed.Inc()
|
||||
|
@ -483,6 +467,10 @@ func (db *DB) reload() (err error) {
|
|||
if err != nil {
|
||||
return errors.Wrapf(err, "read meta information %s", dir)
|
||||
}
|
||||
// If the block is pending for deletion, don't add it to the new block set.
|
||||
if stringsContain(deleteable, dir) {
|
||||
continue
|
||||
}
|
||||
|
||||
b, ok := db.getBlock(meta.ULID)
|
||||
if !ok {
|
||||
|
@ -508,8 +496,14 @@ func (db *DB) reload() (err error) {
|
|||
db.mtx.Unlock()
|
||||
|
||||
for _, b := range oldBlocks {
|
||||
if _, ok := exist[b.Meta().ULID]; !ok {
|
||||
b.Close()
|
||||
if _, ok := exist[b.Meta().ULID]; ok {
|
||||
continue
|
||||
}
|
||||
if err := b.Close(); err != nil {
|
||||
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
|
||||
}
|
||||
if err := os.RemoveAll(b.Dir()); err != nil {
|
||||
level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue