Clean up start-up and shut-down.

Change-Id: Idff4bbb0a15a9f879bfbb3da5b1025179cab5e2c
This commit is contained in:
Bjoern Rabenstein 2014-10-10 14:19:02 +02:00
parent 4447708c9f
commit b3ed9aa7a2
12 changed files with 161 additions and 146 deletions

222
main.go
View file

@ -68,63 +68,38 @@ var (
type prometheus struct {
unwrittenSamples chan *extraction.Result
ruleManager manager.RuleManager
targetManager retrieval.TargetManager
notifications chan notification.NotificationReqs
storage local.Storage
remoteTSDBQueue *remote.TSDBQueueManager
ruleManager manager.RuleManager
targetManager retrieval.TargetManager
notificationHandler *notification.NotificationHandler
storage local.Storage
remoteTSDBQueue *remote.TSDBQueueManager
webService *web.WebService
closeOnce sync.Once
}
func (p *prometheus) interruptHandler() {
notifier := make(chan os.Signal)
signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
<-notifier
glog.Warning("Received SIGINT/SIGTERM; Exiting gracefully...")
p.Close()
}
func (p *prometheus) Close() {
p.closeOnce.Do(p.close)
}
func (p *prometheus) close() {
// The "Done" remarks are a misnomer for some subsystems due to lack of
// blocking and synchronization.
glog.Info("Shutdown has been requested; subsytems are closing:")
p.targetManager.Stop()
glog.Info("Remote Target Manager: Done")
p.ruleManager.Stop()
glog.Info("Rule Executor: Done")
close(p.unwrittenSamples)
// Note: Before closing the remaining subsystems (storage, ...), we have
// to wait until p.unwrittenSamples is actually drained. Therefore,
// things are closed in main(), after the loop consuming
// p.unwrittenSamples has finished.
}
func main() {
// TODO(all): Future additions to main should be, where applicable, glumped
// into the prometheus struct above---at least where the scoping of the entire
// server is concerned.
flag.Parse()
versionInfoTmpl.Execute(os.Stdout, BuildInfo)
if *printVersion {
os.Exit(0)
}
// NewPrometheus creates a new prometheus object based on flag values.
// Call Serve() to start serving and Close() for clean shutdown.
func NewPrometheus() *prometheus {
conf, err := config.LoadFromFile(*configFile)
if err != nil {
glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
ingester := &retrieval.MergeLabelsIngester{
Labels: conf.GlobalLabels(),
CollisionPrefix: clientmodel.ExporterLabelPrefix,
Ingester: retrieval.ChannelIngester(unwrittenSamples),
}
targetManager := retrieval.NewTargetManager(ingester)
targetManager.AddTargetsFromConfig(conf)
notificationHandler := notification.NewNotificationHandler(*alertmanagerUrl, *notificationQueueCapacity)
registry.MustRegister(notificationHandler)
o := &local.MemorySeriesStorageOptions{
MemoryEvictionInterval: *memoryEvictionInterval,
MemoryRetentionPeriod: *memoryRetentionPeriod,
@ -138,6 +113,17 @@ func main() {
}
registry.MustRegister(memStorage)
ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
Results: unwrittenSamples,
NotificationHandler: notificationHandler,
EvaluationInterval: conf.EvaluationInterval(),
Storage: memStorage,
PrometheusUrl: web.MustBuildServerUrl(),
})
if err := ruleManager.AddRulesFromConfig(conf); err != nil {
glog.Fatal("Error loading rule files: ", err)
}
var remoteTSDBQueue *remote.TSDBQueueManager
if *remoteTSDBUrl == "" {
glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage")
@ -145,46 +131,12 @@ func main() {
openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout)
remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512)
registry.MustRegister(remoteTSDBQueue)
go remoteTSDBQueue.Run()
}
unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity)
ingester := &retrieval.MergeLabelsIngester{
Labels: conf.GlobalLabels(),
CollisionPrefix: clientmodel.ExporterLabelPrefix,
Ingester: retrieval.ChannelIngester(unwrittenSamples),
}
// Queue depth will need to be exposed
targetManager := retrieval.NewTargetManager(ingester)
targetManager.AddTargetsFromConfig(conf)
notifications := make(chan notification.NotificationReqs, *notificationQueueCapacity)
// Queue depth will need to be exposed
ruleManager := manager.NewRuleManager(&manager.RuleManagerOptions{
Results: unwrittenSamples,
Notifications: notifications,
EvaluationInterval: conf.EvaluationInterval(),
Storage: memStorage,
PrometheusUrl: web.MustBuildServerUrl(),
})
if err := ruleManager.AddRulesFromConfig(conf); err != nil {
glog.Fatal("Error loading rule files: ", err)
}
go ruleManager.Run()
notificationHandler := notification.NewNotificationHandler(*alertmanagerUrl, notifications)
registry.MustRegister(notificationHandler)
go notificationHandler.Run()
flags := map[string]string{}
flag.VisitAll(func(f *flag.Flag) {
flags[f.Name] = f.Value.String()
})
prometheusStatus := &web.PrometheusStatusHandler{
BuildInfo: BuildInfo,
Config: conf.String(),
@ -208,62 +160,110 @@ func main() {
Storage: memStorage,
}
prometheus := &prometheus{
unwrittenSamples: unwrittenSamples,
ruleManager: ruleManager,
targetManager: targetManager,
notifications: notifications,
storage: memStorage,
remoteTSDBQueue: remoteTSDBQueue,
}
webService := &web.WebService{
StatusHandler: prometheusStatus,
MetricsHandler: metricsService,
ConsolesHandler: consolesHandler,
AlertsHandler: alertsHandler,
QuitDelegate: prometheus.Close,
}
storageStarted := make(chan bool)
go memStorage.Serve(storageStarted)
p := &prometheus{
unwrittenSamples: unwrittenSamples,
ruleManager: ruleManager,
targetManager: targetManager,
notificationHandler: notificationHandler,
storage: memStorage,
remoteTSDBQueue: remoteTSDBQueue,
webService: webService,
}
webService.QuitDelegate = p.Close
return p
}
// Serve starts the Prometheus server. It returns after the server has been shut
// down. The method installs an interrupt handler, allowing to trigger a
// shutdown by sending SIGTERM to the process.
func (p *prometheus) Serve() {
if p.remoteTSDBQueue != nil {
go p.remoteTSDBQueue.Run()
}
go p.ruleManager.Run()
go p.notificationHandler.Run()
go p.interruptHandler()
storageStarted := make(chan struct{})
go p.storage.Serve(storageStarted)
<-storageStarted
go prometheus.interruptHandler()
go func() {
err := webService.ServeForever()
err := p.webService.ServeForever()
if err != nil {
glog.Fatal(err)
}
}()
// TODO(all): Migrate this into prometheus.serve().
for block := range unwrittenSamples {
for block := range p.unwrittenSamples {
if block.Err == nil && len(block.Samples) > 0 {
memStorage.AppendSamples(block.Samples)
if remoteTSDBQueue != nil {
remoteTSDBQueue.Queue(block.Samples)
p.storage.AppendSamples(block.Samples)
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Queue(block.Samples)
}
}
}
// Note: It might appear tempting to move the code below into
// prometheus.Close(), but we have to wait for the unwrittenSamples loop
// above to exit before we can do the below.
if err := prometheus.storage.Close(); err != nil {
// The following shut-down operations have to happen after
// unwrittenSamples is drained. So do not move them into close().
if err := p.storage.Close(); err != nil {
glog.Error("Error closing local storage: ", err)
}
glog.Info("Local Storage: Done")
if prometheus.remoteTSDBQueue != nil {
prometheus.remoteTSDBQueue.Close()
if p.remoteTSDBQueue != nil {
p.remoteTSDBQueue.Stop()
glog.Info("Remote Storage: Done")
}
close(prometheus.notifications)
p.notificationHandler.Stop()
glog.Info("Sundry Queues: Done")
glog.Info("See you next time!")
}
// Close cleanly shuts down the Prometheus server.
func (p *prometheus) Close() {
p.closeOnce.Do(p.close)
}
func (p *prometheus) interruptHandler() {
notifier := make(chan os.Signal)
signal.Notify(notifier, os.Interrupt, syscall.SIGTERM)
<-notifier
glog.Warning("Received SIGTERM, exiting gracefully...")
p.Close()
}
func (p *prometheus) close() {
glog.Info("Shutdown has been requested; subsytems are closing:")
p.targetManager.Stop()
glog.Info("Remote Target Manager: Done")
p.ruleManager.Stop()
glog.Info("Rule Executor: Done")
close(p.unwrittenSamples)
// Note: Before closing the remaining subsystems (storage, ...), we have
// to wait until p.unwrittenSamples is actually drained. Therefore,
// remaining shut-downs happen in Serve().
}
func main() {
flag.Parse()
versionInfoTmpl.Execute(os.Stdout, BuildInfo)
if *printVersion {
os.Exit(0)
}
NewPrometheus().Serve()
}

View file

@ -81,21 +81,24 @@ type NotificationHandler struct {
// The URL of the alert manager to send notifications to.
alertmanagerUrl string
// Buffer of notifications that have not yet been sent.
pendingNotifications <-chan NotificationReqs
pendingNotifications chan NotificationReqs
// HTTP client with custom timeout settings.
httpClient httpPoster
notificationLatency *prometheus.SummaryVec
notificationsQueueLength prometheus.Gauge
notificationsQueueCapacity prometheus.Metric
stopped chan struct{}
}
// Construct a new NotificationHandler.
func NewNotificationHandler(alertmanagerUrl string, notificationReqs <-chan NotificationReqs) *NotificationHandler {
func NewNotificationHandler(alertmanagerUrl string, notificationQueueCapacity int) *NotificationHandler {
return &NotificationHandler{
alertmanagerUrl: alertmanagerUrl,
pendingNotifications: notificationReqs,
httpClient: utility.NewDeadlineClient(*deadline),
pendingNotifications: make(chan NotificationReqs, notificationQueueCapacity),
httpClient: utility.NewDeadlineClient(*deadline),
notificationLatency: prometheus.NewSummaryVec(
prometheus.SummaryOpts{
@ -119,8 +122,9 @@ func NewNotificationHandler(alertmanagerUrl string, notificationReqs <-chan Noti
nil, nil,
),
prometheus.GaugeValue,
float64(cap(notificationReqs)),
float64(notificationQueueCapacity),
),
stopped: make(chan struct{}),
}
}
@ -163,7 +167,7 @@ func (n *NotificationHandler) sendNotifications(reqs NotificationReqs) error {
return nil
}
// Continuously dispatch notifications.
// Run dispatches notifications continuously.
func (n *NotificationHandler) Run() {
for reqs := range n.pendingNotifications {
if n.alertmanagerUrl == "" {
@ -185,6 +189,18 @@ func (n *NotificationHandler) Run() {
float64(time.Since(begin) / time.Millisecond),
)
}
close(n.stopped)
}
// SubmitReqs queues the given notification requests for processing.
func (n *NotificationHandler) SubmitReqs(reqs NotificationReqs) {
n.pendingNotifications <- reqs
}
// Stop shuts down the notification handler.
func (n *NotificationHandler) Stop() {
close(n.pendingNotifications)
<-n.stopped
}
// Describe implements prometheus.Collector.

View file

@ -46,9 +46,8 @@ type testNotificationScenario struct {
}
func (s *testNotificationScenario) test(i int, t *testing.T) {
notifications := make(chan NotificationReqs)
defer close(notifications)
h := NewNotificationHandler("alertmanager_url", notifications)
h := NewNotificationHandler("alertmanager_url", 0)
defer h.Stop()
receivedPost := make(chan bool, 1)
poster := testHttpPoster{receivedPost: receivedPost}
@ -56,7 +55,7 @@ func (s *testNotificationScenario) test(i int, t *testing.T) {
go h.Run()
notifications <- NotificationReqs{
h.SubmitReqs(NotificationReqs{
{
Summary: s.summary,
Description: s.description,
@ -68,7 +67,7 @@ func (s *testNotificationScenario) test(i int, t *testing.T) {
RuleString: "Test rule string",
GeneratorUrl: "prometheus_url",
},
}
})
<-receivedPost
if poster.message != s.message {

View file

@ -57,7 +57,7 @@ func (m *targetManager) TargetPoolForJob(job config.JobConfig) *TargetPool {
glog.Infof("Pool for job %s does not exist; creating and starting...", job.GetName())
m.poolsByJob[job.GetName()] = targetPool
// BUG(all): Investigate whether this auto-goroutine creation is desired.
// TODO: Investigate whether this auto-goroutine creation is desired.
go targetPool.Run()
}
@ -111,7 +111,7 @@ func (m *targetManager) Stop() {
}
}
// XXX: Not really thread-safe. Only used in /status page for now.
// TODO: Not really thread-safe. Only used in /status page for now.
func (m *targetManager) Pools() map[string]*TargetPool {
return m.poolsByJob
}

View file

@ -76,7 +76,7 @@ func (t fakeTarget) State() TargetState {
return ALIVE
}
func (t *fakeTarget) Merge(newTarget Target) {}
func (t *fakeTarget) SetBaseLabelsFrom(newTarget Target) {}
func testTargetManager(t testing.TB) {
targetManager := NewTargetManager(nopIngester{})

View file

@ -30,7 +30,7 @@ const (
type TargetPool struct {
sync.RWMutex
done chan chan bool
done chan chan struct{}
manager TargetManager
targetsByAddress map[string]Target
interval time.Duration
@ -48,7 +48,7 @@ func NewTargetPool(m TargetManager, p TargetProvider, ing extraction.Ingester, i
targetsByAddress: make(map[string]Target),
addTargetQueue: make(chan Target, targetAddQueueSize),
targetProvider: p,
done: make(chan chan bool),
done: make(chan chan struct{}),
}
}
@ -72,14 +72,14 @@ func (p *TargetPool) Run() {
case stopped := <-p.done:
p.ReplaceTargets([]Target{})
glog.Info("TargetPool exiting...")
stopped <- true
close(stopped)
return
}
}
}
func (p *TargetPool) Stop() {
stopped := make(chan bool)
stopped := make(chan struct{})
p.done <- stopped
<-stopped
}

View file

@ -85,8 +85,8 @@ type ruleManager struct {
interval time.Duration
storage local.Storage
results chan<- *extraction.Result
notifications chan<- notification.NotificationReqs
results chan<- *extraction.Result
notificationHandler *notification.NotificationHandler
prometheusUrl string
}
@ -95,8 +95,8 @@ type RuleManagerOptions struct {
EvaluationInterval time.Duration
Storage local.Storage
Notifications chan<- notification.NotificationReqs
Results chan<- *extraction.Result
NotificationHandler *notification.NotificationHandler
Results chan<- *extraction.Result
PrometheusUrl string
}
@ -106,11 +106,11 @@ func NewRuleManager(o *RuleManagerOptions) RuleManager {
rules: []rules.Rule{},
done: make(chan bool),
interval: o.EvaluationInterval,
storage: o.Storage,
results: o.Results,
notifications: o.Notifications,
prometheusUrl: o.PrometheusUrl,
interval: o.EvaluationInterval,
storage: o.Storage,
results: o.Results,
notificationHandler: o.NotificationHandler,
prometheusUrl: o.PrometheusUrl,
}
return manager
}
@ -187,7 +187,7 @@ func (m *ruleManager) queueAlertNotifications(rule *rules.AlertingRule, timestam
GeneratorUrl: m.prometheusUrl + rules.GraphLinkForExpression(rule.Vector.String()),
})
}
m.notifications <- notifications
m.notificationHandler.SubmitReqs(notifications)
}
func (m *ruleManager) runIteration(results chan<- *extraction.Result) {

View file

@ -41,7 +41,7 @@ type Storage interface {
// Construct an iterator for a given fingerprint.
NewIterator(clientmodel.Fingerprint) SeriesIterator
// Run the request-serving and maintenance loop.
Serve(started chan<- bool)
Serve(started chan struct{})
// Close the MetricsStorage and releases all resources.
Close() error
// WaitForIndexing returns once all samples in the storage are

View file

@ -350,7 +350,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
}
// Serve implements Storage.
func (s *memorySeriesStorage) Serve(started chan<- bool) {
func (s *memorySeriesStorage) Serve(started chan struct{}) {
evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
defer evictMemoryTicker.Stop()
@ -359,7 +359,7 @@ func (s *memorySeriesStorage) Serve(started chan<- bool) {
stopPurge := make(chan bool)
go s.purgePeriodically(stopPurge)
started <- true
close(started)
for {
select {
case <-evictMemoryTicker.C:

View file

@ -48,7 +48,7 @@ func NewTestStorage(t testing.TB) (Storage, test.Closer) {
t.Fatalf("Error creating storage: %s", err)
}
storageStarted := make(chan bool)
storageStarted := make(chan struct{})
go storage.Serve(storageStarted)
<-storageStarted

View file

@ -119,9 +119,9 @@ func (t *TSDBQueueManager) Queue(s clientmodel.Samples) {
}
}
// Close stops sending samples to the TSDB and waits for pending sends to
// Stop stops sending samples to the TSDB and waits for pending sends to
// complete.
func (t *TSDBQueueManager) Close() {
func (t *TSDBQueueManager) Stop() {
glog.Infof("TSDB queue manager shutting down...")
close(t.queue)
<-t.drained

View file

@ -71,7 +71,7 @@ func TestSampleDelivery(t *testing.T) {
m.Queue(samples[len(samples)/2:])
go m.Run()
defer m.Close()
defer m.Stop()
c.waitForExpectedSamples(t)
}