diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index dd068b86c..f39eba3c3 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -135,24 +135,25 @@ func agentOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagCla type flagConfig struct { configFile string - agentStoragePath string - serverStoragePath string - notifier notifier.Options - forGracePeriod model.Duration - outageTolerance model.Duration - resendDelay model.Duration - maxConcurrentEvals int64 - web web.Options - scrape scrape.Options - tsdb tsdbOptions - agent agentOptions - lookbackDelta model.Duration - webTimeout model.Duration - queryTimeout model.Duration - queryConcurrency int - queryMaxSamples int - RemoteFlushDeadline model.Duration - nameEscapingScheme string + agentStoragePath string + serverStoragePath string + notifier notifier.Options + forGracePeriod model.Duration + outageTolerance model.Duration + resendDelay model.Duration + maxConcurrentEvals int64 + web web.Options + scrape scrape.Options + tsdb tsdbOptions + agent agentOptions + lookbackDelta model.Duration + webTimeout model.Duration + queryTimeout model.Duration + queryConcurrency int + queryMaxSamples int + RemoteFlushDeadline model.Duration + nameEscapingScheme string + maxNotificationsSubscribers int enableAutoReload bool autoReloadInterval model.Duration @@ -274,17 +275,13 @@ func main() { ) } - notifs := api.NewNotifications(prometheus.DefaultRegisterer) - cfg := flagConfig{ notifier: notifier.Options{ Registerer: prometheus.DefaultRegisterer, }, web: web.Options{ - Registerer: prometheus.DefaultRegisterer, - Gatherer: prometheus.DefaultGatherer, - NotificationsSub: notifs.Sub, - NotificationsGetter: notifs.Get, + Registerer: prometheus.DefaultRegisterer, + Gatherer: prometheus.DefaultGatherer, }, promlogConfig: promlog.Config{}, } @@ -319,6 +316,9 @@ func main() { a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners."). Default("512").IntVar(&cfg.web.MaxConnections) + a.Flag("web.max-notifications-subscribers", "Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close."). + Default("16").IntVar(&cfg.maxNotificationsSubscribers) + a.Flag("web.external-url", "The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically."). PlaceHolder("").StringVar(&cfg.prometheusURL) @@ -500,6 +500,10 @@ func main() { logger := promlog.New(&cfg.promlogConfig) + notifs := api.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer) + cfg.web.NotificationsSub = notifs.Sub + cfg.web.NotificationsGetter = notifs.Get + if err := cfg.setFeatureListOptions(logger); err != nil { fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err)) os.Exit(1) diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 7737b5021..eacb45ad0 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -21,6 +21,7 @@ The Prometheus monitoring server | --web.config.file | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | | | --web.read-timeout | Maximum duration before timing out read of the request, and closing idle connections. | `5m` | | --web.max-connections | Maximum number of simultaneous connections across all listeners. | `512` | +| --web.max-notifications-subscribers | Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close. | `16` | | --web.external-url | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | | | --web.route-prefix | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | | | --web.user-assets | Path to static asset directory, available at /user. | | diff --git a/web/api/notifications.go b/web/api/notifications.go index 47f29f6eb..976f0b076 100644 --- a/web/api/notifications.go +++ b/web/api/notifications.go @@ -34,9 +34,10 @@ type Notification struct { // Notifications stores a list of Notification objects. // It also manages live subscribers that receive notifications via channels. type Notifications struct { - mu sync.Mutex - notifications []Notification - subscribers map[chan Notification]struct{} // Active subscribers. + mu sync.Mutex + notifications []Notification + subscribers map[chan Notification]struct{} // Active subscribers. + maxSubscribers int subscriberGauge prometheus.Gauge notificationsSent prometheus.Counter @@ -44,9 +45,10 @@ type Notifications struct { } // NewNotifications creates a new Notifications instance. -func NewNotifications(reg prometheus.Registerer) *Notifications { +func NewNotifications(maxSubscribers int, reg prometheus.Registerer) *Notifications { n := &Notifications{ - subscribers: make(map[chan Notification]struct{}), + subscribers: make(map[chan Notification]struct{}), + maxSubscribers: maxSubscribers, subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "prometheus", Subsystem: "api", @@ -147,10 +149,16 @@ func (n *Notifications) Get() []Notification { // Sub allows a client to subscribe to live notifications. // It returns a channel where the subscriber will receive notifications and a function to unsubscribe. // Each subscriber has its own goroutine to handle notifications and prevent blocking. -func (n *Notifications) Sub() (<-chan Notification, func()) { +func (n *Notifications) Sub() (<-chan Notification, func(), bool) { + n.mu.Lock() + defer n.mu.Unlock() + + if len(n.subscribers) >= n.maxSubscribers { + return nil, nil, false + } + ch := make(chan Notification, 10) // Buffered channel to prevent blocking. - n.mu.Lock() // Add the new subscriber to the list. n.subscribers[ch] = struct{}{} n.subscriberGauge.Set(float64(len(n.subscribers))) @@ -159,7 +167,6 @@ func (n *Notifications) Sub() (<-chan Notification, func()) { for _, notification := range n.notifications { ch <- notification } - n.mu.Unlock() // Unsubscribe function to remove the channel from subscribers. unsubscribe := func() { @@ -172,5 +179,5 @@ func (n *Notifications) Sub() (<-chan Notification, func()) { n.subscriberGauge.Set(float64(len(n.subscribers))) } - return ch, unsubscribe + return ch, unsubscribe, true } diff --git a/web/api/notifications_test.go b/web/api/notifications_test.go index 7aa596163..437ff1ec4 100644 --- a/web/api/notifications_test.go +++ b/web/api/notifications_test.go @@ -23,7 +23,7 @@ import ( // TestNotificationLifecycle tests adding, modifying, and deleting notifications. func TestNotificationLifecycle(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Add a notification. notifs.AddNotification("Test Notification 1") @@ -47,10 +47,11 @@ func TestNotificationLifecycle(t *testing.T) { // TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions. func TestSubscriberReceivesNotifications(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe to notifications. - sub, unsubscribe := notifs.Sub() + sub, unsubscribe, ok := notifs.Sub() + require.True(t, ok) var wg sync.WaitGroup wg.Add(1) @@ -103,12 +104,14 @@ func TestSubscriberReceivesNotifications(t *testing.T) { // TestMultipleSubscribers tests that multiple subscribers receive notifications independently. func TestMultipleSubscribers(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe two subscribers to notifications. - sub1, unsubscribe1 := notifs.Sub() + sub1, unsubscribe1, ok1 := notifs.Sub() + require.True(t, ok1) - sub2, unsubscribe2 := notifs.Sub() + sub2, unsubscribe2, ok2 := notifs.Sub() + require.True(t, ok2) var wg sync.WaitGroup wg.Add(2) @@ -157,10 +160,11 @@ func TestMultipleSubscribers(t *testing.T) { // TestUnsubscribe tests that unsubscribing prevents further notifications from being received. func TestUnsubscribe(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe to notifications. - sub, unsubscribe := notifs.Sub() + sub, unsubscribe, ok := notifs.Sub() + require.True(t, ok) var wg sync.WaitGroup wg.Add(1) @@ -190,3 +194,30 @@ func TestUnsubscribe(t *testing.T) { require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.") require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.") } + +// TestMaxSubscribers tests that exceeding the max subscribers limit prevents additional subscriptions. +func TestMaxSubscribers(t *testing.T) { + maxSubscribers := 2 + notifs := NewNotifications(maxSubscribers, nil) + + // Subscribe the maximum number of subscribers. + _, unsubscribe1, ok1 := notifs.Sub() + require.True(t, ok1, "Expected first subscription to succeed.") + + _, unsubscribe2, ok2 := notifs.Sub() + require.True(t, ok2, "Expected second subscription to succeed.") + + // Try to subscribe more than the max allowed. + _, _, ok3 := notifs.Sub() + require.False(t, ok3, "Expected third subscription to fail due to max subscriber limit.") + + // Unsubscribe one subscriber and try again. + unsubscribe1() + + _, unsubscribe4, ok4 := notifs.Sub() + require.True(t, ok4, "Expected subscription to succeed after unsubscribing a subscriber.") + + // Clean up the subscriptions. + unsubscribe2() + unsubscribe4() +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5eadbdbe7..d3cc7d718 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -215,7 +215,7 @@ type API struct { isAgent bool statsRenderer StatsRenderer notificationsGetter func() []api.Notification - notificationsSub func() (<-chan api.Notification, func()) + notificationsSub func() (<-chan api.Notification, func(), bool) remoteWriteHandler http.Handler remoteReadHandler http.Handler @@ -250,7 +250,7 @@ func NewAPI( runtimeInfo func() (RuntimeInfo, error), buildInfo *PrometheusVersion, notificationsGetter func() []api.Notification, - notificationsSub func() (<-chan api.Notification, func()), + notificationsSub func() (<-chan api.Notification, func(), bool), gatherer prometheus.Gatherer, registerer prometheus.Registerer, statsRenderer StatsRenderer, @@ -1690,7 +1690,11 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") // Subscribe to notifications. - notifications, unsubscribe := api.notificationsSub() + notifications, unsubscribe, ok := api.notificationsSub() + if !ok { + w.WriteHeader(http.StatusNoContent) + return + } defer unsubscribe() // Set up a flusher to push the response to the client. @@ -1700,6 +1704,10 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) { return } + // Flush the response to ensure the headers are immediately and eventSource + // onopen is triggered client-side. + flusher.Flush() + for { select { case notification := <-notifications: diff --git a/web/ui/mantine-ui/package.json b/web/ui/mantine-ui/package.json index ec8ef8902..aae8ba99b 100644 --- a/web/ui/mantine-ui/package.json +++ b/web/ui/mantine-ui/package.json @@ -25,6 +25,7 @@ "@mantine/dates": "^7.11.2", "@mantine/hooks": "^7.11.2", "@mantine/notifications": "^7.11.2", + "@microsoft/fetch-event-source": "^2.0.1", "@nexucis/fuzzy": "^0.5.1", "@nexucis/kvsearch": "^0.9.1", "@prometheus-io/codemirror-promql": "0.300.0-beta.0", diff --git a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx index 73de54131..a331e524b 100644 --- a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx +++ b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx @@ -3,6 +3,7 @@ import { useSettings } from '../state/settingsSlice'; import { NotificationsContext } from '../state/useNotifications'; import { Notification, NotificationsResult } from "../api/responseTypes/notifications"; import { useAPIQuery } from '../api/api'; +import { fetchEventSource } from '@microsoft/fetch-event-source'; export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => { const { pathPrefix } = useSettings(); @@ -24,30 +25,47 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ }, [data, isError]); useEffect(() => { - const eventSource = new EventSource(`${pathPrefix}/api/v1/notifications/live`); - - eventSource.onmessage = (event) => { - const notification: Notification = JSON.parse(event.data); - - setNotifications((prev: Notification[]) => { - const updatedNotifications = [...prev.filter((n: Notification) => n.text !== notification.text)]; - - if (notification.active) { - updatedNotifications.push(notification); + const controller = new AbortController(); + fetchEventSource(`${pathPrefix}/api/v1/notifications/live`, { + signal: controller.signal, + async onopen(response) { + if (response.ok) { + if (response.status === 200) { + setNotifications([]); + setIsConnectionError(false); + } else if (response.status === 204) { + controller.abort(); + setShouldFetchFromAPI(true); + } + } else { + setIsConnectionError(true); + throw new Error(`Unexpected response: ${response.status} ${response.statusText}`); } + }, + onmessage(event) { + const notification: Notification = JSON.parse(event.data); - return updatedNotifications; - }); - }; + setNotifications((prev: Notification[]) => { + const updatedNotifications = [...prev.filter((n: Notification) => n.text !== notification.text)]; - eventSource.onerror = () => { - eventSource.close(); - setIsConnectionError(true); - setShouldFetchFromAPI(true); - }; + if (notification.active) { + updatedNotifications.push(notification); + } + + return updatedNotifications; + }); + }, + onclose() { + throw new Error("Server closed the connection"); + }, + onerror() { + setIsConnectionError(true); + return 5000; + }, + }); return () => { - eventSource.close(); + controller.abort(); }; }, [pathPrefix]); diff --git a/web/ui/package-lock.json b/web/ui/package-lock.json index 2dc1fcdfe..49a907480 100644 --- a/web/ui/package-lock.json +++ b/web/ui/package-lock.json @@ -39,6 +39,7 @@ "@mantine/dates": "^7.11.2", "@mantine/hooks": "^7.11.2", "@mantine/notifications": "^7.11.2", + "@microsoft/fetch-event-source": "^2.0.1", "@nexucis/fuzzy": "^0.5.1", "@nexucis/kvsearch": "^0.9.1", "@prometheus-io/codemirror-promql": "0.300.0-beta.0", @@ -2255,6 +2256,11 @@ "react": "^18.2.0" } }, + "node_modules/@microsoft/fetch-event-source": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz", + "integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==" + }, "node_modules/@nexucis/fuzzy": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.5.1.tgz", diff --git a/web/web.go b/web/web.go index 87e4164c5..724ca9105 100644 --- a/web/web.go +++ b/web/web.go @@ -268,7 +268,7 @@ type Options struct { Notifier *notifier.Manager Version *PrometheusVersion NotificationsGetter func() []api.Notification - NotificationsSub func() (<-chan api.Notification, func()) + NotificationsSub func() (<-chan api.Notification, func(), bool) Flags map[string]string ListenAddresses []string