Limit the number of SSE Subscribers to 16 by default

Signed-off-by: Julien <roidelapluie@o11y.eu>
This commit is contained in:
Julien 2024-09-27 13:51:50 +02:00
parent 7aa4721373
commit f9bbad1148
7 changed files with 94 additions and 46 deletions

View file

@ -153,6 +153,7 @@ type flagConfig struct {
queryMaxSamples int
RemoteFlushDeadline model.Duration
nameEscapingScheme string
maxNotificationsSubscribers int
enableAutoReload bool
autoReloadInterval model.Duration
@ -274,8 +275,6 @@ func main() {
)
}
notifs := api.NewNotifications(prometheus.DefaultRegisterer)
cfg := flagConfig{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
@ -283,8 +282,6 @@ func main() {
web: web.Options{
Registerer: prometheus.DefaultRegisterer,
Gatherer: prometheus.DefaultGatherer,
NotificationsSub: notifs.Sub,
NotificationsGetter: notifs.Get,
},
promlogConfig: promlog.Config{},
}
@ -319,6 +316,9 @@ func main() {
a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners.").
Default("512").IntVar(&cfg.web.MaxConnections)
a.Flag("web.max-notifications-subscribers", "Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close.").
Default("16").IntVar(&cfg.maxNotificationsSubscribers)
a.Flag("web.external-url",
"The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically.").
PlaceHolder("<URL>").StringVar(&cfg.prometheusURL)
@ -500,6 +500,10 @@ func main() {
logger := promlog.New(&cfg.promlogConfig)
notifs := api.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer)
cfg.web.NotificationsSub = notifs.Sub
cfg.web.NotificationsGetter = notifs.Get
if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
os.Exit(1)

View file

@ -21,6 +21,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--web.config.file</code> | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | |
| <code class="text-nowrap">--web.read-timeout</code> | Maximum duration before timing out read of the request, and closing idle connections. | `5m` |
| <code class="text-nowrap">--web.max-connections</code> | Maximum number of simultaneous connections across all listeners. | `512` |
| <code class="text-nowrap">--web.max-notifications-subscribers</code> | Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close. | `16` |
| <code class="text-nowrap">--web.external-url</code> | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | |
| <code class="text-nowrap">--web.route-prefix</code> | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | |
| <code class="text-nowrap">--web.user-assets</code> | Path to static asset directory, available at /user. | |

View file

@ -37,6 +37,7 @@ type Notifications struct {
mu sync.Mutex
notifications []Notification
subscribers map[chan Notification]struct{} // Active subscribers.
maxSubscribers int
subscriberGauge prometheus.Gauge
notificationsSent prometheus.Counter
@ -44,9 +45,10 @@ type Notifications struct {
}
// NewNotifications creates a new Notifications instance.
func NewNotifications(reg prometheus.Registerer) *Notifications {
func NewNotifications(maxSubscribers int, reg prometheus.Registerer) *Notifications {
n := &Notifications{
subscribers: make(map[chan Notification]struct{}),
maxSubscribers: maxSubscribers,
subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "prometheus",
Subsystem: "api",
@ -147,10 +149,16 @@ func (n *Notifications) Get() []Notification {
// Sub allows a client to subscribe to live notifications.
// It returns a channel where the subscriber will receive notifications and a function to unsubscribe.
// Each subscriber has its own goroutine to handle notifications and prevent blocking.
func (n *Notifications) Sub() (<-chan Notification, func()) {
func (n *Notifications) Sub() (<-chan Notification, func(), bool) {
n.mu.Lock()
defer n.mu.Unlock()
if len(n.subscribers) >= n.maxSubscribers {
return nil, nil, false
}
ch := make(chan Notification, 10) // Buffered channel to prevent blocking.
n.mu.Lock()
// Add the new subscriber to the list.
n.subscribers[ch] = struct{}{}
n.subscriberGauge.Set(float64(len(n.subscribers)))
@ -159,7 +167,6 @@ func (n *Notifications) Sub() (<-chan Notification, func()) {
for _, notification := range n.notifications {
ch <- notification
}
n.mu.Unlock()
// Unsubscribe function to remove the channel from subscribers.
unsubscribe := func() {
@ -172,5 +179,5 @@ func (n *Notifications) Sub() (<-chan Notification, func()) {
n.subscriberGauge.Set(float64(len(n.subscribers)))
}
return ch, unsubscribe
return ch, unsubscribe, true
}

View file

@ -23,7 +23,7 @@ import (
// TestNotificationLifecycle tests adding, modifying, and deleting notifications.
func TestNotificationLifecycle(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Add a notification.
notifs.AddNotification("Test Notification 1")
@ -47,10 +47,11 @@ func TestNotificationLifecycle(t *testing.T) {
// TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions.
func TestSubscriberReceivesNotifications(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Subscribe to notifications.
sub, unsubscribe := notifs.Sub()
sub, unsubscribe, ok := notifs.Sub()
require.True(t, ok)
var wg sync.WaitGroup
wg.Add(1)
@ -103,12 +104,14 @@ func TestSubscriberReceivesNotifications(t *testing.T) {
// TestMultipleSubscribers tests that multiple subscribers receive notifications independently.
func TestMultipleSubscribers(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Subscribe two subscribers to notifications.
sub1, unsubscribe1 := notifs.Sub()
sub1, unsubscribe1, ok1 := notifs.Sub()
require.True(t, ok1)
sub2, unsubscribe2 := notifs.Sub()
sub2, unsubscribe2, ok2 := notifs.Sub()
require.True(t, ok2)
var wg sync.WaitGroup
wg.Add(2)
@ -157,10 +160,11 @@ func TestMultipleSubscribers(t *testing.T) {
// TestUnsubscribe tests that unsubscribing prevents further notifications from being received.
func TestUnsubscribe(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Subscribe to notifications.
sub, unsubscribe := notifs.Sub()
sub, unsubscribe, ok := notifs.Sub()
require.True(t, ok)
var wg sync.WaitGroup
wg.Add(1)
@ -190,3 +194,30 @@ func TestUnsubscribe(t *testing.T) {
require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.")
require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.")
}
// TestMaxSubscribers tests that exceeding the max subscribers limit prevents additional subscriptions.
func TestMaxSubscribers(t *testing.T) {
maxSubscribers := 2
notifs := NewNotifications(maxSubscribers, nil)
// Subscribe the maximum number of subscribers.
_, unsubscribe1, ok1 := notifs.Sub()
require.True(t, ok1, "Expected first subscription to succeed.")
_, unsubscribe2, ok2 := notifs.Sub()
require.True(t, ok2, "Expected second subscription to succeed.")
// Try to subscribe more than the max allowed.
_, _, ok3 := notifs.Sub()
require.False(t, ok3, "Expected third subscription to fail due to max subscriber limit.")
// Unsubscribe one subscriber and try again.
unsubscribe1()
_, unsubscribe4, ok4 := notifs.Sub()
require.True(t, ok4, "Expected subscription to succeed after unsubscribing a subscriber.")
// Clean up the subscriptions.
unsubscribe2()
unsubscribe4()
}

View file

@ -215,7 +215,7 @@ type API struct {
isAgent bool
statsRenderer StatsRenderer
notificationsGetter func() []api.Notification
notificationsSub func() (<-chan api.Notification, func())
notificationsSub func() (<-chan api.Notification, func(), bool)
remoteWriteHandler http.Handler
remoteReadHandler http.Handler
@ -250,7 +250,7 @@ func NewAPI(
runtimeInfo func() (RuntimeInfo, error),
buildInfo *PrometheusVersion,
notificationsGetter func() []api.Notification,
notificationsSub func() (<-chan api.Notification, func()),
notificationsSub func() (<-chan api.Notification, func(), bool),
gatherer prometheus.Gatherer,
registerer prometheus.Registerer,
statsRenderer StatsRenderer,
@ -1690,7 +1690,11 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "keep-alive")
// Subscribe to notifications.
notifications, unsubscribe := api.notificationsSub()
notifications, unsubscribe, ok := api.notificationsSub()
if !ok {
w.WriteHeader(http.StatusNoContent)
return
}
defer unsubscribe()
// Set up a flusher to push the response to the client.

View file

@ -42,7 +42,8 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({
eventSource.onerror = () => {
eventSource.close();
setIsConnectionError(true);
// We do not call setIsConnectionError(true), we only set it to true if
// the fallback API does not work either.
setShouldFetchFromAPI(true);
};

View file

@ -268,7 +268,7 @@ type Options struct {
Notifier *notifier.Manager
Version *PrometheusVersion
NotificationsGetter func() []api.Notification
NotificationsSub func() (<-chan api.Notification, func())
NotificationsSub func() (<-chan api.Notification, func(), bool)
Flags map[string]string
ListenAddresses []string