diff --git a/packages/@n8n/api-types/src/index.ts b/packages/@n8n/api-types/src/index.ts index a51850cc6c..e6c8830d8e 100644 --- a/packages/@n8n/api-types/src/index.ts +++ b/packages/@n8n/api-types/src/index.ts @@ -7,6 +7,8 @@ export type * from './user'; export type * from './api-keys'; export type { Collaborator } from './push/collaboration'; +export type { HeartbeatMessage } from './push/heartbeat'; +export { createHeartbeatMessage, heartbeatMessageSchema } from './push/heartbeat'; export type { SendWorkerStatusMessage } from './push/worker'; export type { BannerName } from './schemas/bannerName.schema'; diff --git a/packages/@n8n/api-types/src/push/heartbeat.ts b/packages/@n8n/api-types/src/push/heartbeat.ts new file mode 100644 index 0000000000..2a211e704f --- /dev/null +++ b/packages/@n8n/api-types/src/push/heartbeat.ts @@ -0,0 +1,11 @@ +import { z } from 'zod'; + +export const heartbeatMessageSchema = z.object({ + type: z.literal('heartbeat'), +}); + +export type HeartbeatMessage = z.infer; + +export const createHeartbeatMessage = (): HeartbeatMessage => ({ + type: 'heartbeat', +}); diff --git a/packages/cli/src/push/__tests__/websocket.push.test.ts b/packages/cli/src/push/__tests__/websocket.push.test.ts index 64a5526300..038e73dab2 100644 --- a/packages/cli/src/push/__tests__/websocket.push.test.ts +++ b/packages/cli/src/push/__tests__/websocket.push.test.ts @@ -1,4 +1,4 @@ -import type { PushMessage } from '@n8n/api-types'; +import { createHeartbeatMessage, type PushMessage } from '@n8n/api-types'; import { Container } from '@n8n/di'; import { EventEmitter } from 'events'; import { Logger } from 'n8n-core'; @@ -107,7 +107,8 @@ describe('WebSocketPush', () => { expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg); }); - it('emits message event when connection receives data', () => { + it('emits message event when connection receives data', async () => { + jest.useRealTimers(); const mockOnMessageReceived = jest.fn(); webSocketPush.on('message', mockOnMessageReceived); webSocketPush.add(pushRef1, userId, mockWebSocket1); @@ -118,10 +119,30 @@ describe('WebSocketPush', () => { mockWebSocket1.emit('message', buffer); + // Flush the event loop + await new Promise(process.nextTick); + expect(mockOnMessageReceived).toHaveBeenCalledWith({ msg: data, pushRef: pushRef1, userId, }); }); + + it("emits doesn' emit message for client heartbeat", async () => { + const mockOnMessageReceived = jest.fn(); + webSocketPush.on('message', mockOnMessageReceived); + webSocketPush.add(pushRef1, userId, mockWebSocket1); + webSocketPush.add(pushRef2, userId, mockWebSocket2); + + const data = createHeartbeatMessage(); + const buffer = Buffer.from(JSON.stringify(data)); + + mockWebSocket1.emit('message', buffer); + + // Flush the event loop + await new Promise(process.nextTick); + + expect(mockOnMessageReceived).not.toHaveBeenCalled(); + }); }); diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index 297490f6cf..16e28f2453 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -1,3 +1,4 @@ +import { heartbeatMessageSchema } from '@n8n/api-types'; import { Service } from '@n8n/di'; import { ApplicationError } from 'n8n-workflow'; import type WebSocket from 'ws'; @@ -18,11 +19,19 @@ export class WebSocketPush extends AbstractPush { super.add(pushRef, userId, connection); - const onMessage = (data: WebSocket.RawData) => { + const onMessage = async (data: WebSocket.RawData) => { try { const buffer = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data); + const msg: unknown = JSON.parse(buffer.toString('utf8')); - this.onMessageReceived(pushRef, JSON.parse(buffer.toString('utf8'))); + // Client sends application level heartbeat messages to react + // to connection issues. This is in addition to the protocol + // level ping/pong mechanism used by the server. + if (await this.isClientHeartbeat(msg)) { + return; + } + + this.onMessageReceived(pushRef, msg); } catch (error) { this.errorReporter.error( new ApplicationError('Error parsing push message', { @@ -67,4 +76,10 @@ export class WebSocketPush extends AbstractPush { connection.isAlive = false; connection.ping(); } + + private async isClientHeartbeat(msg: unknown) { + const result = await heartbeatMessageSchema.safeParseAsync(msg); + + return result.success; + } } diff --git a/packages/editor-ui/src/push-connection/useWebSocketClient.ts b/packages/editor-ui/src/push-connection/useWebSocketClient.ts index 5a83d279db..8137b774d5 100644 --- a/packages/editor-ui/src/push-connection/useWebSocketClient.ts +++ b/packages/editor-ui/src/push-connection/useWebSocketClient.ts @@ -1,7 +1,7 @@ import { useHeartbeat } from '@/push-connection/useHeartbeat'; import { useReconnectTimer } from '@/push-connection/useReconnectTimer'; import { ref } from 'vue'; - +import { createHeartbeatMessage } from '@n8n/api-types'; export type UseWebSocketClientOptions = { url: string; onMessage: (data: T) => void; @@ -31,7 +31,7 @@ export const useWebSocketClient = (options: UseWebSocketClientOptions) => const { startHeartbeat, stopHeartbeat } = useHeartbeat({ interval: 30_000, onHeartbeat: () => { - socket.value?.send(JSON.stringify({ type: 'heartbeat' })); + socket.value?.send(JSON.stringify(createHeartbeatMessage())); }, });