Allow mmap to run during head compaction

Signed-off-by: alanprot <alanprot@gmail.com>
This commit is contained in:
alanprot 2024-08-06 21:34:14 -07:00
parent b09eaf8acd
commit e16b7b3449

View file

@ -1057,49 +1057,67 @@ func (db *DB) Dir() string {
func (db *DB) run(ctx context.Context) { func (db *DB) run(ctx context.Context) {
defer close(db.donec) defer close(db.donec)
wg := sync.WaitGroup{}
wg.Add(1)
backoff := time.Duration(0) go func() {
defer wg.Done()
for { for {
select { select {
case <-db.stopc: case <-db.stopc:
return return
case <-time.After(backoff): case <-time.After(1 * time.Minute):
} // We attempt mmapping of head chunks regularly.
db.head.mmapHeadChunks()
select { }
case <-time.After(1 * time.Minute): }
db.cmtx.Lock() }()
if err := db.reloadBlocks(); err != nil {
level.Error(db.logger).Log("msg", "reloadBlocks", "err", err) wg.Add(1)
go func() {
defer wg.Done()
backoff := time.Duration(0)
for {
select {
case <-db.stopc:
return
case <-time.After(backoff):
} }
db.cmtx.Unlock()
select { select {
case db.compactc <- struct{}{}: case <-time.After(1 * time.Minute):
default: db.cmtx.Lock()
} if err := db.reloadBlocks(); err != nil {
// We attempt mmapping of head chunks regularly. level.Error(db.logger).Log("msg", "reloadBlocks", "err", err)
db.head.mmapHeadChunks()
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
db.autoCompactMtx.Lock()
if db.autoCompact {
if err := db.Compact(ctx); err != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
} }
} else { db.cmtx.Unlock()
db.metrics.compactionsSkipped.Inc()
select {
case db.compactc <- struct{}{}:
default:
}
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
db.autoCompactMtx.Lock()
if db.autoCompact {
if err := db.Compact(ctx); err != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
}
} else {
db.metrics.compactionsSkipped.Inc()
}
db.autoCompactMtx.Unlock()
case <-db.stopc:
return
} }
db.autoCompactMtx.Unlock()
case <-db.stopc:
return
} }
} }()
wg.Wait()
} }
// Appender opens a new appender against the database. // Appender opens a new appender against the database.