diff --git a/packages/cli/package.json b/packages/cli/package.json index 8cb7d03a72..d7d92d96bc 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -163,7 +163,6 @@ "simple-git": "3.17.0", "source-map-support": "0.5.21", "sqlite3": "5.1.7", - "sse-channel": "4.0.0", "sshpk": "1.17.0", "swagger-ui-express": "5.0.1", "syslog-client": "1.1.1", diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts index c56fa4c042..24cafa8121 100644 --- a/packages/cli/src/push/abstract.push.ts +++ b/packages/cli/src/push/abstract.push.ts @@ -1,8 +1,9 @@ import type { PushPayload, PushType } from '@n8n/api-types'; import { assert, jsonStringify } from 'n8n-workflow'; +import { Service } from 'typedi'; import type { User } from '@/databases/entities/user'; -import type { Logger } from '@/logging/logger.service'; +import { Logger } from '@/logging/logger.service'; import type { OnPushMessage } from '@/push/types'; import { TypedEmitter } from '@/typed-emitter'; @@ -16,6 +17,7 @@ export interface AbstractPushEvents { * * @emits message when a message is received from a client */ +@Service() export abstract class AbstractPush extends TypedEmitter { protected connections: Record = {}; @@ -23,9 +25,12 @@ export abstract class AbstractPush extends TypedEmitter this.pingAll(), 60 * 1000); } protected add(pushRef: string, userId: User['id'], connection: Connection) { @@ -75,6 +80,12 @@ export abstract class AbstractPush extends TypedEmitter(type: Type, data: PushPayload) { this.sendTo(type, data, Object.keys(this.connections)); } diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts index 85d16a3b42..04e39d6d79 100644 --- a/packages/cli/src/push/sse.push.ts +++ b/packages/cli/src/push/sse.push.ts @@ -1,8 +1,6 @@ import { Service } from 'typedi'; import type { User } from '@/databases/entities/user'; -import { Logger } from '@/logging/logger.service'; -import SSEChannel from 'sse-channel'; import { AbstractPush } from './abstract.push'; import type { PushRequest, PushResponse } from './types'; @@ -11,29 +9,41 @@ type Connection = { req: PushRequest; res: PushResponse }; @Service() export class SSEPush extends AbstractPush { - readonly channel = new SSEChannel(); - - readonly connections: Record = {}; - - constructor(logger: Logger) { - super(logger); - - this.channel.on('disconnect', (_, { req }) => { - this.remove(req?.query?.pushRef); - }); - } - add(pushRef: string, userId: User['id'], connection: Connection) { + const { req, res } = connection; + + // Initialize the connection + req.socket.setTimeout(0); + req.socket.setNoDelay(true); + req.socket.setKeepAlive(true); + res.setHeader('Content-Type', 'text/event-stream; charset=UTF-8'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.writeHead(200); + res.write(':ok\n\n'); + res.flush(); + super.add(pushRef, userId, connection); - this.channel.addClient(connection.req, connection.res); + + // When the client disconnects, remove the client + const removeClient = () => this.remove(pushRef); + req.once('end', removeClient); + req.once('close', removeClient); + res.once('finish', removeClient); } protected close({ res }: Connection) { res.end(); - this.channel.removeClient(res); } protected sendToOneConnection(connection: Connection, data: string) { - this.channel.send(data, [connection.res]); + const { res } = connection; + res.write('data: ' + data + '\n\n'); + res.flush(); + } + + protected ping({ res }: Connection) { + res.write(':ping\n\n'); + res.flush(); } } diff --git a/packages/cli/src/push/types.ts b/packages/cli/src/push/types.ts index db9121eecc..b0db44ba1a 100644 --- a/packages/cli/src/push/types.ts +++ b/packages/cli/src/push/types.ts @@ -11,7 +11,15 @@ export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>; export type SSEPushRequest = PushRequest & { ws: undefined }; export type WebSocketPushRequest = PushRequest & { ws: WebSocket }; -export type PushResponse = Response & { req: PushRequest }; +export type PushResponse = Response & { + req: PushRequest; + /** + * `flush()` is defined in the compression middleware. + * This is necessary because the compression middleware sometimes waits + * for a certain amount of data before sending the data to the client + */ + flush: () => void; +}; export interface OnPushMessage { pushRef: string; diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts index dc60d70901..a2ea39c500 100644 --- a/packages/cli/src/push/websocket.push.ts +++ b/packages/cli/src/push/websocket.push.ts @@ -3,7 +3,6 @@ import { Service } from 'typedi'; import type WebSocket from 'ws'; import type { User } from '@/databases/entities/user'; -import { Logger } from '@/logging/logger.service'; import { AbstractPush } from './abstract.push'; @@ -13,13 +12,6 @@ function heartbeat(this: WebSocket) { @Service() export class WebSocketPush extends AbstractPush { - constructor(logger: Logger) { - super(logger); - - // Ping all connected clients every 60 seconds - setInterval(() => this.pingAll(), 60 * 1000); - } - add(pushRef: string, userId: User['id'], connection: WebSocket) { connection.isAlive = true; connection.on('pong', heartbeat); @@ -67,17 +59,12 @@ export class WebSocketPush extends AbstractPush { connection.send(data); } - private pingAll() { - for (const pushRef in this.connections) { - const connection = this.connections[pushRef]; - // If a connection did not respond with a `PONG` in the last 60 seconds, disconnect - if (!connection.isAlive) { - delete this.connections[pushRef]; - return connection.terminate(); - } - - connection.isAlive = false; - connection.ping(); + protected ping(connection: WebSocket): void { + // If a connection did not respond with a `PONG` in the last 60 seconds, disconnect + if (!connection.isAlive) { + return connection.terminate(); } + connection.isAlive = false; + connection.ping(); } } diff --git a/packages/cli/src/sse-channel.d.ts b/packages/cli/src/sse-channel.d.ts deleted file mode 100644 index 6ea435b361..0000000000 --- a/packages/cli/src/sse-channel.d.ts +++ /dev/null @@ -1,17 +0,0 @@ -import type { PushRequest, PushResponse } from './push/types'; - -declare module 'sse-channel' { - declare class Channel { - constructor(); - - on(event: string, handler: (channel: string, res: PushResponse) => void): void; - - removeClient: (res: PushResponse) => void; - - addClient: (req: PushRequest, res: PushResponse) => void; - - send: (msg: string, clients?: PushResponse[]) => void; - } - - export = Channel; -} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3fb4d6dff0..0dca3aee05 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -937,9 +937,6 @@ importers: sqlite3: specifier: 5.1.7 version: 5.1.7 - sse-channel: - specifier: 4.0.0 - version: 4.0.0 sshpk: specifier: 1.17.0 version: 1.17.0 @@ -11081,9 +11078,6 @@ packages: resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==} engines: {node: '>= 0.6'} - sse-channel@4.0.0: - resolution: {integrity: sha512-I539Tc0gyDTQ2QCSg4v78Flxo/UbqR9x7JoyPcqaPtwo+qzeOw/fF+aPSbk0xTvBQAAAZk7Dlkc8K1bum5GUnw==} - ssh2-sftp-client@7.2.3: resolution: {integrity: sha512-Bmq4Uewu3e0XOwu5bnPbiS5KRQYv+dff5H6+85V4GZrPrt0Fkt1nUH+uXanyAkoNxUpzjnAPEEoLdOaBO9c3xw==} engines: {node: '>=10.24.1'} @@ -24356,8 +24350,6 @@ snapshots: sqlstring@2.3.3: {} - sse-channel@4.0.0: {} - ssh2-sftp-client@7.2.3: dependencies: concat-stream: 2.0.0