Merge "Use idiomatic one-to-many one-time signal pattern."

This commit is contained in:
Matt T. Proud 2014-04-15 21:26:31 +02:00 committed by Gerrit Code Review
commit 58ef638e72
4 changed files with 20 additions and 24 deletions

11
main.go
View file

@ -75,7 +75,7 @@ type prometheus struct {
deletionTimer *time.Ticker deletionTimer *time.Ticker
curationSema chan struct{} curationSema chan struct{}
stopBackgroundOperations chan bool stopBackgroundOperations chan struct{}
unwrittenSamples chan *extraction.Result unwrittenSamples chan *extraction.Result
@ -167,8 +167,10 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
} }
func (p *prometheus) close() { func (p *prometheus) close() {
// Disallow further curation work.
close(p.curationSema) close(p.curationSema)
// Stop curation timers.
if p.compactionTimer != nil { if p.compactionTimer != nil {
p.compactionTimer.Stop() p.compactionTimer.Stop()
} }
@ -177,9 +179,7 @@ func (p *prometheus) close() {
} }
// Stop any currently active curation (deletion or compaction). // Stop any currently active curation (deletion or compaction).
if len(p.stopBackgroundOperations) == 0 { close(p.stopBackgroundOperations)
p.stopBackgroundOperations <- true
}
p.ruleManager.Stop() p.ruleManager.Stop()
p.targetManager.Stop() p.targetManager.Stop()
@ -192,7 +192,6 @@ func (p *prometheus) close() {
} }
close(p.notifications) close(p.notifications)
close(p.stopBackgroundOperations)
} }
func main() { func main() {
@ -306,7 +305,7 @@ func main() {
unwrittenSamples: unwrittenSamples, unwrittenSamples: unwrittenSamples,
stopBackgroundOperations: make(chan bool, 1), stopBackgroundOperations: make(chan struct{}),
ruleManager: ruleManager, ruleManager: ruleManager,
targetManager: targetManager, targetManager: targetManager,

View file

@ -140,7 +140,7 @@ func (s compactionTestScenario) test(t *testing.T) {
defer processor.Close() defer processor.Close()
curator := NewCurator(&CuratorOptions{ curator := NewCurator(&CuratorOptions{
Stop: make(chan bool), Stop: make(chan struct{}),
ViewQueue: ts.ViewQueue, ViewQueue: ts.ViewQueue,
}) })
defer curator.Close() defer curator.Close()

View file

@ -52,10 +52,7 @@ type CurationState struct {
// CuratorOptions bundles the parameters needed to create a Curator. // CuratorOptions bundles the parameters needed to create a Curator.
type CuratorOptions struct { type CuratorOptions struct {
// Stop functions as a channel that when empty allows the curator to operate. Stop chan struct{}
// The moment a value is ingested inside of it, the curator goes into drain
// mode.
Stop chan bool
ViewQueue chan viewJob ViewQueue chan viewJob
} }
@ -64,7 +61,7 @@ type CuratorOptions struct {
// stored samples on-disk. This is useful to compact sparse sample values into // stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore. // single sample entities to reduce keyspace load on the datastore.
type Curator struct { type Curator struct {
stop chan bool stop chan struct{}
viewQueue chan viewJob viewQueue chan viewJob
@ -112,7 +109,7 @@ type watermarkScanner struct {
stopAt clientmodel.Timestamp stopAt clientmodel.Timestamp
// stop functions as the global stop channel for all future operations. // stop functions as the global stop channel for all future operations.
stop chan bool stop chan struct{}
// status is the outbound channel for notifying the status page of its state. // status is the outbound channel for notifying the status page of its state.
status CurationStateUpdater status CurationStateUpdater
@ -216,14 +213,6 @@ func (c *Curator) Run(ignoreYoungerThan time.Duration, instant clientmodel.Times
return return
} }
// Drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
func (c *Curator) Drain() {
if len(c.stop) == 0 {
c.stop <- true
}
}
// Close needs to be called to cleanly dispose of a curator. // Close needs to be called to cleanly dispose of a curator.
func (c *Curator) Close() { func (c *Curator) Close() {
c.dtoSampleKeys.Close() c.dtoSampleKeys.Close()
@ -259,7 +248,15 @@ func (w *watermarkScanner) DecodeValue(in interface{}) (interface{}, error) {
} }
func (w *watermarkScanner) shouldStop() bool { func (w *watermarkScanner) shouldStop() bool {
return len(w.stop) != 0 select {
case _, ok := <-w.stop:
if ok {
panic("channel should be closed only")
}
return true
default:
return false
}
} }
func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) { func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) {

View file

@ -875,7 +875,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
updates := &noopUpdater{} updates := &noopUpdater{}
stop := make(chan bool) stop := make(chan struct{})
defer close(stop) defer close(stop)
c := NewCurator(&CuratorOptions{ c := NewCurator(&CuratorOptions{
@ -1401,7 +1401,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
updates := &noopUpdater{} updates := &noopUpdater{}
stop := make(chan bool) stop := make(chan struct{})
defer close(stop) defer close(stop)
c := NewCurator(&CuratorOptions{ c := NewCurator(&CuratorOptions{