From 81367893fdfa731265e521dbc3645f97e4ca0539 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Mon, 14 Apr 2014 23:34:17 +0200 Subject: [PATCH] Use idiomatic one-to-many one-time signal pattern. The idiomatic pattern for signalling a one-time message to multiple consumers from a single producer is as follows: ``` c := make(chan struct{}) w := new(sync.WaitGroup) // Boilerplate to ensure synchronization. for i := 0; i < 1000; i++ { w.Add(1) go func() { defer w.Done() for { select { case _, ok := <- c: if !ok { return } default: // Do something here. } } }() } close(c) // Signal the one-to-many single-use message. w.Wait() ``` Change-Id: I755f73ba4c70a923afd342a4dea63365bdf2144b --- main.go | 11 ++++---- storage/metric/compaction_regression_test.go | 2 +- storage/metric/curator.go | 27 +++++++++----------- storage/metric/processor_test.go | 4 +-- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/main.go b/main.go index 300bb79b58..cef288e311 100644 --- a/main.go +++ b/main.go @@ -75,7 +75,7 @@ type prometheus struct { deletionTimer *time.Ticker curationSema chan struct{} - stopBackgroundOperations chan bool + stopBackgroundOperations chan struct{} unwrittenSamples chan *extraction.Result @@ -167,8 +167,10 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error { } func (p *prometheus) close() { + // Disallow further curation work. close(p.curationSema) + // Stop curation timers. if p.compactionTimer != nil { p.compactionTimer.Stop() } @@ -177,9 +179,7 @@ func (p *prometheus) close() { } // Stop any currently active curation (deletion or compaction). - if len(p.stopBackgroundOperations) == 0 { - p.stopBackgroundOperations <- true - } + close(p.stopBackgroundOperations) p.ruleManager.Stop() p.targetManager.Stop() @@ -192,7 +192,6 @@ func (p *prometheus) close() { } close(p.notifications) - close(p.stopBackgroundOperations) } func main() { @@ -306,7 +305,7 @@ func main() { unwrittenSamples: unwrittenSamples, - stopBackgroundOperations: make(chan bool, 1), + stopBackgroundOperations: make(chan struct{}), ruleManager: ruleManager, targetManager: targetManager, diff --git a/storage/metric/compaction_regression_test.go b/storage/metric/compaction_regression_test.go index 58466ceeb6..d84ddc944e 100644 --- a/storage/metric/compaction_regression_test.go +++ b/storage/metric/compaction_regression_test.go @@ -140,7 +140,7 @@ func (s compactionTestScenario) test(t *testing.T) { defer processor.Close() curator := NewCurator(&CuratorOptions{ - Stop: make(chan bool), + Stop: make(chan struct{}), ViewQueue: ts.ViewQueue, }) defer curator.Close() diff --git a/storage/metric/curator.go b/storage/metric/curator.go index 787695655c..7102cb5862 100644 --- a/storage/metric/curator.go +++ b/storage/metric/curator.go @@ -52,10 +52,7 @@ type CurationState struct { // CuratorOptions bundles the parameters needed to create a Curator. type CuratorOptions struct { - // Stop functions as a channel that when empty allows the curator to operate. - // The moment a value is ingested inside of it, the curator goes into drain - // mode. - Stop chan bool + Stop chan struct{} ViewQueue chan viewJob } @@ -64,7 +61,7 @@ type CuratorOptions struct { // stored samples on-disk. This is useful to compact sparse sample values into // single sample entities to reduce keyspace load on the datastore. type Curator struct { - stop chan bool + stop chan struct{} viewQueue chan viewJob @@ -112,7 +109,7 @@ type watermarkScanner struct { stopAt clientmodel.Timestamp // stop functions as the global stop channel for all future operations. - stop chan bool + stop chan struct{} // status is the outbound channel for notifying the status page of its state. status CurationStateUpdater @@ -216,14 +213,6 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times return } -// Drain instructs the curator to stop at the next convenient moment as to not -// introduce data inconsistencies. -func (c *Curator) Drain() { - if len(c.stop) == 0 { - c.stop <- true - } -} - // Close needs to be called to cleanly dispose of a curator. func (c *Curator) Close() { c.dtoSampleKeys.Close() @@ -259,7 +248,15 @@ func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) { } func (w *watermarkScanner) shouldStop() bool { - return len(w.stop) != 0 + select { + case _, ok := <-w.stop: + if ok { + panic("channel should be closed only") + } + return true + default: + return false + } } func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) { diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index b7639fea1a..a95ad8accb 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -875,7 +875,7 @@ func TestCuratorCompactionProcessor(t *testing.T) { updates := &noopUpdater{} - stop := make(chan bool) + stop := make(chan struct{}) defer close(stop) c := NewCurator(&CuratorOptions{ @@ -1401,7 +1401,7 @@ func TestCuratorDeletionProcessor(t *testing.T) { updates := &noopUpdater{} - stop := make(chan bool) + stop := make(chan struct{}) defer close(stop) c := NewCurator(&CuratorOptions{