From 2dd07fbb1bfe8ebeca8ea11f4623e0f5faed2236 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Wed, 26 Jun 2024 20:32:04 +1000 Subject: [PATCH] notifier: optionally drain queued notifications before shutting down (#14290) * Add draining of queued notifications to `notifier.Manager` Signed-off-by: Charles Korn * Update docs Signed-off-by: Charles Korn * Address PR feedback Signed-off-by: Charles Korn * Add more logging Signed-off-by: Charles Korn * Address offline feedback: remove timeout Signed-off-by: Charles Korn * Ensure stopping takes priority over further processing, make tests more robust Signed-off-by: Charles Korn * Make channel unbuffered Signed-off-by: Charles Korn * Update docs Signed-off-by: Charles Korn * Fix race in test Signed-off-by: Charles Korn * Remove unnecessary context Signed-off-by: Charles Korn * Make Stop safe to call multiple times Signed-off-by: Charles Korn --------- Signed-off-by: Charles Korn --- cmd/prometheus/main.go | 3 + docs/command-line/prometheus.md | 1 + notifier/notifier.go | 139 +++++++++++++++++++------- notifier/notifier_test.go | 170 ++++++++++++++++++++++++++++++++ 4 files changed, 277 insertions(+), 36 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index cd7f533d1..7544f276a 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -445,6 +445,9 @@ func main() { serverOnlyFlag(a, "alertmanager.notification-queue-capacity", "The capacity of the queue for pending Alertmanager notifications."). Default("10000").IntVar(&cfg.notifier.QueueCapacity) + serverOnlyFlag(a, "alertmanager.drain-notification-queue-on-shutdown", "Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down."). + Default("true").BoolVar(&cfg.notifier.DrainOnShutdown) + // TODO: Remove in Prometheus 3.0. alertmanagerTimeout := a.Flag("alertmanager.timeout", "[DEPRECATED] This flag has no effect.").Hidden().String() diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index aa9bf3bfb..1fc032d09 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -50,6 +50,7 @@ The Prometheus monitoring server | --rules.alert.resend-delay | Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` | | --rules.max-concurrent-evals | Global concurrency limit for independent rules that can run concurrently. When set, "query.max-concurrency" may need to be adjusted accordingly. Use with server mode only. | `4` | | --alertmanager.notification-queue-capacity | The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` | +| --alertmanager.drain-notification-queue-on-shutdown | Send any outstanding Alertmanager notifications when shutting down. If false, any outstanding Alertmanager notifications will be dropped when shutting down. Use with server mode only. | `true` | | --query.lookback-delta | The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` | | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. 
| `20` | diff --git a/notifier/notifier.go b/notifier/notifier.go index cd00a4507..68b0d4961 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -110,10 +110,11 @@ type Manager struct { metrics *alertMetrics - more chan struct{} - mtx sync.RWMutex - ctx context.Context - cancel func() + more chan struct{} + mtx sync.RWMutex + + stopOnce *sync.Once + stopRequested chan struct{} alertmanagers map[string]*alertmanagerSet logger log.Logger @@ -121,9 +122,10 @@ type Manager struct { // Options are the configurable parameters of a Handler. type Options struct { - QueueCapacity int - ExternalLabels labels.Labels - RelabelConfigs []*relabel.Config + QueueCapacity int + DrainOnShutdown bool + ExternalLabels labels.Labels + RelabelConfigs []*relabel.Config // Used for sending HTTP requests to the Alertmanager. Do func(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) @@ -217,8 +219,6 @@ func do(ctx context.Context, client *http.Client, req *http.Request) (*http.Resp // NewManager is the manager constructor. func NewManager(o *Options, logger log.Logger) *Manager { - ctx, cancel := context.WithCancel(context.Background()) - if o.Do == nil { o.Do = do } @@ -227,12 +227,12 @@ func NewManager(o *Options, logger log.Logger) *Manager { } n := &Manager{ - queue: make([]*Alert, 0, o.QueueCapacity), - ctx: ctx, - cancel: cancel, - more: make(chan struct{}, 1), - opts: o, - logger: logger, + queue: make([]*Alert, 0, o.QueueCapacity), + more: make(chan struct{}, 1), + stopRequested: make(chan struct{}), + stopOnce: &sync.Once{}, + opts: o, + logger: logger, } queueLenFunc := func() float64 { return float64(n.queueLen()) } @@ -298,42 +298,100 @@ func (n *Manager) nextBatch() []*Alert { return alerts } +// Run dispatches notifications continuously, returning once Stop has been called and all +// pending notifications have been drained from the queue (if draining is enabled). +// +// Dispatching of notifications occurs in parallel to processing target updates to avoid one starving the other. +// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. +func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + n.targetUpdateLoop(tsets) + }() + + go func() { + defer wg.Done() + n.sendLoop() + n.drainQueue() + }() + + wg.Wait() + level.Info(n.logger).Log("msg", "Notification manager stopped") +} + // sendLoop continuously consumes the notifications queue and sends alerts to // the configured Alertmanagers. func (n *Manager) sendLoop() { for { + // If we've been asked to stop, that takes priority over sending any further notifications. select { - case <-n.ctx.Done(): + case <-n.stopRequested: return - case <-n.more: - } - alerts := n.nextBatch() + default: + select { + case <-n.stopRequested: + return - if !n.sendAll(alerts...) { - n.metrics.dropped.Add(float64(len(alerts))) - } - // If the queue still has items left, kick off the next iteration. - if n.queueLen() > 0 { - n.setMore() + case <-n.more: + n.sendOneBatch() + + // If the queue still has items left, kick off the next iteration. + if n.queueLen() > 0 { + n.setMore() + } + } } } } -// Run receives updates of target groups and triggers a reload. -// The dispatching of notifications occurs in the background to prevent blocking the receipt of target updates. -// Refer to https://github.com/prometheus/prometheus/issues/13676 for more details. 
-func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) { - go n.sendLoop() +// targetUpdateLoop receives updates of target groups and triggers a reload. +func (n *Manager) targetUpdateLoop(tsets <-chan map[string][]*targetgroup.Group) { for { + // If we've been asked to stop, that takes priority over processing any further target group updates. select { - case <-n.ctx.Done(): + case <-n.stopRequested: return - case ts := <-tsets: - n.reload(ts) + default: + select { + case <-n.stopRequested: + return + case ts := <-tsets: + n.reload(ts) + } } } } +func (n *Manager) sendOneBatch() { + alerts := n.nextBatch() + + if !n.sendAll(alerts...) { + n.metrics.dropped.Add(float64(len(alerts))) + } +} + +func (n *Manager) drainQueue() { + if !n.opts.DrainOnShutdown { + if n.queueLen() > 0 { + level.Warn(n.logger).Log("msg", "Draining remaining notifications on shutdown is disabled, and some notifications have been dropped", "count", n.queueLen()) + n.metrics.dropped.Add(float64(n.queueLen())) + } + + return + } + + level.Info(n.logger).Log("msg", "Draining any remaining notifications...") + + for n.queueLen() > 0 { + n.sendOneBatch() + } + + level.Info(n.logger).Log("msg", "Remaining notifications drained") +} + func (n *Manager) reload(tgs map[string][]*targetgroup.Group) { n.mtx.Lock() defer n.mtx.Unlock() @@ -546,7 +604,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool { for _, am := range ams.ams { wg.Add(1) - ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout)) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ams.cfg.Timeout)) defer cancel() go func(ctx context.Context, client *http.Client, url string, payload []byte, count int) { @@ -624,10 +682,19 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b return nil } -// Stop shuts down the notification handler. +// Stop signals the notification manager to shut down and immediately returns. +// +// Run will return once the notification manager has successfully shut down. +// +// The manager will optionally drain any queued notifications before shutting down. +// +// Stop is safe to call multiple times. func (n *Manager) Stop() { level.Info(n.logger).Log("msg", "Stopping notification manager...") - n.cancel() + + n.stopOnce.Do(func() { + close(n.stopRequested) + }) } // Alertmanager holds Alertmanager endpoint information. diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 03290a58c..2cdaa9e06 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -847,3 +847,173 @@ loop2: } } } + +func TestStop_DrainingDisabled(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. 
+ <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: false, + }, + nil, + ) + + m.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + m.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + + notificationManagerStopped := make(chan struct{}) + + go func() { + defer close(notificationManagerStopped) + m.Run(nil) + }() + + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + select { + case <-receiverReceivedRequest: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") + } + + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + + // Stop the notification manager, pause to allow the shutdown to be observed, and then allow the receiver to proceed. + m.Stop() + time.Sleep(time.Second) + close(releaseReceiver) + + // Wait for the notification manager to stop and confirm only the first notification was sent. + // The second notification should be dropped. + select { + case <-notificationManagerStopped: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for notification manager to stop") + } + + require.Equal(t, int64(1), alertsReceived.Load()) +} + +func TestStop_DrainingEnabled(t *testing.T) { + releaseReceiver := make(chan struct{}) + receiverReceivedRequest := make(chan struct{}, 2) + alertsReceived := atomic.NewInt64(0) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Let the test know we've received a request. + receiverReceivedRequest <- struct{}{} + + var alerts []*Alert + + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + + err = json.Unmarshal(b, &alerts) + require.NoError(t, err) + + alertsReceived.Add(int64(len(alerts))) + + // Wait for the test to release us. + <-releaseReceiver + + w.WriteHeader(http.StatusOK) + })) + defer func() { + server.Close() + }() + + m := NewManager( + &Options{ + QueueCapacity: 10, + DrainOnShutdown: true, + }, + nil, + ) + + m.alertmanagers = make(map[string]*alertmanagerSet) + + am1Cfg := config.DefaultAlertmanagerConfig + am1Cfg.Timeout = model.Duration(time.Second) + + m.alertmanagers["1"] = &alertmanagerSet{ + ams: []alertmanager{ + alertmanagerMock{ + urlf: func() string { return server.URL }, + }, + }, + cfg: &am1Cfg, + } + + notificationManagerStopped := make(chan struct{}) + + go func() { + defer close(notificationManagerStopped) + m.Run(nil) + }() + + // Queue two alerts. The first should be immediately sent to the receiver, which should block until we release it later. + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-1")}) + + select { + case <-receiverReceivedRequest: + // Nothing more to do. + case <-time.After(time.Second): + require.FailNow(t, "gave up waiting for receiver to receive notification of first alert") + } + + m.Send(&Alert{Labels: labels.FromStrings(labels.AlertName, "alert-2")}) + + // Stop the notification manager and allow the receiver to proceed. 
+ m.Stop() + close(releaseReceiver) + + // Wait for the notification manager to stop and confirm both notifications were sent. + select { + case <-notificationManagerStopped: + // Nothing more to do. + case <-time.After(200 * time.Millisecond): + require.FailNow(t, "gave up waiting for notification manager to stop") + } + + require.Equal(t, int64(2), alertsReceived.Load()) +}
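
A minimal, hypothetical wiring sketch (not part of the patch) showing how a caller would use the new DrainOnShutdown option together with the revised Run/Stop contract. It assumes the standard Prometheus import paths (github.com/prometheus/prometheus/notifier, .../model/labels, .../discovery/targetgroup); passing nil as the logger mirrors the patch's tests, and the empty target-group channel stands in for service discovery.

package main

import (
	"time"

	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/notifier"
)

func main() {
	// DrainOnShutdown mirrors the new --alertmanager.drain-notification-queue-on-shutdown flag.
	m := notifier.NewManager(&notifier.Options{
		QueueCapacity:   10000,
		DrainOnShutdown: true,
	}, nil) // nil logger, as in the patch's tests

	// Target group updates would normally come from service discovery;
	// an empty channel keeps this sketch self-contained.
	tsets := make(chan map[string][]*targetgroup.Group)

	done := make(chan struct{})
	go func() {
		defer close(done)
		// With this change, Run returns only after Stop has been called and,
		// when DrainOnShutdown is enabled, the queue has been drained.
		m.Run(tsets)
	}()

	// Queue a notification. In a real setup, Alertmanager endpoints would have
	// been configured first (typically via ApplyConfig); with none configured,
	// the batch is dequeued and counted as dropped rather than blocking shutdown.
	m.Send(&notifier.Alert{Labels: labels.FromStrings(labels.AlertName, "example")})

	// Stop only signals shutdown and returns immediately; it is now safe to
	// call more than once because closing the stop channel is guarded by a
	// sync.Once inside the Manager.
	m.Stop()
	m.Stop()

	// Wait for Run to finish draining and exit.
	select {
	case <-done:
	case <-time.After(5 * time.Second):
	}
}

The key behavioural difference from the previous implementation is that Run, not Stop, owns the shutdown sequence: Stop just closes the stopRequested channel, and Run's send loop then either drops or drains the remaining queue depending on DrainOnShutdown before returning.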