diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index e53f6e0af..9aa9d6d03 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -16,6 +16,8 @@ package main
 import (
 	"context"
+	"crypto/md5"
+	"encoding/json"
 	"fmt"
 	"net"
 	"net/http"
@@ -44,6 +46,7 @@ import (
 	promlogflag "github.com/prometheus/common/promlog/flag"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/discovery"
+	sd_config "github.com/prometheus/prometheus/discovery/config"
 	"github.com/prometheus/prometheus/notifier"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/retrieval"
@@ -234,11 +237,12 @@ func main() {
 		ctxWeb, cancelWeb = context.WithCancel(context.Background())
 		ctxRule           = context.Background()
 
-		notifier         = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
-		discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager"))
-		scrapeManager    = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
-		queryEngine      = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
-		ruleManager      = rules.NewManager(&rules.ManagerOptions{
+		notifier               = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
+		discoveryManagerScrape = discovery.NewManager(log.With(logger, "component", "discovery manager scrape"))
+		discoveryManagerNotify = discovery.NewManager(log.With(logger, "component", "discovery manager notify"))
+		scrapeManager          = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
+		queryEngine            = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
+		ruleManager            = rules.NewManager(&rules.ManagerOptions{
 			Appendable:  fanoutStorage,
 			QueryFunc:   rules.EngineQueryFunc(queryEngine),
 			NotifyFunc:  sendAlerts(notifier, cfg.web.ExternalURL.String()),
@@ -283,7 +287,25 @@ func main() {
 		remoteStorage.ApplyConfig,
 		webHandler.ApplyConfig,
 		notifier.ApplyConfig,
-		discoveryManager.ApplyConfig,
+		func(cfg *config.Config) error {
+			c := make(map[string]sd_config.ServiceDiscoveryConfig)
+			for _, v := range cfg.ScrapeConfigs {
+				c[v.JobName] = v.ServiceDiscoveryConfig
+			}
+			return discoveryManagerScrape.ApplyConfig(c)
+		},
+		func(cfg *config.Config) error {
+			c := make(map[string]sd_config.ServiceDiscoveryConfig)
+			for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
+				// AlertmanagerConfigs doesn't hold a unique identifier so we use the config hash as the identifier.
+				b, err := json.Marshal(v)
+				if err != nil {
+					return err
+				}
+				c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
+			}
+			return discoveryManagerNotify.ApplyConfig(c)
+		},
 		scrapeManager.ApplyConfig,
 		func(cfg *config.Config) error {
 			// Get all rule files matching the configuration oaths.
@@ -332,23 +354,37 @@ func main() {
 		)
 	}
 	{
-		ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(context.Background())
 		g.Add(
 			func() error {
-				err := discoveryManager.Run(ctxDiscovery)
-				level.Info(logger).Log("msg", "Discovery manager stopped")
+				err := discoveryManagerScrape.Run(ctx)
+				level.Info(logger).Log("msg", "Scrape discovery manager stopped")
 				return err
 			},
 			func(err error) {
-				level.Info(logger).Log("msg", "Stopping discovery manager...")
-				cancelDiscovery()
+				level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
+				cancel()
+			},
+		)
+	}
+	{
+		ctx, cancel := context.WithCancel(context.Background())
+		g.Add(
+			func() error {
+				err := discoveryManagerNotify.Run(ctx)
+				level.Info(logger).Log("msg", "Notify discovery manager stopped")
+				return err
+			},
+			func(err error) {
+				level.Info(logger).Log("msg", "Stopping notify discovery manager...")
+				cancel()
 			},
 		)
 	}
 	{
 		g.Add(
 			func() error {
-				err := scrapeManager.Run(discoveryManager.SyncCh())
+				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
 				level.Info(logger).Log("msg", "Scrape manager stopped")
 				return err
 			},
@@ -493,7 +529,7 @@ func main() {
 		// so keep this interrupt after the ruleManager.Stop().
 		g.Add(
 			func() error {
-				notifier.Run()
+				notifier.Run(discoveryManagerNotify.SyncCh())
 				return nil
 			},
 			func(err error) {
diff --git a/discovery/manager.go b/discovery/manager.go
index 0ce6b9fc5..5372a7870 100644
--- a/discovery/manager.go
+++ b/discovery/manager.go
@@ -20,7 +20,6 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
-	"github.com/prometheus/prometheus/config"
 	sd_config "github.com/prometheus/prometheus/discovery/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 
@@ -101,13 +100,13 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
 }
 
 // ApplyConfig removes all running discovery providers and starts new ones using the provided config.
-func (m *Manager) ApplyConfig(cfg *config.Config) error {
+func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
 	err := make(chan error)
 	m.actionCh <- func(ctx context.Context) {
 		m.cancelDiscoverers()
-		for _, scfg := range cfg.ScrapeConfigs {
-			for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
-				m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov)
+		for name, scfg := range cfg {
+			for provName, prov := range m.providersFromConfig(scfg) {
+				m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov)
 			}
 		}
 		close(err)
diff --git a/discovery/manager_test.go b/discovery/manager_test.go
index f3ecf784e..cf2b7e81e 100644
--- a/discovery/manager_test.go
+++ b/discovery/manager_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
+	sd_config "github.com/prometheus/prometheus/discovery/config"
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"gopkg.in/yaml.v2"
 )
@@ -743,7 +744,11 @@ scrape_configs:
 	discoveryManager := NewManager(nil)
 	go discoveryManager.Run(ctx)
 
-	discoveryManager.ApplyConfig(cfg)
+	c := make(map[string]sd_config.ServiceDiscoveryConfig)
+	for _, v := range cfg.ScrapeConfigs {
+		c[v.JobName] = v.ServiceDiscoveryConfig
+	}
+	discoveryManager.ApplyConfig(c)
 
 	_ = <-discoveryManager.SyncCh()
 	verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
@@ -758,7 +763,11 @@ scrape_configs:
 	if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil {
 		t.Fatalf("Unable to load YAML config sOne: %s", err)
 	}
-	discoveryManager.ApplyConfig(cfg)
+	c = make(map[string]sd_config.ServiceDiscoveryConfig)
+	for _, v := range cfg.ScrapeConfigs {
+		c[v.JobName] = v.ServiceDiscoveryConfig
+	}
+	discoveryManager.ApplyConfig(c)
 
 	_ = <-discoveryManager.SyncCh()
 	verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
diff --git a/notifier/notifier.go b/notifier/notifier.go
index dfd858f43..411104383 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -16,6 +16,7 @@ package notifier
 import (
 	"bytes"
 	"context"
+	"crypto/md5"
 	"encoding/json"
 	"fmt"
 	"net"
@@ -113,9 +114,8 @@ type Notifier struct {
 	ctx    context.Context
 	cancel func()
 
-	alertmanagers   []*alertmanagerSet
-	cancelDiscovery func()
-	logger          log.Logger
+	alertmanagers map[string]*alertmanagerSet
+	logger        log.Logger
 }
 
 // Options are the configurable parameters of a Handler.
@@ -247,7 +247,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 	n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
 	n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
 
-	amSets := []*alertmanagerSet{}
+	amSets := make(map[string]*alertmanagerSet)
 
 	for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
 		ams, err := newAlertmanagerSet(cfg, n.logger)
@@ -257,7 +257,12 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 
 		ams.metrics = n.metrics
 
-		amSets = append(amSets, ams)
+		// The config hash is used as the map lookup identifier.
+		b, err := json.Marshal(cfg)
+		if err != nil {
+			return err
+		}
+		amSets[fmt.Sprintf("%x", md5.Sum(b))] = ams
 	}
 
 	n.alertmanagers = amSets
@@ -292,11 +297,14 @@ func (n *Notifier) nextBatch() []*Alert {
 }
 
 // Run dispatches notifications continuously.
-func (n *Notifier) Run() {
+func (n *Notifier) Run(tsets <-chan map[string][]*targetgroup.Group) {
+
 	for {
 		select {
 		case <-n.ctx.Done():
 			return
+		case ts := <-tsets:
+			n.reload(ts)
 		case <-n.more:
 		}
 		alerts := n.nextBatch()
@@ -311,6 +319,20 @@ func (n *Notifier) Run() {
 	}
 }
 
+func (n *Notifier) reload(tgs map[string][]*targetgroup.Group) {
+	n.mtx.Lock()
+	defer n.mtx.Unlock()
+
+	for id, tgroup := range tgs {
+		am, ok := n.alertmanagers[id]
+		if !ok {
+			level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
+			continue
+		}
+		am.sync(tgroup)
+	}
+}
+
 // Send queues the given notification requests for processing.
 // Panics if called on a handler that is not running.
 func (n *Notifier) Send(alerts ...*Alert) {
@@ -515,9 +537,9 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
 	return s, nil
 }
 
-// Sync extracts a deduplicated set of Alertmanager endpoints from a list
+// sync extracts a deduplicated set of Alertmanager endpoints from a list
 // of target groups definitions.
-func (s *alertmanagerSet) Sync(tgs []*targetgroup.Group) {
+func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
 	all := []alertmanager{}
 
 	for _, tg := range tgs {
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 7ffc8d55e..d994cfccb 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -15,6 +15,7 @@ package notifier
 import (
 	"context"
+	"crypto/md5"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
@@ -26,6 +27,7 @@ import (
 	"time"
 
 	old_ctx "golang.org/x/net/context"
+	yaml "gopkg.in/yaml.v2"
 
 	config_util "github.com/prometheus/common/config"
 	"github.com/prometheus/common/model"
@@ -33,6 +35,7 @@ import (
 	"github.com/prometheus/prometheus/discovery/targetgroup"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/util/httputil"
+	"github.com/prometheus/prometheus/util/testutil"
 )
 
 func TestPostPath(t *testing.T) {
@@ -173,7 +176,10 @@ func TestHandlerSendAll(t *testing.T) {
 			Password: "testing_password",
 		},
 	}, "auth_alertmanager")
-	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
+
+	h.alertmanagers = make(map[string]*alertmanagerSet)
+
+	h.alertmanagers["1"] = &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
 				urlf: func() string { return server1.URL },
@@ -183,9 +189,9 @@ func TestHandlerSendAll(t *testing.T) {
 			Timeout: time.Second,
 		},
 		client: authClient,
-	})
+	}
 
-	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
+	h.alertmanagers["2"] = &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
 				urlf: func() string { return server2.URL },
@@ -194,7 +200,7 @@ func TestHandlerSendAll(t *testing.T) {
 		cfg: &config.AlertmanagerConfig{
 			Timeout: time.Second,
 		},
-	})
+	}
 
 	for i := range make([]struct{}, maxBatchSize) {
 		h.queue = append(h.queue, &Alert{
@@ -355,7 +361,10 @@ func TestHandlerQueueing(t *testing.T) {
 		},
 		nil,
 	)
-	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
+
+	h.alertmanagers = make(map[string]*alertmanagerSet)
+
+	h.alertmanagers["1"] = &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
 				urlf: func() string { return server.URL },
@@ -364,7 +373,7 @@ func TestHandlerQueueing(t *testing.T) {
 		cfg: &config.AlertmanagerConfig{
 			Timeout: time.Second,
 		},
-	})
+	}
 
 	var alerts []*Alert
 
@@ -374,7 +383,8 @@ func TestHandlerQueueing(t *testing.T) {
 		})
 	}
 
-	go h.Run()
+	c := make(chan map[string][]*targetgroup.Group)
+	go h.Run(c)
 	defer h.Stop()
 
 	h.Send(alerts[:4*maxBatchSize]...)
@@ -442,6 +452,57 @@ func TestLabelSetNotReused(t *testing.T) {
 	}
 }
 
+func TestReload(t *testing.T) {
+	var tests = []struct {
+		in  *targetgroup.Group
+		out string
+	}{
+		{
+			in: &targetgroup.Group{
+				Targets: []model.LabelSet{
+					{
+						"__address__": "alertmanager:9093",
+					},
+				},
+			},
+			out: "http://alertmanager:9093/api/v1/alerts",
+		},
+	}
+
+	n := New(&Options{}, nil)
+
+	cfg := &config.Config{}
+	s := `
+alerting:
+  alertmanagers:
+  - static_configs:
+`
+	if err := yaml.Unmarshal([]byte(s), cfg); err != nil {
+		t.Fatalf("Unable to load YAML config: %s", err)
+	}
+
+	if err := n.ApplyConfig(cfg); err != nil {
+		t.Fatalf("Error applying the config: %v", err)
+	}
+
+	tgs := make(map[string][]*targetgroup.Group)
+	for _, tt := range tests {
+
+		b, err := json.Marshal(cfg.AlertingConfig.AlertmanagerConfigs[0])
+		if err != nil {
+			t.Fatalf("Error creating config hash: %v", err)
+		}
+		tgs[fmt.Sprintf("%x", md5.Sum(b))] = []*targetgroup.Group{
+			tt.in,
+		}
+		n.reload(tgs)
+		res := n.Alertmanagers()[0].String()
+
+		testutil.Equals(t, res, tt.out)
+	}
+
+}
+
 func makeInputTargetGroup() *targetgroup.Group {
 	return &targetgroup.Group{
 		Targets: []model.LabelSet{
diff --git a/retrieval/manager.go b/retrieval/manager.go
index 5cf05b79b..4f2f77c82 100644
--- a/retrieval/manager.go
+++ b/retrieval/manager.go
@@ -62,9 +62,7 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
 		case f := <-m.actionCh:
 			f()
 		case ts := <-tsets:
-			if err := m.reload(ts); err != nil {
-				level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err)
-			}
+			m.reload(ts)
 		case <-m.graceShut:
 			return nil
 		}
@@ -129,11 +127,12 @@ func (m *ScrapeManager) Targets() []*Target {
 	return <-targets
 }
 
-func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error {
+func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
 	for tsetName, tgroup := range t {
 		scrapeConfig, ok := m.scrapeConfigs[tsetName]
 		if !ok {
-			return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
+			level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
+			continue
 		}
 
 		// Scrape pool doesn't exist so start a new one.
@@ -155,6 +154,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
 			delete(m.scrapePools, name)
 		}
 	}
-
-	return nil
 }
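A note on the identifier scheme used throughout this patch: scrape configs have a stable JobName to key the discovery manager's map, but AlertmanagerConfigs carries no name field, so both main.go and notifier.ApplyConfig derive the key from an MD5 hash of the JSON-marshalled config section. The standalone sketch below only illustrates that keying pattern; amConfig and configKey are simplified stand-ins for this write-up and are not part of the patch.

package main

import (
	"crypto/md5"
	"encoding/json"
	"fmt"
)

// amConfig is a simplified stand-in for config.AlertmanagerConfig; the real
// type also carries service discovery, TLS and timeout settings.
type amConfig struct {
	Scheme  string   `json:"scheme"`
	Targets []string `json:"targets"`
}

// configKey mirrors the fmt.Sprintf("%x", md5.Sum(b)) pattern from the patch:
// it derives a map key for a config section that has no unique name.
func configKey(v interface{}) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", md5.Sum(b)), nil
}

func main() {
	cfgs := []amConfig{
		{Scheme: "http", Targets: []string{"am1:9093"}},
		{Scheme: "http", Targets: []string{"am2:9093"}},
	}

	// The notifier and the notify discovery manager both build their maps from
	// the same hash, so target group updates can be matched back to the
	// Alertmanager set they were discovered for.
	keyed := make(map[string]amConfig)
	for _, c := range cfgs {
		k, err := configKey(c)
		if err != nil {
			panic(err)
		}
		keyed[k] = c
	}

	for k, c := range keyed {
		fmt.Printf("%s -> %v\n", k, c.Targets)
	}
}

One consequence of hashing the whole section is that two identical alertmanagers entries in the configuration would produce the same key and collapse into a single map entry.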