discovery manager ApplyConfig now takes a direct ServiceDiscoveryConfig so that it can be used for the notify manager

reimplement the service discovery for the notify manager

Signed-off-by: Krasi Georgiev <krasi.root@gmail.com>
This commit is contained in:
Krasi Georgiev 2017-12-30 17:27:50 +00:00
parent b20a1b1b1b
commit d12e6f29fc
4 changed files with 76 additions and 33 deletions

View file

@ -44,6 +44,7 @@ import (
promlogflag "github.com/prometheus/common/promlog/flag"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
@ -234,11 +235,12 @@ func main() {
ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManagerScrape = discovery.NewManager(log.With(logger, "component", "discovery manager scrape"))
discoveryManagerNotify = discovery.NewManager(log.With(logger, "component", "discovery manager notify"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
@ -283,7 +285,21 @@ func main() {
remoteStorage.ApplyConfig,
webHandler.ApplyConfig,
notifier.ApplyConfig,
discoveryManager.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
return discoveryManagerScrape.ApplyConfig(c)
},
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
// AlertmanagerConfigs doesn't hold an unique identifier so we use the config pointer as the identifier.
c[fmt.Sprintf("%p", v)] = v.ServiceDiscoveryConfig
}
return discoveryManagerNotify.ApplyConfig(c)
},
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
// Get all rule files matching the configuration oaths.
@ -332,23 +348,37 @@ func main() {
)
}
{
ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
err := discoveryManager.Run(ctxDiscovery)
level.Info(logger).Log("msg", "Discovery manager stopped")
err := discoveryManagerScrape.Run(ctx)
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
cancel()
},
)
}
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
err := discoveryManagerNotify.Run(ctx)
level.Info(logger).Log("msg", "Notify discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping notify discovery manager...")
cancel()
},
)
}
{
g.Add(
func() error {
err := scrapeManager.Run(discoveryManager.SyncCh())
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
@ -493,7 +523,7 @@ func main() {
// so keep this interrupt after the ruleManager.Stop().
g.Add(
func() error {
notifier.Run()
notifier.Run(discoveryManagerNotify.SyncCh())
return nil
},
func(err error) {

View file

@ -20,7 +20,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
@ -101,13 +100,13 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
err := make(chan error)
m.actionCh <- func(ctx context.Context) {
m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov)
for name, scfg := range cfg {
for provName, prov := range m.providersFromConfig(scfg) {
m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov)
}
}
close(err)

View file

@ -113,9 +113,8 @@ type Notifier struct {
ctx context.Context
cancel func()
alertmanagers []*alertmanagerSet
cancelDiscovery func()
logger log.Logger
alertmanagers map[string]*alertmanagerSet
logger log.Logger
}
// Options are the configurable parameters of a Handler.
@ -247,7 +246,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
amSets := []*alertmanagerSet{}
amSets := make(map[string]*alertmanagerSet)
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg, n.logger)
@ -257,7 +256,8 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
ams.metrics = n.metrics
amSets = append(amSets, ams)
// The config pointer is used for the map lookup identifier.
amSets[fmt.Sprintf("%p", cfg)] = ams
}
n.alertmanagers = amSets
@ -292,11 +292,14 @@ func (n *Notifier) nextBatch() []*Alert {
}
// Run dispatches notifications continuously.
func (n *Notifier) Run() {
func (n *Notifier) Run(tsets <-chan map[string][]*targetgroup.Group) {
for {
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
case <-n.more:
}
alerts := n.nextBatch()
@ -311,6 +314,20 @@ func (n *Notifier) Run() {
}
}
func (n *Notifier) reload(tgs map[string][]*targetgroup.Group) {
n.mtx.Lock()
defer n.mtx.Unlock()
for id, tgroup := range tgs {
am, ok := n.alertmanagers[id]
if !ok {
level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
continue
}
am.sync(tgroup)
}
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Notifier) Send(alerts ...*Alert) {
@ -515,9 +532,9 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
return s, nil
}
// Sync extracts a deduplicated set of Alertmanager endpoints from a list
// sync extracts a deduplicated set of Alertmanager endpoints from a list
// of target groups definitions.
func (s *alertmanagerSet) Sync(tgs []*targetgroup.Group) {
func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
all := []alertmanager{}
for _, tg := range tgs {

View file

@ -62,9 +62,7 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
case f := <-m.actionCh:
f()
case ts := <-tsets:
if err := m.reload(ts); err != nil {
level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err)
}
m.reload(ts)
case <-m.graceShut:
return nil
}
@ -129,11 +127,12 @@ func (m *ScrapeManager) Targets() []*Target {
return <-targets
}
func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error {
func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
continue
}
// Scrape pool doesn't exist so start a new one.
@ -155,6 +154,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error {
delete(m.scrapePools, name)
}
}
return nil
}