Merge pull request #3638 from krasi-georgiev/notifier-reimplement-discovery

reimplement the Notifier discovery
This commit is contained in:
Goutham Veeramachaneni 2018-01-17 16:08:11 +05:30 committed by GitHub
commit a3de70ed19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 166 additions and 42 deletions

View file

@ -16,6 +16,8 @@ package main
import (
"context"
"crypto/md5"
"encoding/json"
"fmt"
"net"
"net/http"
@ -44,6 +46,7 @@ import (
promlogflag "github.com/prometheus/common/promlog/flag"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/retrieval"
@ -234,11 +237,12 @@ func main() {
ctxWeb, cancelWeb = context.WithCancel(context.Background())
ctxRule = context.Background()
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManager = discovery.NewManager(log.With(logger, "component", "discovery manager"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{
notifier = notifier.New(&cfg.notifier, log.With(logger, "component", "notifier"))
discoveryManagerScrape = discovery.NewManager(log.With(logger, "component", "discovery manager scrape"))
discoveryManagerNotify = discovery.NewManager(log.With(logger, "component", "discovery manager notify"))
scrapeManager = retrieval.NewScrapeManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
ruleManager = rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine),
NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
@ -283,7 +287,25 @@ func main() {
remoteStorage.ApplyConfig,
webHandler.ApplyConfig,
notifier.ApplyConfig,
discoveryManager.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
return discoveryManagerScrape.ApplyConfig(c)
},
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
// AlertmanagerConfigs doesn't hold an unique identifier so we use the config hash as the identifier.
b, err := json.Marshal(v)
if err != nil {
return err
}
c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
}
return discoveryManagerNotify.ApplyConfig(c)
},
scrapeManager.ApplyConfig,
func(cfg *config.Config) error {
// Get all rule files matching the configuration oaths.
@ -332,23 +354,37 @@ func main() {
)
}
{
ctxDiscovery, cancelDiscovery := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
err := discoveryManager.Run(ctxDiscovery)
level.Info(logger).Log("msg", "Discovery manager stopped")
err := discoveryManagerScrape.Run(ctx)
level.Info(logger).Log("msg", "Scrape discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping discovery manager...")
cancelDiscovery()
level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
cancel()
},
)
}
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
err := discoveryManagerNotify.Run(ctx)
level.Info(logger).Log("msg", "Notify discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping notify discovery manager...")
cancel()
},
)
}
{
g.Add(
func() error {
err := scrapeManager.Run(discoveryManager.SyncCh())
err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
level.Info(logger).Log("msg", "Scrape manager stopped")
return err
},
@ -493,7 +529,7 @@ func main() {
// so keep this interrupt after the ruleManager.Stop().
g.Add(
func() error {
notifier.Run()
notifier.Run(discoveryManagerNotify.SyncCh())
return nil
},
func(err error) {

View file

@ -20,7 +20,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/config"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
@ -101,13 +100,13 @@ func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
}
// ApplyConfig removes all running discovery providers and starts new ones using the provided config.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
func (m *Manager) ApplyConfig(cfg map[string]sd_config.ServiceDiscoveryConfig) error {
err := make(chan error)
m.actionCh <- func(ctx context.Context) {
m.cancelDiscoverers()
for _, scfg := range cfg.ScrapeConfigs {
for provName, prov := range m.providersFromConfig(scfg.ServiceDiscoveryConfig) {
m.startProvider(ctx, poolKey{setName: scfg.JobName, provider: provName}, prov)
for name, scfg := range cfg {
for provName, prov := range m.providersFromConfig(scfg) {
m.startProvider(ctx, poolKey{setName: name, provider: provName}, prov)
}
}
close(err)

View file

@ -24,6 +24,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"gopkg.in/yaml.v2"
)
@ -743,7 +744,11 @@ scrape_configs:
discoveryManager := NewManager(nil)
go discoveryManager.Run(ctx)
discoveryManager.ApplyConfig(cfg)
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
_ = <-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)
@ -758,7 +763,11 @@ scrape_configs:
if err := yaml.Unmarshal([]byte(sTwo), cfg); err != nil {
t.Fatalf("Unable to load YAML config sOne: %s", err)
}
discoveryManager.ApplyConfig(cfg)
c = make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.ScrapeConfigs {
c[v.JobName] = v.ServiceDiscoveryConfig
}
discoveryManager.ApplyConfig(c)
_ = <-discoveryManager.SyncCh()
verifyPresence(discoveryManager.targets, poolKey{setName: "prometheus", provider: "static/0"}, "{__address__=\"foo:9090\"}", true)

View file

@ -16,6 +16,7 @@ package notifier
import (
"bytes"
"context"
"crypto/md5"
"encoding/json"
"fmt"
"net"
@ -113,9 +114,8 @@ type Notifier struct {
ctx context.Context
cancel func()
alertmanagers []*alertmanagerSet
cancelDiscovery func()
logger log.Logger
alertmanagers map[string]*alertmanagerSet
logger log.Logger
}
// Options are the configurable parameters of a Handler.
@ -247,7 +247,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
amSets := []*alertmanagerSet{}
amSets := make(map[string]*alertmanagerSet)
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg, n.logger)
@ -257,7 +257,12 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
ams.metrics = n.metrics
amSets = append(amSets, ams)
// The config hash is used for the map lookup identifier.
b, err := json.Marshal(cfg)
if err != nil {
return err
}
amSets[fmt.Sprintf("%x", md5.Sum(b))] = ams
}
n.alertmanagers = amSets
@ -292,11 +297,14 @@ func (n *Notifier) nextBatch() []*Alert {
}
// Run dispatches notifications continuously.
func (n *Notifier) Run() {
func (n *Notifier) Run(tsets <-chan map[string][]*targetgroup.Group) {
for {
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
case <-n.more:
}
alerts := n.nextBatch()
@ -311,6 +319,20 @@ func (n *Notifier) Run() {
}
}
func (n *Notifier) reload(tgs map[string][]*targetgroup.Group) {
n.mtx.Lock()
defer n.mtx.Unlock()
for id, tgroup := range tgs {
am, ok := n.alertmanagers[id]
if !ok {
level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
continue
}
am.sync(tgroup)
}
}
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Notifier) Send(alerts ...*Alert) {
@ -515,9 +537,9 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*ale
return s, nil
}
// Sync extracts a deduplicated set of Alertmanager endpoints from a list
// sync extracts a deduplicated set of Alertmanager endpoints from a list
// of target groups definitions.
func (s *alertmanagerSet) Sync(tgs []*targetgroup.Group) {
func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
all := []alertmanager{}
for _, tg := range tgs {

View file

@ -15,6 +15,7 @@ package notifier
import (
"context"
"crypto/md5"
"encoding/json"
"fmt"
"io/ioutil"
@ -26,6 +27,7 @@ import (
"time"
old_ctx "golang.org/x/net/context"
yaml "gopkg.in/yaml.v2"
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@ -33,6 +35,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/util/httputil"
"github.com/prometheus/prometheus/util/testutil"
)
func TestPostPath(t *testing.T) {
@ -173,7 +176,10 @@ func TestHandlerSendAll(t *testing.T) {
Password: "testing_password",
},
}, "auth_alertmanager")
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
h.alertmanagers = make(map[string]*alertmanagerSet)
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
@ -183,9 +189,9 @@ func TestHandlerSendAll(t *testing.T) {
Timeout: time.Second,
},
client: authClient,
})
}
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
h.alertmanagers["2"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
@ -194,7 +200,7 @@ func TestHandlerSendAll(t *testing.T) {
cfg: &config.AlertmanagerConfig{
Timeout: time.Second,
},
})
}
for i := range make([]struct{}, maxBatchSize) {
h.queue = append(h.queue, &Alert{
@ -355,7 +361,10 @@ func TestHandlerQueueing(t *testing.T) {
},
nil,
)
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
h.alertmanagers = make(map[string]*alertmanagerSet)
h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server.URL },
@ -364,7 +373,7 @@ func TestHandlerQueueing(t *testing.T) {
cfg: &config.AlertmanagerConfig{
Timeout: time.Second,
},
})
}
var alerts []*Alert
@ -374,7 +383,8 @@ func TestHandlerQueueing(t *testing.T) {
})
}
go h.Run()
c := make(chan map[string][]*targetgroup.Group)
go h.Run(c)
defer h.Stop()
h.Send(alerts[:4*maxBatchSize]...)
@ -442,6 +452,57 @@ func TestLabelSetNotReused(t *testing.T) {
}
}
func TestReload(t *testing.T) {
var tests = []struct {
in *targetgroup.Group
out string
}{
{
in: &targetgroup.Group{
Targets: []model.LabelSet{
{
"__address__": "alertmanager:9093",
},
},
},
out: "http://alertmanager:9093/api/v1/alerts",
},
}
n := New(&Options{}, nil)
cfg := &config.Config{}
s := `
alerting:
alertmanagers:
- static_configs:
`
if err := yaml.Unmarshal([]byte(s), cfg); err != nil {
t.Fatalf("Unable to load YAML config: %s", err)
}
if err := n.ApplyConfig(cfg); err != nil {
t.Fatalf("Error Applying the config:%v", err)
}
tgs := make(map[string][]*targetgroup.Group)
for _, tt := range tests {
b, err := json.Marshal(cfg.AlertingConfig.AlertmanagerConfigs[0])
if err != nil {
t.Fatalf("Error creating config hash:%v", err)
}
tgs[fmt.Sprintf("%x", md5.Sum(b))] = []*targetgroup.Group{
tt.in,
}
n.reload(tgs)
res := n.Alertmanagers()[0].String()
testutil.Equals(t, res, tt.out)
}
}
func makeInputTargetGroup() *targetgroup.Group {
return &targetgroup.Group{
Targets: []model.LabelSet{

View file

@ -62,9 +62,7 @@ func (m *ScrapeManager) Run(tsets <-chan map[string][]*targetgroup.Group) error
case f := <-m.actionCh:
f()
case ts := <-tsets:
if err := m.reload(ts); err != nil {
level.Error(m.logger).Log("msg", "error reloading the scrape manager", "err", err)
}
m.reload(ts)
case <-m.graceShut:
return nil
}
@ -129,11 +127,12 @@ func (m *ScrapeManager) Targets() []*Target {
return <-targets
}
func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error {
func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) {
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
return fmt.Errorf("target set '%v' doesn't have valid config", tsetName)
level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
continue
}
// Scrape pool doesn't exist so start a new one.
@ -155,6 +154,4 @@ func (m *ScrapeManager) reload(t map[string][]*targetgroup.Group) error {
delete(m.scrapePools, name)
}
}
return nil
}