diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index cd7f533d1..7544f276a 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -445,6 +445,9 @@ func main() {
serverOnlyFlag(a, "alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications.").
Default("10000").IntVar(&cfg.notifier.QueueCapacity)
+ serverOnlyFlag(a, "alertmanager.drain-notification-queue-on-shutdown", "Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down.").
+ Default("true").BoolVar(&cfg.notifier.DrainOnShutdown)
+
// TODO: Remove in Prometheus 3.0.
alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String()
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index aa9bf3bfb..1fc032d09 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -50,6 +50,7 @@ The Prometheus monitoring server
| --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
| --rules.max-concurrent-evals | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` |
| --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
+| --alertmanager.drain-notification-queue-on-shutdown | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` |
| --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
| --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` |
diff --git a/notifier/notifier.go b/notifier/notifier.go
index cd00a4507..68b0d4961 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -110,10 +110,11 @@ type Manager struct {
metrics *alertMetrics
- more chan struct{}
- mtx sync.RWMutex
- ctx context.Context
- cancel func()
+ more chan struct{}
+ mtx sync.RWMutex
+
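+ // stopRequested is closed when Stop is called; stopOnce guarantees it is closed at most once.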
+ stopOnce *sync.Once
+ stopRequested chan struct{}
alertmanagers map[string]*alertmanagerSet
logger log.Logger
@@ -121,9 +122,10 @@ type Manager struct {
// Options are the configurable parameters of a Handler.
type Options struct {
- QueueCapacity int
- ExternalLabels labels.Labels
- RelabelConfigs []*relabel.Config
+ QueueCapacity int
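+ // DrainOnShutdown controls whether remaining queued notifications are sent when the manager shuts down.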
+ DrainOnShutdown bool
+ ExternalLabels labels.Labels
+ RelabelConfigs []*relabel.Config
// Used for sending HTTP requests to the Alertmanager.
Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error)
@@ -217,8 +219,6 @@ func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp
// NewManager is the manager constructor.
func NewManager(o *Options, logger log.Logger) *Manager {
- ctx, cancel := context.WithCancel(context.Background())
-
if o.Do == nil {
o.Do = do
}
@@ -227,12 +227,12 @@ func NewManager(o *Options, logger log.Logger) *Manager {
}
n := &Manager{
- queue: make([]*Alert, 0, o.QueueCapacity),
- ctx: ctx,
- cancel: cancel,
- more: make(chan struct{}, 1),
- opts: o,
- logger: logger,
+ queue: make([]*Alert, 0, o.QueueCapacity),
+ more: make(chan struct{}, 1),
+ stopRequested: make(chan struct{}),
+ stopOnce: &sync.Once{},
+ opts: o,
+ logger: logger,
}
queueLenFunc := func() float64 { return float64(n.queueLen()) }
@@ -298,42 +298,100 @@ func (n *Manager) nextBatch() []*Alert {
return alerts
}
+// Run dispatches notifications continuously, returning once Stop has been called and all
+// pending notifications have been drained from the queue (if draining is enabled).
+//
+// Dispatching of notifications occurs in parallel with processing target updates to avoid one starving the other.
+// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
+func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
+ wg := sync.WaitGroup{}
+ wg.Add(2)
+
+ go func() {
+ defer wg.Done()
+ n.targetUpdateLoop(tsets)
+ }()
+
+ go func() {
+ defer wg.Done()
+ n.sendLoop()
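+ // Once the send loop has exited, drain (or drop) any notifications still in the queue.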
+ n.drainQueue()
+ }()
+
+ wg.Wait()
+ level.Info(n.logger).Log("msg", "Notification manager stopped")
+}
+
// sendLoop continuously consumes the notifications queue and sends alerts to
// the configured Alertmanagers.
func (n *Manager) sendLoop() {
for {
+ // If we've been asked to stop, that takes priority over sending any further notifications.
select {
- case <-n.ctx.Done():
+ case <-n.stopRequested:
return
- case <-n.more:
- }
- alerts := n.nextBatch()
+ default:
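+ // Otherwise, wait until either a stop is requested or more notifications arrive.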
+ select {
+ case <-n.stopRequested:
+ return
- if !n.sendAll(alerts...) {
- n.metrics.dropped.Add(float64(len(alerts)))
- }
- // If the queue still has items left, kick off the next iteration.
- if n.queueLen() > 0 {
- n.setMore()
+ case <-n.more:
+ n.sendOneBatch()
+
+ // If the queue still has items left, kick off the next iteration.
+ if n.queueLen() > 0 {
+ n.setMore()
+ }
+ }
}
}
}
-// Run receives updates of target groups and triggers a reload.
-// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates.
-// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details.
-func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
- go n.sendLoop()
+// targetUpdateLoop receives updates of target groups and triggers a reload.
+func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) {
for {
+ // If we've been asked to stop, that takes priority over processing any further target group updates.
select {
- case <-n.ctx.Done():
+ case <-n.stopRequested:
return
- case ts := <-tsets:
- n.reload(ts)
+ default:
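+ // Otherwise, wait until either a stop is requested or the next target group update arrives.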
+ select {
+ case <-n.stopRequested:
+ return
+ case ts := <-tsets:
+ n.reload(ts)
+ }
}
}
}
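+// sendOneBatch dequeues the next batch of alerts from the queue and sends it, recording the batch as dropped if sendAll reports failure.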
+func (n *Manager) sendOneBatch() {
+ alerts := n.nextBatch()
+
+ if !n.sendAll(alerts...) {
+ n.metrics.dropped.Add(float64(len(alerts)))
+ }
+}
+
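+// drainQueue sends any notifications remaining in the queue at shutdown, or drops them with a warning if draining on shutdown is disabled.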
+func (n *Manager) drainQueue() {
+ if !n.opts.DrainOnShutdown {
+ if n.queueLen() > 0 {
+ level.Warn(n.logger).Log("msg", "Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen())
+ n.metrics.dropped.Add(float64(n.queueLen()))
+ }
+
+ return
+ }
+
+ level.Info(n.logger).Log("msg", "Draining any remaining notifications...")
+
+ for n.queueLen() > 0 {
+ n.sendOneBatch()
+ }
+
+ level.Info(n.logger).Log("msg", "Remaining notifications drained")
+}
+
func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
n.mtx.Lock()
defer n.mtx.Unlock()
@@ -546,7 +604,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
for _, am := range ams.ams {
wg.Add(1)
- ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
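+ // Use a fresh background context (with the configured timeout) rather than the manager's context, so that notifications sent while draining the queue after Stop are not cancelled.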
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout))
defer cancel()
go func(ctx context.Context, client *http.Client, url string, payload []byte, count int) {
@@ -624,10 +682,19 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b
return nil
}
-// Stop shuts down the notification handler.
+// Stop signals the notification manager to shut down and returns immediately.
+//
+// Run will return once the notification manager has successfully shut down.
+//
+// The manager will optionally drain any queued notifications before shutting down.
+//
+// Stop is safe to call multiple times.
func (n *Manager) Stop() {
level.Info(n.logger).Log("msg", "Stopping notification manager...")
- n.cancel()
+
+ n.stopOnce.Do(func() {
+ close(n.stopRequested)
+ })
}
// Alertmanager holds Alertmanager endpoint information.
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 03290a58c..2cdaa9e06 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -847,3 +847,173 @@ loop2:
}
}
}
+
+func TestStop_DrainingDisabled(t *testing.T) {
+ releaseReceiver := make(chan struct{})
+ receiverReceivedRequest := make(chan struct{}, 2)
+ alertsReceived := atomic.NewInt64(0)
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ // Let the test know we've received a request.
+ receiverReceivedRequest <- struct{}{}
+
+ var alerts []*Alert
+
+ b, err := io.ReadAll(r.Body)
+ require.NoError(t, err)
+
+ err = json.Unmarshal(b, &alerts)
+ require.NoError(t, err)
+
+ alertsReceived.Add(int64(len(alerts)))
+
+ // Wait for the test to release us.
+ <-releaseReceiver
+
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer func() {
+ server.Close()
+ }()
+
+ m := NewManager(
+ &Options{
+ QueueCapacity: 10,
+ DrainOnShutdown: false,
+ },
+ nil,
+ )
+
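+ // Point the manager at a single Alertmanager backed by the test server.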
+ m.alertmanagers = make(map[string]*alertmanagerSet)
+
+ am1Cfg := config.DefaultAlertmanagerConfig
+ am1Cfg.Timeout = model.Duration(time.Second)
+
+ m.alertmanagers["1"] = &alertmanagerSet{
+ ams: []alertmanager{
+ alertmanagerMock{
+ urlf: func() string { return server.URL },
+ },
+ },
+ cfg: &am1Cfg,
+ }
+
+ notificationManagerStopped := make(chan struct{})
+
+ go func() {
+ defer close(notificationManagerStopped)
+ m.Run(nil)
+ }()
+
+ // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
+ m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
+
+ select {
+ case <-receiverReceivedRequest:
+ // Nothing more to do.
+ case <-time.After(time.Second):
+ require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
+ }
+
+ m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
+
+ // Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed.
+ m.Stop()
+ time.Sleep(time.Second)
+ close(releaseReceiver)
+
+ // Wait for the notification manager to stop and confirm only the first notification was sent.
+ // The second notification should be dropped.
+ select {
+ case <-notificationManagerStopped:
+ // Nothing more to do.
+ case <-time.After(time.Second):
+ require.FailNow(t, "gave up waiting for notification manager to stop")
+ }
+
+ require.Equal(t, int64(1), alertsReceived.Load())
+}
+
+func TestStop_DrainingEnabled(t *testing.T) {
+ releaseReceiver := make(chan struct{})
+ receiverReceivedRequest := make(chan struct{}, 2)
+ alertsReceived := atomic.NewInt64(0)
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ // Let the test know we've received a request.
+ receiverReceivedRequest <- struct{}{}
+
+ var alerts []*Alert
+
+ b, err := io.ReadAll(r.Body)
+ require.NoError(t, err)
+
+ err = json.Unmarshal(b, &alerts)
+ require.NoError(t, err)
+
+ alertsReceived.Add(int64(len(alerts)))
+
+ // Wait for the test to release us.
+ <-releaseReceiver
+
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer func() {
+ server.Close()
+ }()
+
+ m := NewManager(
+ &Options{
+ QueueCapacity: 10,
+ DrainOnShutdown: true,
+ },
+ nil,
+ )
+
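+ // Point the manager at a single Alertmanager backed by the test server.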
+ m.alertmanagers = make(map[string]*alertmanagerSet)
+
+ am1Cfg := config.DefaultAlertmanagerConfig
+ am1Cfg.Timeout = model.Duration(time.Second)
+
+ m.alertmanagers["1"] = &alertmanagerSet{
+ ams: []alertmanager{
+ alertmanagerMock{
+ urlf: func() string { return server.URL },
+ },
+ },
+ cfg: &am1Cfg,
+ }
+
+ notificationManagerStopped := make(chan struct{})
+
+ go func() {
+ defer close(notificationManagerStopped)
+ m.Run(nil)
+ }()
+
+ // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later.
+ m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")})
+
+ select {
+ case <-receiverReceivedRequest:
+ // Nothing more to do.
+ case <-time.After(time.Second):
+ require.FailNow(t, "gave up waiting for receiver to receive notification of first alert")
+ }
+
+ m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")})
+
+ // Stop the notification manager and allow the receiver to proceed.
+ m.Stop()
+ close(releaseReceiver)
+
+ // Wait for the notification manager to stop and confirm both notifications were sent.
+ select {
+ case <-notificationManagerStopped:
+ // Nothing more to do.
+ case <-time.After(200 * time.Millisecond):
+ require.FailNow(t, "gave up waiting for notification manager to stop")
+ }
+
+ require.Equal(t, int64(2), alertsReceived.Load())
+}