Code Review: Fix to-disk queue infinite growth.

We discovered a bug while manually testing this branch on a live
instance, whereby the to-disk queue was never actually dumped to
disk.
This commit is contained in:
Matt T. Proud 2013-05-22 17:31:49 +02:00
parent 285a8b701b
commit b586801830

View file

@ -56,7 +56,6 @@ type TieredStorage struct {
// BUG(matt): This introduces a Law of Demeter violation. Ugh.
DiskStorage *LevelDBMetricPersistence
// BUG(matt): Replace this with a map?
appendToDiskQueue chan model.Samples
diskFrontier *diskFrontier
@ -183,7 +182,7 @@ func (t *TieredStorage) Serve() {
for {
select {
case <-flushMemoryTicker.C:
t.flushMemory()
t.flushMemory(t.memoryTTL)
case viewRequest := <-t.viewQueue:
t.renderView(viewRequest)
case drainingDone := <-t.draining:
@ -203,14 +202,16 @@ func (t *TieredStorage) reportQueues() {
}
func (t *TieredStorage) Flush() {
t.flushMemory()
t.flushMemory(0)
}
func (t *TieredStorage) flushMemory() {
func (t *TieredStorage) flushMemory(ttl time.Duration) {
t.memoryArena.RLock()
defer t.memoryArena.RUnlock()
cutOff := time.Now().Add(-1 * t.memoryTTL)
cutOff := time.Now().Add(-1 * ttl)
log.Println("Flushing...")
for _, stream := range t.memoryArena.fingerprintToSeries {
finder := func(i int) bool {
@ -237,6 +238,20 @@ func (t *TieredStorage) flushMemory() {
stream.values = toKeep
stream.Unlock()
}
queueLength := len(t.appendToDiskQueue)
if queueLength > 0 {
log.Printf("Writing %d samples ...", queueLength)
samples := model.Samples{}
for i := 0; i < queueLength; i++ {
chunk := <-t.appendToDiskQueue
samples = append(samples, chunk...)
}
t.DiskStorage.AppendSamples(samples)
}
log.Println("Done flushing...")
}
func (t *TieredStorage) Close() {