diff --git a/main.go b/main.go
index eaa4f5bca..5a18975e8 100644
--- a/main.go
+++ b/main.go
@@ -68,63 +68,38 @@ var (
 type prometheus struct {
     unwrittenSamples chan *extraction.Result

-    ruleManager     manager.RuleManager
-    targetManager   retrieval.TargetManager
-    notifications   chan notification.NotificationReqs
-    storage         local.Storage
-    remoteTSDBQueue *remote.TSDBQueueManager
+    ruleManager         manager.RuleManager
+    targetManager       retrieval.TargetManager
+    notificationHandler *notification.NotificationHandler
+    storage             local.Storage
+    remoteTSDBQueue     *remote.TSDBQueueManager
+
+    webService *web.WebService

     closeOnce sync.Once
 }

-func (p *prometheus) interruptHandler() {
-    notifier := make(chan os.Signal)
-    signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
-
-    <-notifier
-
-    glog.Warning("Received SIGINT/SIGTERM; Exiting gracefully...")
-
-    p.Close()
-}
-
-func (p *prometheus) Close() {
-    p.closeOnce.Do(p.close)
-}
-
-func (p *prometheus) close() {
-    // The "Done" remarks are a misnomer for some subsystems due to lack of
-    // blocking and synchronization.
-    glog.Info("Shutdown has been requested; subsytems are closing:")
-    p.targetManager.Stop()
-    glog.Info("Remote Target Manager: Done")
-    p.ruleManager.Stop()
-    glog.Info("Rule Executor: Done")
-
-    close(p.unwrittenSamples)
-    // Note: Before closing the remaining subsystems (storage, ...), we have
-    // to wait until p.unwrittenSamples is actually drained. Therefore,
-    // things are closed in main(), after the loop consuming
-    // p.unwrittenSamples has finished.
-}
-
-func main() {
-    // TODO(all): Future additions to main should be, where applicable, glumped
-    // into the prometheus struct above---at least where the scoping of the entire
-    // server is concerned.
-    flag.Parse()
-
-    versionInfoTmpl.Execute(os.Stdout, BuildInfo)
-
-    if *printVersion {
-        os.Exit(0)
-    }
-
+// NewPrometheus creates a new prometheus object based on flag values.
+// Call Serve() to start serving and Close() for clean shutdown.
+func NewPrometheus() *prometheus {
     conf, err := config.LoadFromFile(*configFile)
     if err != nil {
         glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
     }

+    unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
+
+    ingester := &retrieval.MergeLabelsIngester{
+        Labels:          conf.GlobalLabels(),
+        CollisionPrefix: clientmodel.ExporterLabelPrefix,
+        Ingester:        retrieval.ChannelIngester(unwrittenSamples),
+    }
+    targetManager := retrieval.NewTargetManager(ingester)
+    targetManager.AddTargetsFromConfig(conf)
+
+    notificationHandler := notification.NewNotificationHandler(*alertmanagerUrl, *notificationQueueCapacity)
+    registry.MustRegister(notificationHandler)
+
     o := &local.MemorySeriesStorageOptions{
         MemoryEvictionInterval: *memoryEvictionInterval,
         MemoryRetentionPeriod:  *memoryRetentionPeriod,
@@ -138,6 +113,17 @@ func main() {
     }
     registry.MustRegister(memStorage)

+    ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
+        Results:             unwrittenSamples,
+        NotificationHandler: notificationHandler,
+        EvaluationInterval:  conf.EvaluationInterval(),
+        Storage:             memStorage,
+        PrometheusUrl:       web.MustBuildServerUrl(),
+    })
+    if err := ruleManager.AddRulesFromConfig(conf); err != nil {
+        glog.Fatal("Error loading rule files: ", err)
+    }
+
     var remoteTSDBQueue *remote.TSDBQueueManager
     if *remoteTSDBUrl == "" {
         glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
@@ -145,46 +131,12 @@ func main() {
         openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
         remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
         registry.MustRegister(remoteTSDBQueue)
-        go remoteTSDBQueue.Run()
     }

-    unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
-    ingester := &retrieval.MergeLabelsIngester{
-        Labels:          conf.GlobalLabels(),
-        CollisionPrefix: clientmodel.ExporterLabelPrefix,
-
-        Ingester: retrieval.ChannelIngester(unwrittenSamples),
-    }
-
-    // Queue depth will need to be exposed
-    targetManager := retrieval.NewTargetManager(ingester)
-    targetManager.AddTargetsFromConfig(conf)
-
-    notifications := make(chan notification.NotificationReqs, *notificationQueueCapacity)
-
-    // Queue depth will need to be exposed
-    ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
-        Results:            unwrittenSamples,
-        Notifications:      notifications,
-        EvaluationInterval: conf.EvaluationInterval(),
-        Storage:            memStorage,
-        PrometheusUrl:      web.MustBuildServerUrl(),
-    })
-    if err := ruleManager.AddRulesFromConfig(conf); err != nil {
-        glog.Fatal("Error loading rule files: ", err)
-    }
-    go ruleManager.Run()
-
-    notificationHandler := notification.NewNotificationHandler(*alertmanagerUrl, notifications)
-    registry.MustRegister(notificationHandler)
-    go notificationHandler.Run()
-
     flags := map[string]string{}
-
     flag.VisitAll(func(f *flag.Flag) {
         flags[f.Name] = f.Value.String()
     })
-
     prometheusStatus := &web.PrometheusStatusHandler{
         BuildInfo:   BuildInfo,
         Config:      conf.String(),
@@ -208,62 +160,110 @@ func main() {
         Storage: memStorage,
     }

-    prometheus := &prometheus{
-        unwrittenSamples: unwrittenSamples,
-
-        ruleManager:     ruleManager,
-        targetManager:   targetManager,
-        notifications:   notifications,
-        storage:         memStorage,
-        remoteTSDBQueue: remoteTSDBQueue,
-    }
-
     webService := &web.WebService{
         StatusHandler:   prometheusStatus,
         MetricsHandler:  metricsService,
         ConsolesHandler: consolesHandler,
         AlertsHandler:   alertsHandler,
-
-        QuitDelegate: prometheus.Close,
     }

-    storageStarted := make(chan bool)
-    go memStorage.Serve(storageStarted)
+    p := &prometheus{
+        unwrittenSamples: unwrittenSamples,
+
+        ruleManager:         ruleManager,
+        targetManager:       targetManager,
+        notificationHandler: notificationHandler,
+        storage:             memStorage,
+        remoteTSDBQueue:     remoteTSDBQueue,
+
+        webService: webService,
+    }
+    webService.QuitDelegate = p.Close
+    return p
+}
+
+// Serve starts the Prometheus server. It returns after the server has been shut
+// down. The method installs an interrupt handler, allowing a shutdown to be
+// triggered by sending SIGTERM to the process.
+func (p *prometheus) Serve() {
+    if p.remoteTSDBQueue != nil {
+        go p.remoteTSDBQueue.Run()
+    }
+    go p.ruleManager.Run()
+    go p.notificationHandler.Run()
+    go p.interruptHandler()
+
+    storageStarted := make(chan struct{})
+    go p.storage.Serve(storageStarted)
     <-storageStarted

-    go prometheus.interruptHandler()
-
     go func() {
-        err := webService.ServeForever()
+        err := p.webService.ServeForever()
         if err != nil {
             glog.Fatal(err)
         }
     }()

-    // TODO(all): Migrate this into prometheus.serve().
-    for block := range unwrittenSamples {
+    for block := range p.unwrittenSamples {
         if block.Err == nil && len(block.Samples) > 0 {
-            memStorage.AppendSamples(block.Samples)
-            if remoteTSDBQueue != nil {
-                remoteTSDBQueue.Queue(block.Samples)
+            p.storage.AppendSamples(block.Samples)
+            if p.remoteTSDBQueue != nil {
+                p.remoteTSDBQueue.Queue(block.Samples)
             }
         }
     }

-    // Note: It might appear tempting to move the code below into
-    // prometheus.Close(), but we have to wait for the unwrittenSamples loop
-    // above to exit before we can do the below.
-    if err := prometheus.storage.Close(); err != nil {
+    // The following shut-down operations have to happen after
+    // unwrittenSamples is drained. So do not move them into close().
+    if err := p.storage.Close(); err != nil {
         glog.Error("Error closing local storage: ", err)
     }
     glog.Info("Local Storage: Done")

-    if prometheus.remoteTSDBQueue != nil {
-        prometheus.remoteTSDBQueue.Close()
+    if p.remoteTSDBQueue != nil {
+        p.remoteTSDBQueue.Stop()
         glog.Info("Remote Storage: Done")
     }

-    close(prometheus.notifications)
+    p.notificationHandler.Stop()
     glog.Info("Sundry Queues: Done")
     glog.Info("See you next time!")
 }
+
+// Close cleanly shuts down the Prometheus server.
+func (p *prometheus) Close() {
+    p.closeOnce.Do(p.close)
+}
+
+func (p *prometheus) interruptHandler() {
+    notifier := make(chan os.Signal)
+    signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
+    <-notifier
+
+    glog.Warning("Received SIGTERM, exiting gracefully...")
+    p.Close()
+}
+
+func (p *prometheus) close() {
+    glog.Info("Shutdown has been requested; subsystems are closing:")
+    p.targetManager.Stop()
+    glog.Info("Remote Target Manager: Done")
+    p.ruleManager.Stop()
+    glog.Info("Rule Executor: Done")
+
+    close(p.unwrittenSamples)
+    // Note: Before closing the remaining subsystems (storage, ...), we have
+    // to wait until p.unwrittenSamples is actually drained. Therefore,
+    // remaining shut-downs happen in Serve().
+}
+
+func main() {
+    flag.Parse()
+    versionInfoTmpl.Execute(os.Stdout, BuildInfo)
+
+    if *printVersion {
+        os.Exit(0)
+    }
+
+    NewPrometheus().Serve()
+}
diff --git a/notification/notification.go b/notification/notification.go
index 6bb7c2658..745811e3f 100644
--- a/notification/notification.go
+++ b/notification/notification.go
@@ -81,21 +81,24 @@ type NotificationHandler struct {
     // The URL of the alert manager to send notifications to.
     alertmanagerUrl string
     // Buffer of notifications that have not yet been sent.
-    pendingNotifications <-chan NotificationReqs
+    pendingNotifications chan NotificationReqs
     // HTTP client with custom timeout settings.
     httpClient httpPoster

     notificationLatency        *prometheus.SummaryVec
     notificationsQueueLength   prometheus.Gauge
     notificationsQueueCapacity prometheus.Metric
+
+    stopped chan struct{}
 }

 // Construct a new NotificationHandler.
-func NewNotificationHandler(alertmanagerUrl string, notificationReqs <-chan NotificationReqs) *NotificationHandler {
+func NewNotificationHandler(alertmanagerUrl string, notificationQueueCapacity int) *NotificationHandler {
     return &NotificationHandler{
         alertmanagerUrl:      alertmanagerUrl,
-        pendingNotifications: notificationReqs,
-        httpClient:           utility.NewDeadlineClient(*deadline),
+        pendingNotifications: make(chan NotificationReqs, notificationQueueCapacity),
+
+        httpClient: utility.NewDeadlineClient(*deadline),

         notificationLatency: prometheus.NewSummaryVec(
             prometheus.SummaryOpts{
@@ -119,8 +122,9 @@ func NewNotificationHandler(alertmanagerUrl string, notificationReqs <-chan Noti
                 nil, nil,
             ),
             prometheus.GaugeValue,
-            float64(cap(notificationReqs)),
+            float64(notificationQueueCapacity),
         ),
+        stopped: make(chan struct{}),
     }
 }
@@ -163,7 +167,7 @@ func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error {
     return nil
 }

-// Continuously dispatch notifications.
+// Run dispatches notifications continuously.
 func (n *NotificationHandler) Run() {
     for reqs := range n.pendingNotifications {
         if n.alertmanagerUrl == "" {
@@ -185,6 +189,18 @@ func (n *NotificationHandler) Run() {
             float64(time.Since(begin) / time.Millisecond),
         )
     }
+    close(n.stopped)
+}
+
+// SubmitReqs queues the given notification requests for processing.
+func (n *NotificationHandler) SubmitReqs(reqs NotificationReqs) {
+    n.pendingNotifications <- reqs
+}
+
+// Stop shuts down the notification handler.
+func (n *NotificationHandler) Stop() {
+    close(n.pendingNotifications)
+    <-n.stopped
 }

 // Describe implements prometheus.Collector.
diff --git a/notification/notification_test.go b/notification/notification_test.go
index 5bd856b89..611748d92 100644
--- a/notification/notification_test.go
+++ b/notification/notification_test.go
@@ -46,9 +46,8 @@ type testNotificationScenario struct {
 }

 func (s *testNotificationScenario) test(i int, t *testing.T) {
-    notifications := make(chan NotificationReqs)
-    defer close(notifications)
-    h := NewNotificationHandler("alertmanager_url", notifications)
+    h := NewNotificationHandler("alertmanager_url", 0)
+    defer h.Stop()

     receivedPost := make(chan bool, 1)
     poster := testHttpPoster{receivedPost: receivedPost}
@@ -56,7 +55,7 @@ func (s *testNotificationScenario) test(i int, t *testing.T) {

     go h.Run()

-    notifications <- NotificationReqs{
+    h.SubmitReqs(NotificationReqs{
         {
             Summary:     s.summary,
             Description: s.description,
@@ -68,7 +67,7 @@ func (s *testNotificationScenario) test(i int, t *testing.T) {
             RuleString:   "Test rule string",
             GeneratorUrl: "prometheus_url",
         },
-    }
+    })

     <-receivedPost
     if poster.message != s.message {
diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
index eda6bef81..f88e71d34 100644
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go
@@ -57,7 +57,7 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
     glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())
     m.poolsByJob[job.GetName()] = targetPool
-    // BUG(all): Investigate whether this auto-goroutine creation is desired.
+    // TODO: Investigate whether this auto-goroutine creation is desired.
     go targetPool.Run()
 }

@@ -111,7 +111,7 @@ func (m *targetManager) Stop() {
     }
 }

-// XXX: Not really thread-safe. Only used in /status page for now.
+// TODO: Not really thread-safe. Only used in /status page for now.
 func (m *targetManager) Pools() map[string]*TargetPool {
     return m.poolsByJob
 }
diff --git a/retrieval/targetmanager_test.go b/retrieval/targetmanager_test.go
index 885e6a69f..f82cfb2a3 100644
--- a/retrieval/targetmanager_test.go
+++ b/retrieval/targetmanager_test.go
@@ -76,7 +76,7 @@ func (t fakeTarget) State() TargetState {
     return ALIVE
 }

-func (t *fakeTarget) Merge(newTarget Target) {}
+func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {}

 func testTargetManager(t testing.TB) {
     targetManager := NewTargetManager(nopIngester{})
diff --git a/retrieval/targetpool.go b/retrieval/targetpool.go
index f1c4b0c60..6b670acbe 100644
--- a/retrieval/targetpool.go
+++ b/retrieval/targetpool.go
@@ -30,7 +30,7 @@ const (
 type TargetPool struct {
     sync.RWMutex

-    done             chan chan bool
+    done             chan chan struct{}
     manager          TargetManager
     targetsByAddress map[string]Target
     interval         time.Duration
@@ -48,7 +48,7 @@ func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i
         targetsByAddress: make(map[string]Target),
         addTargetQueue:   make(chan Target, targetAddQueueSize),
         targetProvider:   p,
-        done:             make(chan chan bool),
+        done:             make(chan chan struct{}),
     }
 }

@@ -72,14 +72,14 @@ func (p *TargetPool) Run() {
         case stopped := <-p.done:
             p.ReplaceTargets([]Target{})
             glog.Info("TargetPool exiting...")
-            stopped <- true
+            close(stopped)
             return
         }
     }
 }

 func (p *TargetPool) Stop() {
-    stopped := make(chan bool)
+    stopped := make(chan struct{})
     p.done <- stopped
     <-stopped
 }
diff --git a/rules/manager/manager.go b/rules/manager/manager.go
index 6a00e9615..51e63ae9f 100644
--- a/rules/manager/manager.go
+++ b/rules/manager/manager.go
@@ -85,8 +85,8 @@ type ruleManager struct {
     interval time.Duration
     storage  local.Storage

-    results       chan<- *extraction.Result
-    notifications chan<- notification.NotificationReqs
+    results             chan<- *extraction.Result
+    notificationHandler *notification.NotificationHandler

     prometheusUrl string
 }
@@ -95,8 +95,8 @@ type RuleManagerOptions struct {
     EvaluationInterval time.Duration
     Storage            local.Storage

-    Notifications chan<- notification.NotificationReqs
-    Results       chan<- *extraction.Result
+    NotificationHandler *notification.NotificationHandler
+    Results             chan<- *extraction.Result

     PrometheusUrl string
 }
@@ -106,11 +106,11 @@ func NewRuleManager(o *RuleManagerOptions) RuleManager {
         rules: []rules.Rule{},
         done:  make(chan bool),

-        interval:      o.EvaluationInterval,
-        storage:       o.Storage,
-        results:       o.Results,
-        notifications: o.Notifications,
-        prometheusUrl: o.PrometheusUrl,
+        interval:            o.EvaluationInterval,
+        storage:             o.Storage,
+        results:             o.Results,
+        notificationHandler: o.NotificationHandler,
+        prometheusUrl:       o.PrometheusUrl,
     }
     return manager
 }
@@ -187,7 +187,7 @@ func (m *ruleManager) queueAlertNotifications(rule *rules.AlertingRule, timestam
             GeneratorUrl: m.prometheusUrl + rules.GraphLinkForExpression(rule.Vector.String()),
         })
     }
-    m.notifications <- notifications
+    m.notificationHandler.SubmitReqs(notifications)
 }

 func (m *ruleManager) runIteration(results chan<- *extraction.Result) {
diff --git a/storage/local/interface.go b/storage/local/interface.go
index 69ae6b793..97856a58d 100644
--- a/storage/local/interface.go
+++ b/storage/local/interface.go
@@ -41,7 +41,7 @@ type Storage interface {
     // Construct an iterator for a given fingerprint.
     NewIterator(clientmodel.Fingerprint) SeriesIterator
     // Run the request-serving and maintenance loop.
-    Serve(started chan<- bool)
+    Serve(started chan struct{})
     // Close the MetricsStorage and releases all resources.
     Close() error
     // WaitForIndexing returns once all samples in the storage are
diff --git a/storage/local/storage.go b/storage/local/storage.go
index ff74e370c..acd9c8f0a 100644
--- a/storage/local/storage.go
+++ b/storage/local/storage.go
@@ -350,7 +350,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 }

 // Serve implements Storage.
-func (s *memorySeriesStorage) Serve(started chan<- bool) {
+func (s *memorySeriesStorage) Serve(started chan struct{}) {
     evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
     defer evictMemoryTicker.Stop()

@@ -359,7 +359,7 @@ func (s *memorySeriesStorage) Serve(started chan<- bool) {
     stopPurge := make(chan bool)
     go s.purgePeriodically(stopPurge)

-    started <- true
+    close(started)
     for {
         select {
         case <-evictMemoryTicker.C:
diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go
index 5ec8cf9ae..92d845041 100644
--- a/storage/local/test_helpers.go
+++ b/storage/local/test_helpers.go
@@ -48,7 +48,7 @@ func NewTestStorage(t testing.TB) (Storage, test.Closer) {
         t.Fatalf("Error creating storage: %s", err)
     }

-    storageStarted := make(chan bool)
+    storageStarted := make(chan struct{})
     go storage.Serve(storageStarted)
     <-storageStarted

diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go
index 50e083765..7edc602a7 100644
--- a/storage/remote/queue_manager.go
+++ b/storage/remote/queue_manager.go
@@ -119,9 +119,9 @@ func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
     }
 }

-// Close stops sending samples to the TSDB and waits for pending sends to
+// Stop stops sending samples to the TSDB and waits for pending sends to
 // complete.
-func (t *TSDBQueueManager) Close() {
+func (t *TSDBQueueManager) Stop() {
     glog.Infof("TSDB queue manager shutting down...")
     close(t.queue)
     <-t.drained
diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go
index f14d06cb2..91d889767 100644
--- a/storage/remote/queue_manager_test.go
+++ b/storage/remote/queue_manager_test.go
@@ -71,7 +71,7 @@ func TestSampleDelivery(t *testing.T) {
     m.Queue(samples[len(samples)/2:])

     go m.Run()
-    defer m.Close()
+    defer m.Stop()

     c.waitForExpectedSamples(t)
 }
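Reviewer note (not part of the patch): the main.go changes split the server lifecycle into NewPrometheus() for wiring, Serve() for the sample-draining loop plus the teardown that must follow it, and Close() for triggering shutdown only. The sketch below reduces that shape to its control flow under stated assumptions; the names (server, newServer, work) are hypothetical stand-ins, not code from this change. sync.Once is what makes Close safe to call from both the interrupt handler and the web quit delegate.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

// server stands in for the prometheus struct: it owns the work queue that
// Serve drains and that Close closes to initiate shutdown.
type server struct {
	work      chan int // stands in for unwrittenSamples
	closeOnce sync.Once
}

func newServer() *server {
	return &server{work: make(chan int, 8)}
}

// Serve consumes the queue until Close closes it, then runs the teardown
// that must only happen after the queue has been drained.
func (s *server) Serve() {
	go s.interruptHandler()
	for item := range s.work {
		fmt.Println("handling", item)
	}
	fmt.Println("queue drained; closing storage-like resources last")
}

// Close initiates shutdown exactly once, no matter how many callers race.
func (s *server) Close() {
	s.closeOnce.Do(func() { close(s.work) })
}

func (s *server) interruptHandler() {
	notifier := make(chan os.Signal, 1)
	signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
	<-notifier
	s.Close()
}

func main() {
	s := newServer()
	s.work <- 1 // enqueue some work
	s.Close()   // in the real server this is wired to SIGTERM and the web quit endpoint
	s.Serve()   // returns once the queue is drained and teardown has run
}
```

Keeping the post-drain teardown inside Serve() rather than close() mirrors the comment in the patch: close() may only close the input channel, because everything downstream of unwrittenSamples must not be released until the range loop has exited.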
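The component-level changes (NotificationHandler, TargetPool, TSDBQueueManager, Storage.Serve) all converge on one ownership idiom: the component creates its own queue, callers use Submit*/Stop methods instead of holding channels, and completion or startup is signalled by close()ing a chan struct{}. A minimal, self-contained sketch of that idiom follows; the worker/newWorker names are hypothetical, not part of this patch.

```go
package main

import (
	"fmt"
	"sync"
)

// worker owns its queue; callers go through Submit and Stop instead of
// holding the channel themselves (the NotificationHandler shape above).
type worker struct {
	pending chan string   // created by the constructor, closed by Stop
	stopped chan struct{} // closed by Run once pending is fully drained
	once    sync.Once
}

func newWorker(queueCapacity int) *worker {
	return &worker{
		pending: make(chan string, queueCapacity),
		stopped: make(chan struct{}),
	}
}

// Run drains the queue until Stop closes it, then signals completion.
func (w *worker) Run() {
	for item := range w.pending {
		fmt.Println("processing", item)
	}
	close(w.stopped)
}

// Submit queues one item for processing.
func (w *worker) Submit(item string) {
	w.pending <- item
}

// Stop closes the queue exactly once and waits for Run to finish draining it.
func (w *worker) Stop() {
	w.once.Do(func() {
		close(w.pending)
		<-w.stopped
	})
}

func main() {
	w := newWorker(16)
	go w.Run()
	w.Submit("sample batch")
	w.Stop() // returns only after everything queued so far has been processed
}
```

close() on a chan struct{} broadcasts to any number of waiters without allocating or sending a value, which is the likely motivation for converting the done/started/stopped channels in this patch from chan bool.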