diff --git a/tsdb/db.go b/tsdb/db.go index a5b3a5e602..feaf4483ea 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1057,49 +1057,67 @@ func (db *DB) Dir() string { func (db *DB) run(ctx context.Context) { defer close(db.donec) + wg := sync.WaitGroup{} + wg.Add(1) - backoff := time.Duration(0) - - for { - select { - case <-db.stopc: - return - case <-time.After(backoff): - } - - select { - case <-time.After(1 * time.Minute): - db.cmtx.Lock() - if err := db.reloadBlocks(); err != nil { - level.Error(db.logger).Log("msg", "reloadBlocks", "err", err) + go func() { + defer wg.Done() + for { + select { + case <-db.stopc: + return + case <-time.After(1 * time.Minute): + // We attempt mmapping of head chunks regularly. + db.head.mmapHeadChunks() + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + backoff := time.Duration(0) + for { + select { + case <-db.stopc: + return + case <-time.After(backoff): } - db.cmtx.Unlock() select { - case db.compactc <- struct{}{}: - default: - } - // We attempt mmapping of head chunks regularly. - db.head.mmapHeadChunks() - case <-db.compactc: - db.metrics.compactionsTriggered.Inc() - - db.autoCompactMtx.Lock() - if db.autoCompact { - if err := db.Compact(ctx); err != nil { - level.Error(db.logger).Log("msg", "compaction failed", "err", err) - backoff = exponential(backoff, 1*time.Second, 1*time.Minute) - } else { - backoff = 0 + case <-time.After(1 * time.Minute): + db.cmtx.Lock() + if err := db.reloadBlocks(); err != nil { + level.Error(db.logger).Log("msg", "reloadBlocks", "err", err) } - } else { - db.metrics.compactionsSkipped.Inc() + db.cmtx.Unlock() + + select { + case db.compactc <- struct{}{}: + default: + } + case <-db.compactc: + db.metrics.compactionsTriggered.Inc() + + db.autoCompactMtx.Lock() + if db.autoCompact { + if err := db.Compact(ctx); err != nil { + level.Error(db.logger).Log("msg", "compaction failed", "err", err) + backoff = exponential(backoff, 1*time.Second, 1*time.Minute) + } else { + backoff = 0 + } + } else { + db.metrics.compactionsSkipped.Inc() + } + db.autoCompactMtx.Unlock() + case <-db.stopc: + return } - db.autoCompactMtx.Unlock() - case <-db.stopc: - return } - } + }() + + wg.Wait() } // Appender opens a new appender against the database.