From 12d5e6ca5a86834164528202a6fc73723adc8396 Mon Sep 17 00:00:00 2001
From: "Matt T. Proud"
Date: Mon, 26 Aug 2013 19:12:43 +0200
Subject: [PATCH] Curation should not starve user-interactive ops.

The background curation should be staggered to ensure that disk I/O
yields to user-interactive operations in a timely manner. The lack of
routine prioritization necessitates this.

Change-Id: I9b498a74ccd933ffb856e06fedc167430e521d86
---
 main.go                   |  4 ++++
 storage/metric/curator.go | 12 ++++++++++++
 storage/metric/tiered.go  | 14 +++++++-------
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/main.go b/main.go
index 902580f9f..bd8779c54 100644
--- a/main.go
+++ b/main.go
@@ -114,6 +114,8 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
 
 	curator := metric.Curator{
 		Stop: p.stopBackgroundOperations,
+
+		ViewQueue: p.storage.ViewQueue,
 	}
 
 	return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
@@ -129,6 +131,8 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
 
 	curator := metric.Curator{
 		Stop: p.stopBackgroundOperations,
+
+		ViewQueue: p.storage.ViewQueue,
 	}
 
 	return curator.Run(olderThan, time.Now(), processor, p.storage.DiskStorage.CurationRemarks, p.storage.DiskStorage.MetricSamples, p.storage.DiskStorage.MetricHighWatermarks, p.curationState)
diff --git a/storage/metric/curator.go b/storage/metric/curator.go
index 3fdf2a5a3..d8f8ef5ea 100644
--- a/storage/metric/curator.go
+++ b/storage/metric/curator.go
@@ -32,6 +32,8 @@ import (
 	dto "github.com/prometheus/prometheus/model/generated"
 )
 
+const curationYieldPeriod = 250 * time.Millisecond
+
 // CurationStateUpdater receives updates about the curation state.
 type CurationStateUpdater interface {
 	UpdateCurationState(*CurationState)
@@ -54,6 +56,8 @@ type Curator struct {
 	// The moment a value is ingested inside of it, the curator goes into drain
 	// mode.
 	Stop chan bool
+
+	ViewQueue chan viewJob
 }
 
 // watermarkScanner converts (dto.Fingerprint, dto.MetricHighWatermark) doubles
@@ -89,6 +93,8 @@ type watermarkScanner struct {
 	status CurationStateUpdater
 
 	firstBlock, lastBlock *SampleKey
+
+	ViewQueue chan viewJob
 }
 
 // run facilitates the curation lifecycle.
@@ -146,6 +152,8 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, proces
 
 		firstBlock: firstBlock,
 		lastBlock:  lastBlock,
+
+		ViewQueue: c.ViewQueue,
 	}
 
 	// Right now, the ability to stop a curation is limited to the beginning of
@@ -276,6 +284,10 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
 }
 
 func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorError) {
+	if len(w.ViewQueue) > 0 {
+		time.Sleep(curationYieldPeriod)
+	}
+
 	fingerprint := key.(*clientmodel.Fingerprint)
 
 	if fingerprint.Less(w.firstBlock.Fingerprint) {
diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go
index 23de17022..87c3d8176 100644
--- a/storage/metric/tiered.go
+++ b/storage/metric/tiered.go
@@ -82,7 +82,7 @@ type TieredStorage struct {
 	memoryTTL           time.Duration
 	flushMemoryInterval time.Duration
 
-	viewQueue chan viewJob
+	ViewQueue chan viewJob
 
 	draining chan chan<- bool
 
@@ -131,7 +131,7 @@ func NewTieredStorage(appendToDiskQueueDepth, viewQueueDepth uint, flushMemoryIn
 		flushMemoryInterval: flushMemoryInterval,
 		memoryArena:         NewMemorySeriesStorage(memOptions),
 		memoryTTL:           memoryTTL,
-		viewQueue:           make(chan viewJob, viewQueueDepth),
+		ViewQueue:           make(chan viewJob, viewQueueDepth),
 
 		memorySemaphore: make(chan bool, tieredMemorySemaphores),
 
@@ -194,7 +194,7 @@ func (t *TieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
 	abortChan := make(chan bool, 1)
 	errChan := make(chan error)
 	queryStats.GetTimer(stats.ViewQueueTime).Start()
-	t.viewQueue <- viewJob{
+	t.ViewQueue <- viewJob{
 		builder: builder,
 		output:  result,
 		abort:   abortChan,
@@ -240,7 +240,7 @@ func (t *TieredStorage) Serve(started chan<- bool) {
 		select {
 		case <-flushMemoryTicker.C:
 			t.flushMemory(t.memoryTTL)
-		case viewRequest := <-t.viewQueue:
+		case viewRequest := <-t.ViewQueue:
 			viewRequest.stats.GetTimer(stats.ViewQueueTime).Stop()
 			<-t.memorySemaphore
 			go t.renderView(viewRequest)
@@ -256,7 +256,7 @@ func (t *TieredStorage) reportQueues() {
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "occupancy"}, float64(len(t.appendToDiskQueue)))
 	queueSizes.Set(map[string]string{"queue": "append_to_disk", "facet": "capacity"}, float64(cap(t.appendToDiskQueue)))
 
-	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.viewQueue)))
-	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.viewQueue)))
+	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "occupancy"}, float64(len(t.ViewQueue)))
+	queueSizes.Set(map[string]string{"queue": "view_generation", "facet": "capacity"}, float64(cap(t.ViewQueue)))
 }
 
 func (t *TieredStorage) Flush() {
@@ -306,7 +306,7 @@ func (t *TieredStorage) close() {
 	// BUG(matt): There is a probability that pending items may hang here and not
 	// get flushed.
 	close(t.appendToDiskQueue)
-	close(t.viewQueue)
+	close(t.ViewQueue)
 	t.wmCache.Clear()
 	t.state = tieredStorageStopping
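
Note on the yielding pattern (not part of the patch): the sketch below distills what the new check in watermarkScanner.Operate does, as a standalone toy program. The names yieldPeriod, curate, and the string-valued queue are illustrative only and do not appear in the Prometheus code; the point is simply that a background loop sleeps whenever the shared view queue has pending entries, because Go offers no way to prioritize one goroutine's disk I/O over another's.

// Illustrative sketch of the yield-to-interactive-work pattern.
package main

import (
	"fmt"
	"time"
)

// yieldPeriod plays the role of curationYieldPeriod in the patch.
const yieldPeriod = 250 * time.Millisecond

// curate walks background work items, pausing whenever the user-facing
// queue is non-empty so interactive requests get to the disk first.
func curate(viewQueue <-chan string, batches []string) {
	for _, batch := range batches {
		if len(viewQueue) > 0 {
			// Yield: give the serving goroutine time to drain the queue.
			time.Sleep(yieldPeriod)
		}
		fmt.Println("curated", batch)
	}
}

func main() {
	viewQueue := make(chan string, 8)
	viewQueue <- "interactive query" // simulate a pending user request
	go func() {
		// A separate goroutine drains the queue, as TieredStorage.Serve does.
		for range viewQueue {
		}
	}()
	curate(viewQueue, []string{"batch-1", "batch-2"})
}

Checking len() on the channel is only a best-effort heuristic: the queue may refill the instant the sleep ends, but for staggering background I/O that is sufficient and avoids any extra synchronization.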