Wait until storage is drained before closing the underlying leveldb.

This commit is contained in:
Julius Volz 2013-03-21 17:53:57 +01:00
parent becc278eb6
commit 0be0aa59c2

View file

@ -33,7 +33,7 @@ type tieredStorage struct {
appendToMemoryQueue chan model.Sample
diskFrontier *diskFrontier
diskStorage *LevelDBMetricPersistence
draining chan bool
draining chan chan bool
flushMemoryInterval time.Duration
memoryArena memorySeriesStorage
memoryTTL time.Duration
@ -74,7 +74,7 @@ func NewTieredStorage(appendToMemoryQueueDepth, appendToDiskQueueDepth, viewQueu
appendToDiskQueue: make(chan model.Sample, appendToDiskQueueDepth),
appendToMemoryQueue: make(chan model.Sample, appendToMemoryQueueDepth),
diskStorage: diskStorage,
draining: make(chan bool),
draining: make(chan chan bool),
flushMemoryInterval: flushMemoryInterval,
memoryArena: NewMemorySeriesStorage(),
memoryTTL: memoryTTL,
@ -94,9 +94,11 @@ func (t *tieredStorage) AppendSample(s model.Sample) (err error) {
}
func (t *tieredStorage) Drain() {
drainingDone := make(chan bool)
if len(t.draining) == 0 {
t.draining <- true
t.draining <- drainingDone
}
<-drainingDone
}
func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Duration) (view View, err error) {
@ -162,8 +164,9 @@ func (t *tieredStorage) Serve() {
t.flushMemory()
case viewRequest := <-t.viewQueue:
t.renderView(viewRequest)
case <-t.draining:
case drainingDone := <-t.draining:
t.flush()
drainingDone <- true
break
}
}