mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
refactor(core): Remove sse-channel (no-changelog) (#11207)
This commit is contained in:
parent
3d2fbcfd93
commit
6823e8f2dd
|
@ -163,7 +163,6 @@
|
||||||
"simple-git": "3.17.0",
|
"simple-git": "3.17.0",
|
||||||
"source-map-support": "0.5.21",
|
"source-map-support": "0.5.21",
|
||||||
"sqlite3": "5.1.7",
|
"sqlite3": "5.1.7",
|
||||||
"sse-channel": "4.0.0",
|
|
||||||
"sshpk": "1.17.0",
|
"sshpk": "1.17.0",
|
||||||
"swagger-ui-express": "5.0.1",
|
"swagger-ui-express": "5.0.1",
|
||||||
"syslog-client": "1.1.1",
|
"syslog-client": "1.1.1",
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
import type { PushPayload, PushType } from '@n8n/api-types';
|
import type { PushPayload, PushType } from '@n8n/api-types';
|
||||||
import { assert, jsonStringify } from 'n8n-workflow';
|
import { assert, jsonStringify } from 'n8n-workflow';
|
||||||
|
import { Service } from 'typedi';
|
||||||
|
|
||||||
import type { User } from '@/databases/entities/user';
|
import type { User } from '@/databases/entities/user';
|
||||||
import type { Logger } from '@/logging/logger.service';
|
import { Logger } from '@/logging/logger.service';
|
||||||
import type { OnPushMessage } from '@/push/types';
|
import type { OnPushMessage } from '@/push/types';
|
||||||
import { TypedEmitter } from '@/typed-emitter';
|
import { TypedEmitter } from '@/typed-emitter';
|
||||||
|
|
||||||
|
@ -16,6 +17,7 @@ export interface AbstractPushEvents {
|
||||||
*
|
*
|
||||||
* @emits message when a message is received from a client
|
* @emits message when a message is received from a client
|
||||||
*/
|
*/
|
||||||
|
@Service()
|
||||||
export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPushEvents> {
|
export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPushEvents> {
|
||||||
protected connections: Record<string, Connection> = {};
|
protected connections: Record<string, Connection> = {};
|
||||||
|
|
||||||
|
@ -23,9 +25,12 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
|
||||||
|
|
||||||
protected abstract close(connection: Connection): void;
|
protected abstract close(connection: Connection): void;
|
||||||
protected abstract sendToOneConnection(connection: Connection, data: string): void;
|
protected abstract sendToOneConnection(connection: Connection, data: string): void;
|
||||||
|
protected abstract ping(connection: Connection): void;
|
||||||
|
|
||||||
constructor(protected readonly logger: Logger) {
|
constructor(protected readonly logger: Logger) {
|
||||||
super();
|
super();
|
||||||
|
// Ping all connected clients every 60 seconds
|
||||||
|
setInterval(() => this.pingAll(), 60 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected add(pushRef: string, userId: User['id'], connection: Connection) {
|
protected add(pushRef: string, userId: User['id'], connection: Connection) {
|
||||||
|
@ -75,6 +80,12 @@ export abstract class AbstractPush<Connection> extends TypedEmitter<AbstractPush
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private pingAll() {
|
||||||
|
for (const pushRef in this.connections) {
|
||||||
|
this.ping(this.connections[pushRef]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sendToAll<Type extends PushType>(type: Type, data: PushPayload<Type>) {
|
sendToAll<Type extends PushType>(type: Type, data: PushPayload<Type>) {
|
||||||
this.sendTo(type, data, Object.keys(this.connections));
|
this.sendTo(type, data, Object.keys(this.connections));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
import { Service } from 'typedi';
|
import { Service } from 'typedi';
|
||||||
|
|
||||||
import type { User } from '@/databases/entities/user';
|
import type { User } from '@/databases/entities/user';
|
||||||
import { Logger } from '@/logging/logger.service';
|
|
||||||
import SSEChannel from 'sse-channel';
|
|
||||||
|
|
||||||
import { AbstractPush } from './abstract.push';
|
import { AbstractPush } from './abstract.push';
|
||||||
import type { PushRequest, PushResponse } from './types';
|
import type { PushRequest, PushResponse } from './types';
|
||||||
|
@ -11,29 +9,41 @@ type Connection = { req: PushRequest; res: PushResponse };
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class SSEPush extends AbstractPush<Connection> {
|
export class SSEPush extends AbstractPush<Connection> {
|
||||||
readonly channel = new SSEChannel();
|
|
||||||
|
|
||||||
readonly connections: Record<string, Connection> = {};
|
|
||||||
|
|
||||||
constructor(logger: Logger) {
|
|
||||||
super(logger);
|
|
||||||
|
|
||||||
this.channel.on('disconnect', (_, { req }) => {
|
|
||||||
this.remove(req?.query?.pushRef);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
add(pushRef: string, userId: User['id'], connection: Connection) {
|
add(pushRef: string, userId: User['id'], connection: Connection) {
|
||||||
|
const { req, res } = connection;
|
||||||
|
|
||||||
|
// Initialize the connection
|
||||||
|
req.socket.setTimeout(0);
|
||||||
|
req.socket.setNoDelay(true);
|
||||||
|
req.socket.setKeepAlive(true);
|
||||||
|
res.setHeader('Content-Type', 'text/event-stream; charset=UTF-8');
|
||||||
|
res.setHeader('Cache-Control', 'no-cache');
|
||||||
|
res.setHeader('Connection', 'keep-alive');
|
||||||
|
res.writeHead(200);
|
||||||
|
res.write(':ok\n\n');
|
||||||
|
res.flush();
|
||||||
|
|
||||||
super.add(pushRef, userId, connection);
|
super.add(pushRef, userId, connection);
|
||||||
this.channel.addClient(connection.req, connection.res);
|
|
||||||
|
// When the client disconnects, remove the client
|
||||||
|
const removeClient = () => this.remove(pushRef);
|
||||||
|
req.once('end', removeClient);
|
||||||
|
req.once('close', removeClient);
|
||||||
|
res.once('finish', removeClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected close({ res }: Connection) {
|
protected close({ res }: Connection) {
|
||||||
res.end();
|
res.end();
|
||||||
this.channel.removeClient(res);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected sendToOneConnection(connection: Connection, data: string) {
|
protected sendToOneConnection(connection: Connection, data: string) {
|
||||||
this.channel.send(data, [connection.res]);
|
const { res } = connection;
|
||||||
|
res.write('data: ' + data + '\n\n');
|
||||||
|
res.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ping({ res }: Connection) {
|
||||||
|
res.write(':ping\n\n');
|
||||||
|
res.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,15 @@ export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>;
|
||||||
export type SSEPushRequest = PushRequest & { ws: undefined };
|
export type SSEPushRequest = PushRequest & { ws: undefined };
|
||||||
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
|
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
|
||||||
|
|
||||||
export type PushResponse = Response & { req: PushRequest };
|
export type PushResponse = Response & {
|
||||||
|
req: PushRequest;
|
||||||
|
/**
|
||||||
|
* `flush()` is defined in the compression middleware.
|
||||||
|
* This is necessary because the compression middleware sometimes waits
|
||||||
|
* for a certain amount of data before sending the data to the client
|
||||||
|
*/
|
||||||
|
flush: () => void;
|
||||||
|
};
|
||||||
|
|
||||||
export interface OnPushMessage {
|
export interface OnPushMessage {
|
||||||
pushRef: string;
|
pushRef: string;
|
||||||
|
|
|
@ -3,7 +3,6 @@ import { Service } from 'typedi';
|
||||||
import type WebSocket from 'ws';
|
import type WebSocket from 'ws';
|
||||||
|
|
||||||
import type { User } from '@/databases/entities/user';
|
import type { User } from '@/databases/entities/user';
|
||||||
import { Logger } from '@/logging/logger.service';
|
|
||||||
|
|
||||||
import { AbstractPush } from './abstract.push';
|
import { AbstractPush } from './abstract.push';
|
||||||
|
|
||||||
|
@ -13,13 +12,6 @@ function heartbeat(this: WebSocket) {
|
||||||
|
|
||||||
@Service()
|
@Service()
|
||||||
export class WebSocketPush extends AbstractPush<WebSocket> {
|
export class WebSocketPush extends AbstractPush<WebSocket> {
|
||||||
constructor(logger: Logger) {
|
|
||||||
super(logger);
|
|
||||||
|
|
||||||
// Ping all connected clients every 60 seconds
|
|
||||||
setInterval(() => this.pingAll(), 60 * 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
add(pushRef: string, userId: User['id'], connection: WebSocket) {
|
add(pushRef: string, userId: User['id'], connection: WebSocket) {
|
||||||
connection.isAlive = true;
|
connection.isAlive = true;
|
||||||
connection.on('pong', heartbeat);
|
connection.on('pong', heartbeat);
|
||||||
|
@ -67,17 +59,12 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
|
||||||
connection.send(data);
|
connection.send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
private pingAll() {
|
protected ping(connection: WebSocket): void {
|
||||||
for (const pushRef in this.connections) {
|
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
|
||||||
const connection = this.connections[pushRef];
|
if (!connection.isAlive) {
|
||||||
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
|
return connection.terminate();
|
||||||
if (!connection.isAlive) {
|
|
||||||
delete this.connections[pushRef];
|
|
||||||
return connection.terminate();
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.isAlive = false;
|
|
||||||
connection.ping();
|
|
||||||
}
|
}
|
||||||
|
connection.isAlive = false;
|
||||||
|
connection.ping();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
17
packages/cli/src/sse-channel.d.ts
vendored
17
packages/cli/src/sse-channel.d.ts
vendored
|
@ -1,17 +0,0 @@
|
||||||
import type { PushRequest, PushResponse } from './push/types';
|
|
||||||
|
|
||||||
declare module 'sse-channel' {
|
|
||||||
declare class Channel {
|
|
||||||
constructor();
|
|
||||||
|
|
||||||
on(event: string, handler: (channel: string, res: PushResponse) => void): void;
|
|
||||||
|
|
||||||
removeClient: (res: PushResponse) => void;
|
|
||||||
|
|
||||||
addClient: (req: PushRequest, res: PushResponse) => void;
|
|
||||||
|
|
||||||
send: (msg: string, clients?: PushResponse[]) => void;
|
|
||||||
}
|
|
||||||
|
|
||||||
export = Channel;
|
|
||||||
}
|
|
|
@ -937,9 +937,6 @@ importers:
|
||||||
sqlite3:
|
sqlite3:
|
||||||
specifier: 5.1.7
|
specifier: 5.1.7
|
||||||
version: 5.1.7
|
version: 5.1.7
|
||||||
sse-channel:
|
|
||||||
specifier: 4.0.0
|
|
||||||
version: 4.0.0
|
|
||||||
sshpk:
|
sshpk:
|
||||||
specifier: 1.17.0
|
specifier: 1.17.0
|
||||||
version: 1.17.0
|
version: 1.17.0
|
||||||
|
@ -11081,9 +11078,6 @@ packages:
|
||||||
resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==}
|
resolution: {integrity: sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg==}
|
||||||
engines: {node: '>= 0.6'}
|
engines: {node: '>= 0.6'}
|
||||||
|
|
||||||
sse-channel@4.0.0:
|
|
||||||
resolution: {integrity: sha512-I539Tc0gyDTQ2QCSg4v78Flxo/UbqR9x7JoyPcqaPtwo+qzeOw/fF+aPSbk0xTvBQAAAZk7Dlkc8K1bum5GUnw==}
|
|
||||||
|
|
||||||
ssh2-sftp-client@7.2.3:
|
ssh2-sftp-client@7.2.3:
|
||||||
resolution: {integrity: sha512-Bmq4Uewu3e0XOwu5bnPbiS5KRQYv+dff5H6+85V4GZrPrt0Fkt1nUH+uXanyAkoNxUpzjnAPEEoLdOaBO9c3xw==}
|
resolution: {integrity: sha512-Bmq4Uewu3e0XOwu5bnPbiS5KRQYv+dff5H6+85V4GZrPrt0Fkt1nUH+uXanyAkoNxUpzjnAPEEoLdOaBO9c3xw==}
|
||||||
engines: {node: '>=10.24.1'}
|
engines: {node: '>=10.24.1'}
|
||||||
|
@ -24356,8 +24350,6 @@ snapshots:
|
||||||
|
|
||||||
sqlstring@2.3.3: {}
|
sqlstring@2.3.3: {}
|
||||||
|
|
||||||
sse-channel@4.0.0: {}
|
|
||||||
|
|
||||||
ssh2-sftp-client@7.2.3:
|
ssh2-sftp-client@7.2.3:
|
||||||
dependencies:
|
dependencies:
|
||||||
concat-stream: 2.0.0
|
concat-stream: 2.0.0
|
||||||
|
|
Loading…
Reference in a new issue