From d12e6f29fc487026f0225b6e55b8a9aa3e24d576 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 30 Dec 2017 17:27:50 +0000 Subject: [PATCH] discovery manager ApplyConfig now takes a direct ServiceDiscoveryConfig so that it can be used for the notify manager reimplement the service discovery for the notify manager Signed-off-by: Krasi Georgiev --- cmd/prometheus/main.go | 56 ++++++++++++++++++++++++++++++++---------- discovery/manager.go | 9 +++---- notifier/notifier.go | 33 +++++++++++++++++++------ retrieval/manager.go | 11 +++------ 4 files changed, 76 insertions(+), 33 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index e53f6e0af..689827227 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -44,6 +44,7 @@ import ( promlogflag "github.com/prometheus/common/promlog/flag" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" + sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" @@ -234,11 +235,12 @@ func main() { ctxWeb, cancelWeb = context.WithCancel(context.Background()) ctxRule = context.Background() - notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) - discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager")) - scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) - queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) - ruleManager = rules.NewManager(&rules.ManagerOptions{ + notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier")) + discoveryManagerScrape = discovery.NewManager(log.With(logger, "component", "discovery manager scrape")) + discoveryManagerNotify = discovery.NewManager(log.With(logger, "component", "discovery manager notify")) + scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) + ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: fanoutStorage, QueryFunc: rules.EngineQueryFunc(queryEngine), NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()), @@ -283,7 +285,21 @@ func main() { remoteStorage.ApplyConfig, webHandler.ApplyConfig, notifier.ApplyConfig, - discoveryManager.ApplyConfig, + func(cfg *config.Config) error { + c := make(map[string]sd_config.ServiceDiscoveryConfig) + for _, v := range cfg.ScrapeConfigs { + c[v.JobName] = v.ServiceDiscoveryConfig + } + return discoveryManagerScrape.ApplyConfig(c) + }, + func(cfg *config.Config) error { + c := make(map[string]sd_config.ServiceDiscoveryConfig) + for _, v := range cfg.AlertingConfig.AlertmanagerConfigs { + // AlertmanagerConfigs doesn't hold an unique identifier so we use the config pointer as the identifier. + c[fmt.Sprintf("%p", v)] = v.ServiceDiscoveryConfig + } + return discoveryManagerNotify.ApplyConfig(c) + }, scrapeManager.ApplyConfig, func(cfg *config.Config) error { // Get all rule files matching the configuration oaths. @@ -332,23 +348,37 @@ func main() { ) } { - ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) g.Add( func() error { - err := discoveryManager.Run(ctxDiscovery) - level.Info(logger).Log("msg", "Discovery manager stopped") + err := discoveryManagerScrape.Run(ctx) + level.Info(logger).Log("msg", "Scrape discovery manager stopped") return err }, func(err error) { - level.Info(logger).Log("msg", "Stopping discovery manager...") - cancelDiscovery() + level.Info(logger).Log("msg", "Stopping scrape discovery manager...") + cancel() + }, + ) + } + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add( + func() error { + err := discoveryManagerNotify.Run(ctx) + level.Info(logger).Log("msg", "Notify discovery manager stopped") + return err + }, + func(err error) { + level.Info(logger).Log("msg", "Stopping notify discovery manager...") + cancel() }, ) } { g.Add( func() error { - err := scrapeManager.Run(discoveryManager.SyncCh()) + err := scrapeManager.Run(discoveryManagerScrape.SyncCh()) level.Info(logger).Log("msg", "Scrape manager stopped") return err }, @@ -493,7 +523,7 @@ func main() { // so keep this interrupt after the ruleManager.Stop(). g.Add( func() error { - notifier.Run() + notifier.Run(discoveryManagerNotify.SyncCh()) return nil }, func(err error) { diff --git a/discovery/manager.go b/discovery/manager.go index 0ce6b9fc5..5372a7870 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -20,7 +20,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/config" sd_config "github.com/prometheus/prometheus/discovery/config" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -101,13 +100,13 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group { } // ApplyConfig removes all running discovery providers and starts new ones using the provided config. -func (m *Manager) ApplyConfig(cfg *config.Config) error { +func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error { err := make(chan error) m.actionCh <- func(ctx context.Context) { m.cancelDiscoverers() - for _, scfg := range cfg.ScrapeConfigs { - for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) { - m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov) + for name, scfg := range cfg { + for provName, prov := range m.providersFromConfig(scfg) { + m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov) } } close(err) diff --git a/notifier/notifier.go b/notifier/notifier.go index dfd858f43..bd4229a22 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -113,9 +113,8 @@ type Notifier struct { ctx context.Context cancel func() - alertmanagers []*alertmanagerSet - cancelDiscovery func() - logger log.Logger + alertmanagers map[string]*alertmanagerSet + logger log.Logger } // Options are the configurable parameters of a Handler. @@ -247,7 +246,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs - amSets := []*alertmanagerSet{} + amSets := make(map[string]*alertmanagerSet) for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs { ams, err := newAlertmanagerSet(cfg, n.logger) @@ -257,7 +256,8 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error { ams.metrics = n.metrics - amSets = append(amSets, ams) + // The config pointer is used for the map lookup identifier. + amSets[fmt.Sprintf("%p", cfg)] = ams } n.alertmanagers = amSets @@ -292,11 +292,14 @@ func (n *Notifier) nextBatch() []*Alert { } // Run dispatches notifications continuously. -func (n *Notifier) Run() { +func (n *Notifier) Run(tsets <-chan map[string][]*targetgroup.Group) { + for { select { case <-n.ctx.Done(): return + case ts := <-tsets: + n.reload(ts) case <-n.more: } alerts := n.nextBatch() @@ -311,6 +314,20 @@ func (n *Notifier) Run() { } } +func (n *Notifier) reload(tgs map[string][]*targetgroup.Group) { + n.mtx.Lock() + defer n.mtx.Unlock() + + for id, tgroup := range tgs { + am, ok := n.alertmanagers[id] + if !ok { + level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id)) + continue + } + am.sync(tgroup) + } +} + // Send queues the given notification requests for processing. // Panics if called on a handler that is not running. func (n *Notifier) Send(alerts ...*Alert) { @@ -515,9 +532,9 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale return s, nil } -// Sync extracts a deduplicated set of Alertmanager endpoints from a list +// sync extracts a deduplicated set of Alertmanager endpoints from a list // of target groups definitions. -func (s *alertmanagerSet) Sync(tgs []*targetgroup.Group) { +func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) { all := []alertmanager{} for _, tg := range tgs { diff --git a/retrieval/manager.go b/retrieval/manager.go index 5cf05b79b..4f2f77c82 100644 --- a/retrieval/manager.go +++ b/retrieval/manager.go @@ -62,9 +62,7 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error case f := <-m.actionCh: f() case ts := <-tsets: - if err := m.reload(ts); err != nil { - level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err) - } + m.reload(ts) case <-m.graceShut: return nil } @@ -129,11 +127,12 @@ func (m *ScrapeManager) Targets() []*Target { return <-targets } -func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error { +func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) { for tsetName, tgroup := range t { scrapeConfig, ok := m.scrapeConfigs[tsetName] if !ok { - return fmt.Errorf("target set '%v' doesn't have valid config", tsetName) + level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName)) + continue } // Scrape pool doesn't exist so start a new one. @@ -155,6 +154,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error { delete(m.scrapePools, name) } } - - return nil }