Fix compacting disable/enable

Enabling and disabling compaction no longer blocks are potentially
causes panics.
This commit is contained in:
Fabian Reinartz 2017-07-14 10:06:07 +02:00
parent 3065be97d8
commit 74f67e8271

43
db.go
View file

@ -100,7 +100,6 @@ type DB struct {
opts *Options opts *Options
// Mutex for that must be held when modifying the general block layout. // Mutex for that must be held when modifying the general block layout.
// cmtx must be held before acquiring it.
mtx sync.RWMutex mtx sync.RWMutex
blocks []Block blocks []Block
@ -117,8 +116,8 @@ type DB struct {
stopc chan struct{} stopc chan struct{}
// cmtx is used to control compactions and deletions. // cmtx is used to control compactions and deletions.
cmtx sync.Mutex cmtx sync.Mutex
compacting bool compactionsEnabled bool
} }
type dbMetrics struct { type dbMetrics struct {
@ -197,13 +196,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
} }
db = &DB{ db = &DB{
dir: dir, dir: dir,
logger: l, logger: l,
opts: opts, opts: opts,
compactc: make(chan struct{}, 1), compactc: make(chan struct{}, 1),
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
compacting: true, compactionsEnabled: true,
} }
db.metrics = newDBMetrics(db, r) db.metrics = newDBMetrics(db, r)
@ -371,6 +370,10 @@ func (db *DB) compact() (changes bool, err error) {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
if !db.compactionsEnabled {
return false, nil
}
// Check whether we have pending head blocks that are ready to be persisted. // Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority. // They have the highest priority.
for _, h := range db.completedHeads() { for _, h := range db.completedHeads() {
@ -579,20 +582,20 @@ func (db *DB) Close() error {
// DisableCompactions disables compactions. // DisableCompactions disables compactions.
func (db *DB) DisableCompactions() { func (db *DB) DisableCompactions() {
if db.compacting { db.cmtx.Lock()
db.cmtx.Lock() defer db.cmtx.Unlock()
db.compacting = false
db.logger.Log("msg", "compactions disabled") db.compactionsEnabled = false
} db.logger.Log("msg", "compactions disabled")
} }
// EnableCompactions enables compactions. // EnableCompactions enables compactions.
func (db *DB) EnableCompactions() { func (db *DB) EnableCompactions() {
if !db.compacting { db.cmtx.Lock()
db.cmtx.Unlock() defer db.cmtx.Unlock()
db.compacting = true
db.logger.Log("msg", "compactions enabled") db.compactionsEnabled = true
} db.logger.Log("msg", "compactions enabled")
} }
// Snapshot writes the current data to the directory. // Snapshot writes the current data to the directory.