diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 9aa9d6d038..a5885e3762 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -28,6 +28,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -286,7 +287,10 @@ func main() {
 	reloaders := []func(cfg *config.Config) error{
 		remoteStorage.ApplyConfig,
 		webHandler.ApplyConfig,
+		// The Scrape and notifier managers need to reload before the Discovery manager as
+		// they need to read the most updated config when receiving the new targets list.
 		notifier.ApplyConfig,
+		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			c := make(map[string]sd_config.ServiceDiscoveryConfig)
 			for _, v := range cfg.ScrapeConfigs {
@@ -306,7 +310,6 @@ func main() {
 			}
 			return discoveryManagerNotify.ApplyConfig(c)
 		},
-		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			// Get all rule files matching the configuration oaths.
 			var files []string
@@ -328,8 +331,22 @@ func main() {
 	// Start all components while we wait for TSDB to open but only load
 	// initial config and mark ourselves as ready after it completed.
 	dbOpen := make(chan struct{})
-	// Wait until the server is ready to handle reloading
-	reloadReady := make(chan struct{})
+
+	// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
+	type closeOnce struct {
+		C     chan struct{}
+		once  sync.Once
+		Close func()
+	}
+	// Wait until the server is ready to handle reloading.
+	reloadReady := &closeOnce{
+		C: make(chan struct{}),
+	}
+	reloadReady.Close = func() {
+		reloadReady.once.Do(func() {
+			close(reloadReady.C)
+		})
+	}
 
 	var g group.Group
 	{
@@ -338,12 +355,16 @@ func main() {
 		cancel := make(chan struct{})
 		g.Add(
 			func() error {
+				// Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
 				select {
 				case <-term:
 					level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
+					reloadReady.Close()
+
 				case <-webHandler.Quit():
 					level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
 				case <-cancel:
+					reloadReady.Close()
 					break
 				}
 				return nil
@@ -384,6 +405,15 @@ func main() {
 	{
 		g.Add(
 			func() error {
+				// When the scrape manager receives a new targets list
+				// it needs to read a valid config for each job.
+				// It depends on the config being in sync with the discovery manager so
+				// we wait until the config is fully loaded.
+				select {
+				case <-reloadReady.C:
+					break
+				}
+
 				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
 				level.Info(logger).Log("msg", "Scrape manager stopped")
 				return err
@@ -405,11 +435,8 @@ func main() {
 		g.Add(
 			func() error {
 				select {
-				case <-reloadReady:
+				case <-reloadReady.C:
 					break
-				// In case a shutdown is initiated before the reloadReady is released.
-				case <-cancel:
-					return nil
 				}
 
 				for {
@@ -445,6 +472,7 @@ func main() {
 					break
 				// In case a shutdown is initiated before the dbOpen is released
 				case <-cancel:
+					reloadReady.Close()
 					return nil
 				}
 
@@ -452,9 +480,10 @@ func main() {
 					return fmt.Errorf("Error loading config %s", err)
 				}
 
-				close(reloadReady)
+				reloadReady.Close()
+
 				webHandler.Ready()
-				level.Info(logger).Log("msg", "Server is ready to receive requests.")
+				level.Info(logger).Log("msg", "Server is ready to receive web requests.")
 				<-cancel
 				return nil
 			},
@@ -529,7 +558,16 @@ func main() {
 		// so keep this interrupt after the ruleManager.Stop().
 		g.Add(
 			func() error {
+				// When the notifier manager receives a new targets list
+				// it needs to read a valid config for each job.
+				// It depends on the config being in sync with the discovery manager
+				// so we wait until the config is fully loaded.
+				select {
+				case <-reloadReady.C:
+					break
+				}
 				notifier.Run(discoveryManagerNotify.SyncCh())
+				level.Info(logger).Log("msg", "Notifier manager stopped")
 				return nil
 			},
 			func(err error) {
diff --git a/discovery/manager.go b/discovery/manager.go
index 5372a7870c..ae10de5cb8 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -16,6 +16,7 @@ package discovery
 import (
 	"context"
 	"fmt"
+	"sync"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -61,10 +62,10 @@ type poolKey struct {
 func NewManager(logger log.Logger) *Manager {
 	return &Manager{
 		logger:         logger,
-		actionCh:       make(chan func(context.Context)),
 		syncCh:         make(chan map[string][]*targetgroup.Group),
 		targets:        make(map[poolKey]map[string]*targetgroup.Group),
 		discoverCancel: []context.CancelFunc{},
+		ctx:            context.Background(),
 	}
 }
 
@@ -72,7 +73,8 @@ func NewManager(logger log.Logger) *Manager {
 // Targets are grouped by the target set name.
 type Manager struct {
 	logger         log.Logger
-	actionCh       chan func(context.Context)
+	mtx            sync.RWMutex
+	ctx            context.Context
 	discoverCancel []context.CancelFunc
 	// Some Discoverers(eg. k8s) send only the updates for a given target group
 	// so we use map[tg.Source]*targetgroup.Group to know which group to update.
@@ -83,10 +85,9 @@ type Manager struct {
 
 // Run starts the background processing
 func (m *Manager) Run(ctx context.Context) error {
+	m.ctx = ctx
 	for {
 		select {
-		case f := <-m.actionCh:
-			f(ctx)
 		case <-ctx.Done():
 			m.cancelDiscoverers()
 			return ctx.Err()
@@ -101,18 +102,17 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
 }
 
 // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
 func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
-	err := make(chan error)
-	m.actionCh <- func(ctx context.Context) {
-		m.cancelDiscoverers()
-		for name, scfg := range cfg {
-			for provName, prov := range m.providersFromConfig(scfg) {
-				m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov)
-			}
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	m.cancelDiscoverers()
+	for name, scfg := range cfg {
+		for provName, prov := range m.providersFromConfig(scfg) {
+			m.startProvider(m.ctx, poolKey{setName: name, provider: provName}, prov)
 		}
-		close(err)
 	}
-	return <-err
+	return nil
 }
 
 func (m *Manager) startProvider(ctx context.Context, poolKey poolKey, worker Discoverer) {
@@ -151,39 +151,32 @@ func (m *Manager) cancelDiscoverers() {
 }
 
 func (m *Manager) updateGroup(poolKey poolKey, tgs []*targetgroup.Group) {
-	done := make(chan struct{})
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 
-	m.actionCh <- func(ctx context.Context) {
-		for _, tg := range tgs {
-			if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
-				if _, ok := m.targets[poolKey]; !ok {
-					m.targets[poolKey] = make(map[string]*targetgroup.Group)
-				}
-				m.targets[poolKey][tg.Source] = tg
+	for _, tg := range tgs {
+		if tg != nil { // Some Discoverers send nil target group so need to check for it to avoid panics.
+			if _, ok := m.targets[poolKey]; !ok {
+				m.targets[poolKey] = make(map[string]*targetgroup.Group)
 			}
+			m.targets[poolKey][tg.Source] = tg
 		}
-		close(done)
-	}
-	<-done
+	}
 }
 
 func (m *Manager) allGroups() map[string][]*targetgroup.Group {
-	tSets := make(chan map[string][]*targetgroup.Group)
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
 
-	m.actionCh <- func(ctx context.Context) {
-		tSetsAll := map[string][]*targetgroup.Group{}
-		for pkey, tsets := range m.targets {
-			for _, tg := range tsets {
-				// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
-				// to signal that it needs to stop all scrape loops for this target set.
-				tSetsAll[pkey.setName] = append(tSetsAll[pkey.setName], tg)
-			}
+	tSets := map[string][]*targetgroup.Group{}
+	for pkey, tsets := range m.targets {
+		for _, tg := range tsets {
+			// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
+			// to signal that it needs to stop all scrape loops for this target set.
+			tSets[pkey.setName] = append(tSets[pkey.setName], tg)
 		}
-		tSets <- tSetsAll
 	}
-	return <-tSets
-
+	return tSets
 }
 
 func (m *Manager) providersFromConfig(cfg sd_config.ServiceDiscoveryConfig) map[string]Discoverer {
diff --git a/notifier/notifier.go b/notifier/notifier.go
index 411104383c..9a857a2876 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -490,7 +490,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
 
 // Stop shuts down the notification handler.
 func (n *Notifier) Stop() {
-	level.Info(n.logger).Log("msg", "Stopping notification handler...")
+	level.Info(n.logger).Log("msg", "Stopping notification manager...")
 	n.cancel()
 }
 
diff --git a/retrieval/manager.go b/retrieval/manager.go
index 4f2f77c820..724e7788a9 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -15,6 +15,8 @@ package retrieval
 
 import (
 	"fmt"
+	"reflect"
+	"sync"
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -35,7 +37,6 @@ func NewScrapeManager(logger log.Logger, app Appendable) *ScrapeManager {
 	return &ScrapeManager{
 		append:        app,
 		logger:        logger,
-		actionCh:      make(chan func()),
 		scrapeConfigs: make(map[string]*config.ScrapeConfig),
 		scrapePools:   make(map[string]*scrapePool),
 		graceShut:     make(chan struct{}),
@@ -49,7 +50,7 @@ type ScrapeManager struct {
 	append        Appendable
 	scrapeConfigs map[string]*config.ScrapeConfig
 	scrapePools   map[string]*scrapePool
-	actionCh      chan func()
+	mtx           sync.RWMutex
 	graceShut     chan struct{}
 }
 
@@ -59,8 +60,6 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
 
 	for {
 		select {
-		case f := <-m.actionCh:
-			f()
 		case ts := <-tsets:
 			m.reload(ts)
 		case <-m.graceShut:
@@ -79,52 +78,60 @@ func (m *ScrapeManager) Stop() {
 
 // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
 func (m *ScrapeManager) ApplyConfig(cfg *config.Config) error {
-	done := make(chan struct{})
-	m.actionCh <- func() {
-		c := make(map[string]*config.ScrapeConfig)
-		for _, scfg := range cfg.ScrapeConfigs {
-			c[scfg.JobName] = scfg
-		}
-		m.scrapeConfigs = c
-		close(done)
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+	c := make(map[string]*config.ScrapeConfig)
+	for _, scfg := range cfg.ScrapeConfigs {
+		c[scfg.JobName] = scfg
 	}
-	<-done
+	m.scrapeConfigs = c
+
+	// Cleanup and reload pool if config has changed.
+	for name, sp := range m.scrapePools {
+		if cfg, ok := m.scrapeConfigs[name]; !ok {
+			sp.stop()
+			delete(m.scrapePools, name)
+		} else if !reflect.DeepEqual(sp.config, cfg) {
+			sp.reload(cfg)
+		}
+	}
+
 	return nil
 }
 
 // TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
 func (m *ScrapeManager) TargetMap() map[string][]*Target {
-	targetsMap := make(chan map[string][]*Target)
-	m.actionCh <- func() {
-		targets := make(map[string][]*Target)
-		for jobName, sp := range m.scrapePools {
-			sp.mtx.RLock()
-			for _, t := range sp.targets {
-				targets[jobName] = append(targets[jobName], t)
-			}
-			targets[jobName] = append(targets[jobName], sp.droppedTargets...)
-			sp.mtx.RUnlock()
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	targets := make(map[string][]*Target)
+	for jobName, sp := range m.scrapePools {
+		sp.mtx.RLock()
+		for _, t := range sp.targets {
+			targets[jobName] = append(targets[jobName], t)
 		}
-		targetsMap <- targets
+		targets[jobName] = append(targets[jobName], sp.droppedTargets...)
+		sp.mtx.RUnlock()
 	}
-	return <-targetsMap
+
+	return targets
}
 
 // Targets returns the targets currently being scraped.
 func (m *ScrapeManager) Targets() []*Target {
-	targets := make(chan []*Target)
-	m.actionCh <- func() {
-		var t []*Target
-		for _, p := range m.scrapePools {
-			p.mtx.RLock()
-			for _, tt := range p.targets {
-				t = append(t, tt)
-			}
-			p.mtx.RUnlock()
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	var targets []*Target
+	for _, p := range m.scrapePools {
+		p.mtx.RLock()
+		for _, tt := range p.targets {
+			targets = append(targets, tt)
 		}
-		targets <- t
+		p.mtx.RUnlock()
 	}
-	return <-targets
+
+	return targets
 }
 
 func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
@@ -146,12 +153,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
 			existing.Sync(tgroup)
 		}
 	}
-
-	// Cleanup - check the config and cancel the scrape loops if it don't exist in the scrape config.
-	for name, sp := range m.scrapePools {
-		if _, ok := m.scrapeConfigs[name]; !ok {
-			sp.stop()
-			delete(m.scrapePools, name)
-		}
-	}
 }
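Reviewer note (not part of the patch): the reloadReady change in main.go replaces a bare channel with a sync.Once-guarded close so that both the termination handler and the initial config load can release it without panicking on a double close. Below is a minimal, self-contained sketch of the same idea; the closeOnce name mirrors the type added by the patch, while newCloseOnce, the worker goroutine and the printed messages are hypothetical and only for illustration.

package main

import (
	"fmt"
	"sync"
)

// closeOnce wraps a channel whose close may be triggered from several
// code paths (e.g. SIGTERM handling and the first successful config load).
type closeOnce struct {
	C     chan struct{}
	once  sync.Once
	Close func()
}

// newCloseOnce is a hypothetical helper; the patch builds the value inline in main().
func newCloseOnce() *closeOnce {
	c := &closeOnce{C: make(chan struct{})}
	c.Close = func() {
		c.once.Do(func() { close(c.C) })
	}
	return c
}

func main() {
	reloadReady := newCloseOnce()

	// A stand-in for the scrape/notifier manager goroutines: they block
	// until the initial configuration has been applied.
	done := make(chan struct{})
	go func() {
		<-reloadReady.C
		fmt.Println("manager started after the initial config load")
		close(done)
	}()

	// Both the termination path and the config-load path may call Close;
	// sync.Once turns the second call into a harmless no-op.
	go reloadReady.Close() // e.g. SIGTERM received
	reloadReady.Close()    // e.g. reloadConfig succeeded

	<-done
}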
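A second note on the discovery and scrape manager hunks: they share one refactoring, replacing the actionCh channel (which serialized access by sending closures to the Run loop) with a mutex around the shared maps, so ApplyConfig and the target getters no longer depend on Run having started. The sketch below is a rough stand-in rather than the real Manager API: the manager type and the updateGroup/allGroups signatures are simplified, and the read path takes only the read lock, whereas the patch itself takes the full lock.

package main

import (
	"fmt"
	"sync"
)

// manager is a trimmed-down stand-in for discovery.Manager / ScrapeManager:
// shared state is guarded by a mutex instead of being funneled through an
// actionCh that only the Run loop drained.
type manager struct {
	mtx     sync.RWMutex
	targets map[string][]string // simplified: pool name -> target group names
}

func newManager() *manager {
	return &manager{targets: make(map[string][]string)}
}

// updateGroup can be called from any goroutine, even before Run starts,
// because it no longer needs the Run loop to execute the update.
func (m *manager) updateGroup(pool string, tgs ...string) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.targets[pool] = append(m.targets[pool], tgs...)
}

// allGroups copies the map under the read lock (the patch takes the write
// lock here, which is equivalent but stricter).
func (m *manager) allGroups() map[string][]string {
	m.mtx.RLock()
	defer m.mtx.RUnlock()

	out := make(map[string][]string, len(m.targets))
	for k, v := range m.targets {
		out[k] = append([]string(nil), v...)
	}
	return out
}

func main() {
	m := newManager()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.updateGroup("node", fmt.Sprintf("target-%d", i))
		}(i)
	}
	wg.Wait()

	fmt.Println(len(m.allGroups()["node"]), "targets registered")
}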
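Finally, ScrapeManager.ApplyConfig now also diffs each pool's job config with reflect.DeepEqual and reloads only the pools whose configuration actually changed, instead of leaving the cleanup to reload(). A small sketch of that check, using made-up jobConfig/pool types rather than the real *config.ScrapeConfig and *scrapePool:

package main

import (
	"fmt"
	"reflect"
)

// jobConfig and pool are hypothetical stand-ins for *config.ScrapeConfig and *scrapePool.
type jobConfig struct {
	Interval string
	Path     string
}

type pool struct{ config jobConfig }

func (p *pool) stop()                { fmt.Println("pool stopped") }
func (p *pool) reload(cfg jobConfig) { p.config = cfg; fmt.Println("pool reloaded") }

func main() {
	pools := map[string]*pool{
		"node": {config: jobConfig{Interval: "15s", Path: "/metrics"}},
		"gone": {config: jobConfig{Interval: "30s", Path: "/metrics"}},
	}
	// Newly applied configuration: "gone" was removed, "node" changed.
	configs := map[string]jobConfig{
		"node": {Interval: "30s", Path: "/metrics"},
	}

	// Same shape as the new ApplyConfig loop: drop pools whose job vanished,
	// reload pools whose config no longer matches the applied one.
	for name, p := range pools {
		if cfg, ok := configs[name]; !ok {
			p.stop()
			delete(pools, name)
		} else if !reflect.DeepEqual(p.config, cfg) {
			p.reload(cfg)
		}
	}
}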