mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
notifier: extract alertmanager into interface
This commit is contained in:
parent
cc35104504
commit
2ad56aabd4
|
@ -331,7 +331,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
|
||||||
go func(am alertmanager) {
|
go func(am alertmanager) {
|
||||||
u := am.url()
|
u := am.url()
|
||||||
|
|
||||||
if err := am.send(ctx, ams.client, b); err != nil {
|
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
|
||||||
log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
|
log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
|
||||||
n.errors.WithLabelValues(u).Inc()
|
n.errors.WithLabelValues(u).Inc()
|
||||||
} else {
|
} else {
|
||||||
|
@ -350,6 +350,20 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
|
||||||
return numSuccess > 0
|
return numSuccess > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
|
||||||
|
resp, err := ctxhttp.Post(ctx, c, url, contentTypeJSON, bytes.NewReader(b))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// Any HTTP status 2xx is OK.
|
||||||
|
if resp.StatusCode/100 != 2 {
|
||||||
|
return fmt.Errorf("bad response status %v", resp.Status)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Stop shuts down the notification handler.
|
// Stop shuts down the notification handler.
|
||||||
func (n *Notifier) Stop() {
|
func (n *Notifier) Stop() {
|
||||||
log.Info("Stopping notification handler...")
|
log.Info("Stopping notification handler...")
|
||||||
|
@ -381,39 +395,23 @@ func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// alertmanager holds Alertmanager endpoint information.
|
// alertmanager holds Alertmanager endpoint information.
|
||||||
type alertmanager struct {
|
type alertmanager interface {
|
||||||
plainURL string // test injection hook
|
url() string
|
||||||
labels model.LabelSet
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type alertmanagerLabels model.LabelSet
|
||||||
|
|
||||||
const pathLabel = "__alerts_path__"
|
const pathLabel = "__alerts_path__"
|
||||||
|
|
||||||
func (a alertmanager) url() string {
|
func (a alertmanagerLabels) url() string {
|
||||||
if a.plainURL != "" {
|
|
||||||
return a.plainURL
|
|
||||||
}
|
|
||||||
u := &url.URL{
|
u := &url.URL{
|
||||||
Scheme: string(a.labels[model.SchemeLabel]),
|
Scheme: string(a[model.SchemeLabel]),
|
||||||
Host: string(a.labels[model.AddressLabel]),
|
Host: string(a[model.AddressLabel]),
|
||||||
Path: string(a.labels[pathLabel]),
|
Path: string(a[pathLabel]),
|
||||||
}
|
}
|
||||||
return u.String()
|
return u.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a alertmanager) send(ctx context.Context, c *http.Client, b []byte) error {
|
|
||||||
resp, err := ctxhttp.Post(ctx, c, a.url(), contentTypeJSON, bytes.NewReader(b))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
// Any HTTP status 2xx is OK.
|
|
||||||
if resp.StatusCode/100 != 2 {
|
|
||||||
return fmt.Errorf("bad response status %v", resp.Status)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
|
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
|
||||||
// discovery definitions that have a common configuration on how alerts should be sent.
|
// discovery definitions that have a common configuration on how alerts should be sent.
|
||||||
type alertmanagerSet struct {
|
type alertmanagerSet struct {
|
||||||
|
@ -532,7 +530,7 @@ func alertmanagerFromGroup(tg *config.TargetGroup, cfg *config.AlertmanagerConfi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res = append(res, alertmanager{labels: lset})
|
res = append(res, alertmanagerLabels(lset))
|
||||||
}
|
}
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/config"
|
"github.com/prometheus/prometheus/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -150,8 +149,12 @@ func TestHandlerSendAll(t *testing.T) {
|
||||||
h := New(&Options{})
|
h := New(&Options{})
|
||||||
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
|
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
|
||||||
ams: []alertmanager{
|
ams: []alertmanager{
|
||||||
{plainURL: server1.URL},
|
alertmanagerMock{
|
||||||
{plainURL: server2.URL},
|
urlf: func() string { return server1.URL },
|
||||||
|
},
|
||||||
|
alertmanagerMock{
|
||||||
|
urlf: func() string { return server2.URL },
|
||||||
|
},
|
||||||
},
|
},
|
||||||
cfg: &config.AlertmanagerConfig{
|
cfg: &config.AlertmanagerConfig{
|
||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
|
@ -312,7 +315,9 @@ func TestHandlerQueueing(t *testing.T) {
|
||||||
})
|
})
|
||||||
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
|
h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
|
||||||
ams: []alertmanager{
|
ams: []alertmanager{
|
||||||
{plainURL: server.URL},
|
alertmanagerMock{
|
||||||
|
urlf: func() string { return server.URL },
|
||||||
|
},
|
||||||
},
|
},
|
||||||
cfg: &config.AlertmanagerConfig{
|
cfg: &config.AlertmanagerConfig{
|
||||||
Timeout: time.Second,
|
Timeout: time.Second,
|
||||||
|
@ -371,3 +376,11 @@ func TestHandlerQueueing(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type alertmanagerMock struct {
|
||||||
|
urlf func() string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a alertmanagerMock) url() string {
|
||||||
|
return a.urlf()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue