mirror of
synced 2025-03-05 20:59:13 -08:00
We need to be able to modify the HTTP POST in Weave Cortex to add multitenancy information to a notification. Since we only really need a special header in the end, the other option would be to just allow passing in headers to the notifier. But swapping out the whole Doer is more general and allows others to swap out the network-talky bits of the notifier for their own use. Doing this via contexts here wouldn't work well, due to the decoupled flow of data in the notifier. There was no existing interface containing the ctxhttp.Post() or ctxhttp.Do() methods, so I settled on just using Do() as a swappable function directly (and with a more minimal signature than Post).
548 lines
14 KiB
548 lines
14 KiB
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package notifier
import (
const (
alertPushEndpoint = "/api/v1/alerts"
contentTypeJSON = "application/json"
// String constants for instrumentation.
const (
namespace = "prometheus"
subsystem = "notifications"
alertmanagerLabel = "alertmanager"
// Notifier is responsible for dispatching alert notifications to an
// alert manager service.
type Notifier struct {
queue model.Alerts
opts *Options
more chan struct{}
mtx sync.RWMutex
ctx context.Context
cancel func()
latency *prometheus.SummaryVec
errors *prometheus.CounterVec
sent *prometheus.CounterVec
dropped prometheus.Counter
queueLength prometheus.Gauge
queueCapacity prometheus.Metric
alertmanagers []*alertmanagerSet
cancelDiscovery func()
// Options are the configurable parameters of a Handler.
type Options struct {
QueueCapacity int
ExternalLabels model.LabelSet
RelabelConfigs []*config.RelabelConfig
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
// New constructs a new Notifier.
func New(o *Options) *Notifier {
ctx, cancel := context.WithCancel(context.Background())
if o.Do == nil {
o.Do = ctxhttp.Do
return &Notifier{
queue: make(model.Alerts, 0, o.QueueCapacity),
ctx: ctx,
cancel: cancel,
more: make(chan struct{}, 1),
opts: o,
latency: prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "latency_seconds",
Help: "Latency quantiles for sending alert notifications (not including dropped notifications).",
errors: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "errors_total",
Help: "Total number of errors sending alert notifications.",
sent: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sent_total",
Help: "Total number of alerts successfully sent.",
dropped: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dropped_total",
Help: "Total number of alerts dropped due to errors when sending to Alertmanager.",
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue_length",
Help: "The number of alert notifications in the queue.",
queueCapacity: prometheus.MustNewConstMetric(
prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
"The capacity of the alert notifications queue.",
nil, nil,
// ApplyConfig updates the status state as the new config requires.
func (n *Notifier) ApplyConfig(conf *config.Config) error {
defer n.mtx.Unlock()
n.opts.ExternalLabels = conf.GlobalConfig.ExternalLabels
n.opts.RelabelConfigs = conf.AlertingConfig.AlertRelabelConfigs
amSets := []*alertmanagerSet{}
ctx, cancel := context.WithCancel(n.ctx)
for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
ams, err := newAlertmanagerSet(cfg)
if err != nil {
return err
amSets = append(amSets, ams)
// After all sets were created successfully, start them and cancel the
// old ones.
for _, ams := range amSets {
go ams.ts.Run(ctx)
if n.cancelDiscovery != nil {
n.cancelDiscovery = cancel
n.alertmanagers = amSets
return nil
const maxBatchSize = 64
func (n *Notifier) queueLen() int {
defer n.mtx.RUnlock()
return len(n.queue)
func (n *Notifier) nextBatch() []*model.Alert {
defer n.mtx.Unlock()
var alerts model.Alerts
if len(n.queue) > maxBatchSize {
alerts = append(make(model.Alerts, 0, maxBatchSize), n.queue[:maxBatchSize]...)
n.queue = n.queue[maxBatchSize:]
} else {
alerts = append(make(model.Alerts, 0, len(n.queue)), n.queue...)
n.queue = n.queue[:0]
return alerts
// Run dispatches notifications continuously.
func (n *Notifier) Run() {
for {
select {
case <-n.ctx.Done():
case <-n.more:
alerts := n.nextBatch()
if !n.sendAll(alerts...) {
// If the queue still has items left, kick off the next iteration.
if n.queueLen() > 0 {
// Send queues the given notification requests for processing.
// Panics if called on a handler that is not running.
func (n *Notifier) Send(alerts ...*model.Alert) {
defer n.mtx.Unlock()
// Attach external labels before relabelling and sending.
for _, a := range alerts {
for ln, lv := range n.opts.ExternalLabels {
if _, ok := a.Labels[ln]; !ok {
a.Labels[ln] = lv
alerts = n.relabelAlerts(alerts)
// Queue capacity should be significantly larger than a single alert
// batch could be.
if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
alerts = alerts[d:]
log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
// If the queue is full, remove the oldest alerts in favor
// of newer ones.
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:]
log.Warnf("Alert notification queue full, dropping %d alerts", d)
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
func (n *Notifier) relabelAlerts(alerts []*model.Alert) []*model.Alert {
var relabeledAlerts []*model.Alert
for _, alert := range alerts {
labels := relabel.Process(alert.Labels, n.opts.RelabelConfigs...)
if labels != nil {
alert.Labels = labels
relabeledAlerts = append(relabeledAlerts, alert)
return relabeledAlerts
// setMore signals that the alert queue has items.
func (n *Notifier) setMore() {
// If we cannot send on the channel, it means the signal already exists
// and has not been consumed yet.
select {
case n.more <- struct{}{}:
// Alertmanagers returns a list Alertmanager URLs.
func (n *Notifier) Alertmanagers() []string {
amSets := n.alertmanagers
var res []string
for _, ams := range amSets {
for _, am := range ams.ams {
res = append(res, am.url())
return res
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
begin := time.Now()
b, err := json.Marshal(alerts)
if err != nil {
log.Errorf("Encoding alerts failed: %s", err)
return false
amSets := n.alertmanagers
var (
wg sync.WaitGroup
numSuccess uint64
for _, ams := range amSets {
for _, am := range ams.ams {
ctx, cancel := context.WithTimeout(n.ctx, ams.cfg.Timeout)
defer cancel()
go func(am alertmanager) {
u := am.url()
if err := n.sendOne(ctx, ams.client, u, b); err != nil {
log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
} else {
atomic.AddUint64(&numSuccess, 1)
return numSuccess > 0
func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
return err
req.Header.Set("Content-Type", contentTypeJSON)
resp, err := n.opts.Do(ctx, c, req)
if err != nil {
return err
defer resp.Body.Close()
// Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 {
return fmt.Errorf("bad response status %v", resp.Status)
return err
// Stop shuts down the notification handler.
func (n *Notifier) Stop() {
log.Info("Stopping notification handler...")
// Describe implements prometheus.Collector.
func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
ch <- n.dropped.Desc()
ch <- n.queueLength.Desc()
ch <- n.queueCapacity.Desc()
// Collect implements prometheus.Collector.
func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
ch <- n.dropped
ch <- n.queueLength
ch <- n.queueCapacity
// alertmanager holds Alertmanager endpoint information.
type alertmanager interface {
url() string
type alertmanagerLabels model.LabelSet
const pathLabel = "__alerts_path__"
func (a alertmanagerLabels) url() string {
u := &url.URL{
Scheme: string(a[model.SchemeLabel]),
Host: string(a[model.AddressLabel]),
Path: string(a[pathLabel]),
return u.String()
// alertmanagerSet contains a set of Alertmanagers discovered via a group of service
// discovery definitions that have a common configuration on how alerts should be sent.
type alertmanagerSet struct {
ts *discovery.TargetSet
cfg *config.AlertmanagerConfig
client *http.Client
mtx sync.RWMutex
ams []alertmanager
func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) {
client, err := retrieval.NewHTTPClient(cfg.HTTPClientConfig)
if err != nil {
return nil, err
s := &alertmanagerSet{
client: client,
cfg: cfg,
s.ts = discovery.NewTargetSet(s)
return s, nil
// Sync extracts a deduplicated set of Alertmanager endpoints from a list
// of target groups definitions.
func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
all := []alertmanager{}
for _, tg := range tgs {
ams, err := alertmanagerFromGroup(tg, s.cfg)
if err != nil {
log.With("err", err).Error("generating discovered Alertmanagers failed")
all = append(all, ams...)
defer s.mtx.Unlock()
// Set new Alertmanagers and deduplicate them along their unique URL.
s.ams = []alertmanager{}
seen := map[string]struct{}{}
for _, am := range all {
us := am.url()
if _, ok := seen[us]; ok {
seen[us] = struct{}{}
s.ams = append(s.ams, am)
func postPath(pre string) string {
return path.Join("/", pre, alertPushEndpoint)
// alertmanagersFromGroup extracts a list of alertmanagers from a target group and an associcated
// AlertmanagerConfig.
func alertmanagerFromGroup(tg *config.TargetGroup, cfg *config.AlertmanagerConfig) ([]alertmanager, error) {
var res []alertmanager
for _, lset := range tg.Targets {
// Set configured scheme as the initial scheme label for overwrite.
lset[model.SchemeLabel] = model.LabelValue(cfg.Scheme)
lset[pathLabel] = model.LabelValue(postPath(cfg.PathPrefix))
// Combine target labels with target group labels.
for ln, lv := range tg.Labels {
if _, ok := lset[ln]; !ok {
lset[ln] = lv
lset := relabel.Process(lset, cfg.RelabelConfigs...)
if lset == nil {
// addPort checks whether we should add a default port to the address.
// If the address is not valid, we don't append a port either.
addPort := func(s string) bool {
// If we can split, a port exists and we don't have to add one.
if _, _, err := net.SplitHostPort(s); err == nil {
return false
// If adding a port makes it valid, the previous error
// was not due to an invalid address and we can append a port.
_, _, err := net.SplitHostPort(s + ":1234")
return err == nil
// If it's an address with no trailing port, infer it based on the used scheme.
if addr := string(lset[model.AddressLabel]); addPort(addr) {
// Addresses reaching this point are already wrapped in [] if necessary.
switch lset[model.SchemeLabel] {
case "http", "":
addr = addr + ":80"
case "https":
addr = addr + ":443"
return nil, fmt.Errorf("invalid scheme: %q", cfg.Scheme)
lset[model.AddressLabel] = model.LabelValue(addr)
if err := config.CheckTargetAddress(lset[model.AddressLabel]); err != nil {
return nil, err
// Meta labels are deleted after relabelling. Other internal labels propagate to
// the target which decides whether they will be part of their label set.
for ln := range lset {
if strings.HasPrefix(string(ln), model.MetaLabelPrefix) {
delete(lset, ln)
res = append(res, alertmanagerLabels(lset))
return res, nil