diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 0278c14c5..c7e85204b 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -130,7 +130,7 @@ func (b *writeBenchmark) run() { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 2000) + total, err = b.ingestScrapes(metrics, 3000) if err != nil { exitWithError(err) } diff --git a/db.go b/db.go index aca7249df..3622a77f5 100644 --- a/db.go +++ b/db.go @@ -378,36 +378,9 @@ func (db *DB) compact() (changes bool, err error) { return changes, errors.Wrapf(err, "compact %s", plan) } changes = true - - // close blocks in plan so that we can remove files. - var blocks []*Block - db.mtx.Lock() - oldBlocks := db.blocks - for _, b := range oldBlocks { - keep := true - for _, pd := range plan { - if pd == b.Dir() { - keep = false - break - } - } - if keep { - blocks = append(blocks, b) - } else { - b.Close() - } - } - db.blocks = blocks - db.mtx.Unlock() - - for _, pd := range plan { - if err := os.RemoveAll(pd); err != nil { - return changes, errors.Wrap(err, "delete compacted block") - } - } runtime.GC() - if err := db.reload(); err != nil { + if err := db.reload(plan...); err != nil { return changes, errors.Wrap(err, "reload blocks") } runtime.GC() @@ -461,7 +434,18 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -func (db *DB) reload() (err error) { +func stringsContain(set []string, elem string) bool { + for _, e := range set { + if elem == e { + return true + } + } + return false +} + +// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes +// a list of block directories which should be deleted during reload. +func (db *DB) reload(deleteable ...string) (err error) { defer func() { if err != nil { db.metrics.reloadsFailed.Inc() @@ -483,6 +467,10 @@ func (db *DB) reload() (err error) { if err != nil { return errors.Wrapf(err, "read meta information %s", dir) } + // If the block is pending for deletion, don't add it to the new block set. + if stringsContain(deleteable, dir) { + continue + } b, ok := db.getBlock(meta.ULID) if !ok { @@ -508,8 +496,14 @@ func (db *DB) reload() (err error) { db.mtx.Unlock() for _, b := range oldBlocks { - if _, ok := exist[b.Meta().ULID]; !ok { - b.Close() + if _, ok := exist[b.Meta().ULID]; ok { + continue + } + if err := b.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + if err := os.RemoveAll(b.Dir()); err != nil { + level.Warn(db.logger).Log("msg", "deleting block failed", "err", err) } }