Merge pull request #14994 from roidelapluie/notifications2

Follow-up on notifications via SSE
This commit is contained in:
Julien 2024-09-30 10:17:34 +02:00 committed by GitHub
commit 537c5dbbcf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 140 additions and 64 deletions

View file

@ -153,6 +153,7 @@ type flagConfig struct {
queryMaxSamples int
RemoteFlushDeadline model.Duration
nameEscapingScheme string
maxNotificationsSubscribers int
enableAutoReload bool
autoReloadInterval model.Duration
@ -274,8 +275,6 @@ func main() {
)
}
notifs := api.NewNotifications(prometheus.DefaultRegisterer)
cfg := flagConfig{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
@ -283,8 +282,6 @@ func main() {
web: web.Options{
Registerer: prometheus.DefaultRegisterer,
Gatherer: prometheus.DefaultGatherer,
NotificationsSub: notifs.Sub,
NotificationsGetter: notifs.Get,
},
promlogConfig: promlog.Config{},
}
@ -319,6 +316,9 @@ func main() {
a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners.").
Default("512").IntVar(&cfg.web.MaxConnections)
a.Flag("web.max-notifications-subscribers", "Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close.").
Default("16").IntVar(&cfg.maxNotificationsSubscribers)
a.Flag("web.external-url",
"The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically.").
PlaceHolder("<URL>").StringVar(&cfg.prometheusURL)
@ -500,6 +500,10 @@ func main() {
logger := promlog.New(&cfg.promlogConfig)
notifs := api.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer)
cfg.web.NotificationsSub = notifs.Sub
cfg.web.NotificationsGetter = notifs.Get
if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
os.Exit(1)

View file

@ -21,6 +21,7 @@ The Prometheus monitoring server
| <code class="text-nowrap">--web.config.file</code> | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | |
| <code class="text-nowrap">--web.read-timeout</code> | Maximum duration before timing out read of the request, and closing idle connections. | `5m` |
| <code class="text-nowrap">--web.max-connections</code> | Maximum number of simultaneous connections across all listeners. | `512` |
| <code class="text-nowrap">--web.max-notifications-subscribers</code> | Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close. | `16` |
| <code class="text-nowrap">--web.external-url</code> | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | |
| <code class="text-nowrap">--web.route-prefix</code> | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | |
| <code class="text-nowrap">--web.user-assets</code> | Path to static asset directory, available at /user. | |

View file

@ -37,6 +37,7 @@ type Notifications struct {
mu sync.Mutex
notifications []Notification
subscribers map[chan Notification]struct{} // Active subscribers.
maxSubscribers int
subscriberGauge prometheus.Gauge
notificationsSent prometheus.Counter
@ -44,9 +45,10 @@ type Notifications struct {
}
// NewNotifications creates a new Notifications instance.
func NewNotifications(reg prometheus.Registerer) *Notifications {
func NewNotifications(maxSubscribers int, reg prometheus.Registerer) *Notifications {
n := &Notifications{
subscribers: make(map[chan Notification]struct{}),
maxSubscribers: maxSubscribers,
subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "prometheus",
Subsystem: "api",
@ -147,10 +149,16 @@ func (n *Notifications) Get() []Notification {
// Sub allows a client to subscribe to live notifications.
// It returns a channel where the subscriber will receive notifications and a function to unsubscribe.
// Each subscriber has its own goroutine to handle notifications and prevent blocking.
func (n *Notifications) Sub() (<-chan Notification, func()) {
func (n *Notifications) Sub() (<-chan Notification, func(), bool) {
n.mu.Lock()
defer n.mu.Unlock()
if len(n.subscribers) >= n.maxSubscribers {
return nil, nil, false
}
ch := make(chan Notification, 10) // Buffered channel to prevent blocking.
n.mu.Lock()
// Add the new subscriber to the list.
n.subscribers[ch] = struct{}{}
n.subscriberGauge.Set(float64(len(n.subscribers)))
@ -159,7 +167,6 @@ func (n *Notifications) Sub() (<-chan Notification, func()) {
for _, notification := range n.notifications {
ch <- notification
}
n.mu.Unlock()
// Unsubscribe function to remove the channel from subscribers.
unsubscribe := func() {
@ -172,5 +179,5 @@ func (n *Notifications) Sub() (<-chan Notification, func()) {
n.subscriberGauge.Set(float64(len(n.subscribers)))
}
return ch, unsubscribe
return ch, unsubscribe, true
}

View file

@ -23,7 +23,7 @@ import (
// TestNotificationLifecycle tests adding, modifying, and deleting notifications.
func TestNotificationLifecycle(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Add a notification.
notifs.AddNotification("Test Notification 1")
@ -47,10 +47,11 @@ func TestNotificationLifecycle(t *testing.T) {
// TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions.
func TestSubscriberReceivesNotifications(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Subscribe to notifications.
sub, unsubscribe := notifs.Sub()
sub, unsubscribe, ok := notifs.Sub()
require.True(t, ok)
var wg sync.WaitGroup
wg.Add(1)
@ -103,12 +104,14 @@ func TestSubscriberReceivesNotifications(t *testing.T) {
// TestMultipleSubscribers tests that multiple subscribers receive notifications independently.
func TestMultipleSubscribers(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Subscribe two subscribers to notifications.
sub1, unsubscribe1 := notifs.Sub()
sub1, unsubscribe1, ok1 := notifs.Sub()
require.True(t, ok1)
sub2, unsubscribe2 := notifs.Sub()
sub2, unsubscribe2, ok2 := notifs.Sub()
require.True(t, ok2)
var wg sync.WaitGroup
wg.Add(2)
@ -157,10 +160,11 @@ func TestMultipleSubscribers(t *testing.T) {
// TestUnsubscribe tests that unsubscribing prevents further notifications from being received.
func TestUnsubscribe(t *testing.T) {
notifs := NewNotifications(nil)
notifs := NewNotifications(10, nil)
// Subscribe to notifications.
sub, unsubscribe := notifs.Sub()
sub, unsubscribe, ok := notifs.Sub()
require.True(t, ok)
var wg sync.WaitGroup
wg.Add(1)
@ -190,3 +194,30 @@ func TestUnsubscribe(t *testing.T) {
require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.")
require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.")
}
// TestMaxSubscribers tests that exceeding the max subscribers limit prevents additional subscriptions.
func TestMaxSubscribers(t *testing.T) {
maxSubscribers := 2
notifs := NewNotifications(maxSubscribers, nil)
// Subscribe the maximum number of subscribers.
_, unsubscribe1, ok1 := notifs.Sub()
require.True(t, ok1, "Expected first subscription to succeed.")
_, unsubscribe2, ok2 := notifs.Sub()
require.True(t, ok2, "Expected second subscription to succeed.")
// Try to subscribe more than the max allowed.
_, _, ok3 := notifs.Sub()
require.False(t, ok3, "Expected third subscription to fail due to max subscriber limit.")
// Unsubscribe one subscriber and try again.
unsubscribe1()
_, unsubscribe4, ok4 := notifs.Sub()
require.True(t, ok4, "Expected subscription to succeed after unsubscribing a subscriber.")
// Clean up the subscriptions.
unsubscribe2()
unsubscribe4()
}

View file

@ -215,7 +215,7 @@ type API struct {
isAgent bool
statsRenderer StatsRenderer
notificationsGetter func() []api.Notification
notificationsSub func() (<-chan api.Notification, func())
notificationsSub func() (<-chan api.Notification, func(), bool)
remoteWriteHandler http.Handler
remoteReadHandler http.Handler
@ -250,7 +250,7 @@ func NewAPI(
runtimeInfo func() (RuntimeInfo, error),
buildInfo *PrometheusVersion,
notificationsGetter func() []api.Notification,
notificationsSub func() (<-chan api.Notification, func()),
notificationsSub func() (<-chan api.Notification, func(), bool),
gatherer prometheus.Gatherer,
registerer prometheus.Registerer,
statsRenderer StatsRenderer,
@ -1690,7 +1690,11 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "keep-alive")
// Subscribe to notifications.
notifications, unsubscribe := api.notificationsSub()
notifications, unsubscribe, ok := api.notificationsSub()
if !ok {
w.WriteHeader(http.StatusNoContent)
return
}
defer unsubscribe()
// Set up a flusher to push the response to the client.
@ -1700,6 +1704,10 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) {
return
}
// Flush the response to ensure the headers are immediately and eventSource
// onopen is triggered client-side.
flusher.Flush()
for {
select {
case notification := <-notifications:

View file

@ -25,6 +25,7 @@
"@mantine/dates": "^7.11.2",
"@mantine/hooks": "^7.11.2",
"@mantine/notifications": "^7.11.2",
"@microsoft/fetch-event-source": "^2.0.1",
"@nexucis/fuzzy": "^0.5.1",
"@nexucis/kvsearch": "^0.9.1",
"@prometheus-io/codemirror-promql": "0.300.0-beta.0",

View file

@ -3,6 +3,7 @@ import { useSettings } from '../state/settingsSlice';
import { NotificationsContext } from '../state/useNotifications';
import { Notification, NotificationsResult } from "../api/responseTypes/notifications";
import { useAPIQuery } from '../api/api';
import { fetchEventSource } from '@microsoft/fetch-event-source';
export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
const { pathPrefix } = useSettings();
@ -24,9 +25,24 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({
}, [data, isError]);
useEffect(() => {
const eventSource = new EventSource(`${pathPrefix}/api/v1/notifications/live`);
eventSource.onmessage = (event) => {
const controller = new AbortController();
fetchEventSource(`${pathPrefix}/api/v1/notifications/live`, {
signal: controller.signal,
async onopen(response) {
if (response.ok) {
if (response.status === 200) {
setNotifications([]);
setIsConnectionError(false);
} else if (response.status === 204) {
controller.abort();
setShouldFetchFromAPI(true);
}
} else {
setIsConnectionError(true);
throw new Error(`Unexpected response: ${response.status} ${response.statusText}`);
}
},
onmessage(event) {
const notification: Notification = JSON.parse(event.data);
setNotifications((prev: Notification[]) => {
@ -38,16 +54,18 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({
return updatedNotifications;
});
};
eventSource.onerror = () => {
eventSource.close();
},
onclose() {
throw new Error("Server closed the connection");
},
onerror() {
setIsConnectionError(true);
setShouldFetchFromAPI(true);
};
return 5000;
},
});
return () => {
eventSource.close();
controller.abort();
};
}, [pathPrefix]);

View file

@ -39,6 +39,7 @@
"@mantine/dates": "^7.11.2",
"@mantine/hooks": "^7.11.2",
"@mantine/notifications": "^7.11.2",
"@microsoft/fetch-event-source": "^2.0.1",
"@nexucis/fuzzy": "^0.5.1",
"@nexucis/kvsearch": "^0.9.1",
"@prometheus-io/codemirror-promql": "0.300.0-beta.0",
@ -2255,6 +2256,11 @@
"react": "^18.2.0"
}
},
"node_modules/@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA=="
},
"node_modules/@nexucis/fuzzy": {
"version": "0.5.1",
"resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.5.1.tgz",

View file

@ -268,7 +268,7 @@ type Options struct {
Notifier *notifier.Manager
Version *PrometheusVersion
NotificationsGetter func() []api.Notification
NotificationsSub func() (<-chan api.Notification, func())
NotificationsSub func() (<-chan api.Notification, func(), bool)
Flags map[string]string
ListenAddresses []string