feat(core): Add support for WebSockets as an alternative to Server-Sent Events (#5443)

Co-authored-by: Matthijs Knigge <matthijs@volcano.nl>
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-02-10 15:02:47 +01:00 committed by GitHub
parent 5194513850
commit 538984dc2f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 457 additions and 400 deletions

View file

@ -99,6 +99,7 @@
"@types/syslog-client": "^1.1.2",
"@types/uuid": "^8.3.2",
"@types/validator": "^13.7.0",
"@types/ws": "^8.5.4",
"@types/yamljs": "^0.2.31",
"chokidar": "^3.5.2",
"concurrently": "^5.1.0",
@ -196,6 +197,7 @@
"uuid": "^8.3.2",
"validator": "13.7.0",
"winston": "^3.3.3",
"ws": "^8.12.0",
"yamljs": "^0.3.0"
}
}

View file

@ -30,6 +30,8 @@ import { WEBHOOK_METHODS } from '@/WebhookHelpers';
const emptyBuffer = Buffer.alloc(0);
export abstract class AbstractServer {
protected server: Server;
protected app: express.Application;
protected externalHooks: IExternalHooksClass;
@ -73,7 +75,7 @@ export abstract class AbstractServer {
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
}
private async setupCommonMiddlewares() {
private async setupErrorHandlers() {
const { app } = this;
// Augment errors sent to Sentry
@ -82,6 +84,10 @@ export abstract class AbstractServer {
} = await import('@sentry/node');
app.use(requestHandler());
app.use(errorHandler());
}
private async setupCommonMiddlewares() {
const { app } = this;
// Compress the response data
app.use(compression());
@ -147,6 +153,8 @@ export abstract class AbstractServer {
this.app.use(corsMiddleware);
}
protected setupPushServer() {}
private async setupHealthCheck() {
this.app.use((req, res, next) => {
if (!Db.isInitialized) {
@ -392,10 +400,9 @@ export abstract class AbstractServer {
async start(): Promise<void> {
const { app, externalHooks, protocol, sslKey, sslCert } = this;
let server: Server;
if (protocol === 'https' && sslKey && sslCert) {
const https = await import('https');
server = https.createServer(
this.server = https.createServer(
{
key: await readFile(this.sslKey, 'utf8'),
cert: await readFile(this.sslCert, 'utf8'),
@ -404,13 +411,13 @@ export abstract class AbstractServer {
);
} else {
const http = await import('http');
server = http.createServer(app);
this.server = http.createServer(app);
}
const PORT = config.getEnv('port');
const ADDRESS = config.getEnv('listen_address');
server.on('error', (error: Error & { code: string }) => {
this.server.on('error', (error: Error & { code: string }) => {
if (error.code === 'EADDRINUSE') {
console.log(
`n8n's port ${PORT} is already in use. Do you have another instance of n8n running already?`,
@ -419,8 +426,10 @@ export abstract class AbstractServer {
}
});
await new Promise<void>((resolve) => server.listen(PORT, ADDRESS, () => resolve()));
await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));
await this.setupErrorHandlers();
this.setupPushServer();
await this.setupCommonMiddlewares();
if (inDevelopment) {
this.setupDevMiddlewares();

View file

@ -449,56 +449,6 @@ export interface IInternalHooksClass {
onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise<void>;
}
export interface IN8nConfig {
database: IN8nConfigDatabase;
endpoints: IN8nConfigEndpoints;
executions: IN8nConfigExecutions;
generic: IN8nConfigGeneric;
host: string;
nodes: IN8nConfigNodes;
port: number;
protocol: 'http' | 'https';
}
export interface IN8nConfigDatabase {
type: DatabaseType;
postgresdb: {
host: string;
password: string;
port: number;
user: string;
};
}
export interface IN8nConfigEndpoints {
rest: string;
webhook: string;
webhookTest: string;
}
// eslint-disable-next-line import/export
export interface IN8nConfigExecutions {
saveDataOnError: SaveExecutionDataType;
saveDataOnSuccess: SaveExecutionDataType;
saveDataManualExecutions: boolean;
}
// eslint-disable-next-line import/export
export interface IN8nConfigExecutions {
saveDataOnError: SaveExecutionDataType;
saveDataOnSuccess: SaveExecutionDataType;
saveDataManualExecutions: boolean;
}
export interface IN8nConfigGeneric {
timezone: string;
}
export interface IN8nConfigNodes {
errorTriggerType: string;
exclude: string[];
}
export interface IVersionNotificationSettings {
enabled: boolean;
endpoint: string;
@ -550,6 +500,7 @@ export interface IN8nUISettings {
onboardingCallPromptEnabled: boolean;
missingPackages?: boolean;
executionMode: 'regular' | 'queue';
pushBackend: 'sse' | 'websocket';
communityNodesEnabled: boolean;
deployment: {
type: string;

View file

@ -1,83 +0,0 @@
import SSEChannel from 'sse-channel';
import type { Request, Response } from 'express';
import { LoggerProxy as Logger } from 'n8n-workflow';
import type { IPushData, IPushDataType } from '@/Interfaces';
export class Push {
private channel = new SSEChannel();
private connections: Record<string, Response> = {};
constructor() {
this.channel.on('disconnect', (channel: string, res: Response) => {
if (res.req !== undefined) {
const { sessionId } = res.req.query;
Logger.debug('Remove editor-UI session', { sessionId });
delete this.connections[sessionId as string];
}
});
}
/**
* Adds a new push connection
*
* @param {string} sessionId The id of the session
* @param {Request} req The request
* @param {Response} res The response
*/
add(sessionId: string, req: Request, res: Response) {
Logger.debug('Add editor-UI session', { sessionId });
if (this.connections[sessionId] !== undefined) {
// Make sure to remove existing connection with the same session
// id if one exists already
this.connections[sessionId].end();
this.channel.removeClient(this.connections[sessionId]);
}
this.connections[sessionId] = res;
this.channel.addClient(req, res);
}
/**
* Sends data to the client which is connected via a specific session
*
* @param {string} sessionId The session id of client to send data to
* @param {string} type Type of data to send
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
send(type: IPushDataType, data: any, sessionId?: string) {
if (sessionId !== undefined && this.connections[sessionId] === undefined) {
Logger.error(`The session "${sessionId}" is not registered.`, { sessionId });
return;
}
Logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId });
const sendData: IPushData = {
type,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
data,
};
if (sessionId === undefined) {
// Send to all connected clients
this.channel.send(JSON.stringify(sendData));
} else {
// Send only to a specific client
this.channel.send(JSON.stringify(sendData), [this.connections[sessionId]]);
}
}
}
let activePushInstance: Push | undefined;
export function getInstance(): Push {
if (activePushInstance === undefined) {
activePushInstance = new Push();
}
return activePushInstance;
}

View file

@ -3,7 +3,7 @@ import { realpath } from 'fs/promises';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import type { NodeTypesClass } from '@/NodeTypes';
import type { Push } from '@/Push';
import type { Push } from '@/push';
export const reloadNodesAndCredentials = async (
loadNodesAndCredentials: LoadNodesAndCredentialsClass,

View file

@ -1,31 +1,15 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */
/* eslint-disable @typescript-eslint/no-use-before-define */
/* eslint-disable @typescript-eslint/await-thenable */
/* eslint-disable new-cap */
/* eslint-disable prefer-const */
/* eslint-disable @typescript-eslint/no-invalid-void-type */
/* eslint-disable no-return-assign */
/* eslint-disable no-param-reassign */
/* eslint-disable consistent-return */
/* eslint-disable @typescript-eslint/restrict-template-expressions */
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable id-denylist */
/* eslint-disable no-console */
/* eslint-disable global-require */
/* eslint-disable @typescript-eslint/no-var-requires */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable no-continue */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable no-restricted-syntax */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable import/no-dynamic-require */
/* eslint-disable no-await-in-loop */
import { exec as callbackExec } from 'child_process';
import { access as fsAccess } from 'fs/promises';
@ -60,7 +44,6 @@ import type {
INodeTypeNameVersion,
ITelemetrySettings,
WorkflowExecuteMode,
INodeTypes,
ICredentialTypes,
} from 'n8n-workflow';
import { LoggerProxy, jsonParse } from 'n8n-workflow';
@ -78,7 +61,6 @@ import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { nodesController } from '@/api/nodes.api';
import { workflowsController } from '@/workflows/workflows.controller';
import {
AUTH_COOKIE_NAME,
EDITOR_UI_DIST_DIR,
GENERATED_STATIC_DIR,
inDevelopment,
@ -106,7 +88,6 @@ import {
PasswordResetController,
UsersController,
} from '@/controllers';
import { resolveJwt } from '@/auth/jwt';
import { executionsController } from '@/executions/executions.controller';
import { nodeTypesController } from '@/api/nodeTypes.api';
@ -143,7 +124,6 @@ import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import type { NodeTypesClass } from '@/NodeTypes';
import { NodeTypes } from '@/NodeTypes';
import * as Push from '@/Push';
import * as ResponseHelper from '@/ResponseHelper';
import type { WaitTrackerClass } from '@/WaitTracker';
import { WaitTracker } from '@/WaitTracker';
@ -155,7 +135,9 @@ import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { getLicense } from '@/License';
import { licenseController } from './license/license.controller';
import { corsMiddleware, setupAuthMiddlewares } from './middlewares';
import type { Push } from '@/push';
import { getPushInstance, setupPushServer, setupPushHandler } from '@/push';
import { setupAuthMiddlewares } from './middlewares';
import { initEvents } from './events';
import { ldapController } from './Ldap/routes/ldap.controller.ee';
import { getLdapLoginLabel, isLdapEnabled, isLdapLoginEnabled } from './Ldap/helpers';
@ -183,7 +165,7 @@ class Server extends AbstractServer {
credentialTypes: ICredentialTypes;
push: Push.Push;
push: Push;
constructor() {
super();
@ -198,12 +180,12 @@ class Server extends AbstractServer {
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
this.push = getPushInstance();
if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
}
this.push = Push.getInstance();
const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl();
const telemetrySettings: ITelemetrySettings = {
enabled: config.getEnv('diagnostics.enabled'),
@ -280,6 +262,7 @@ class Server extends AbstractServer {
},
onboardingCallPromptEnabled: config.getEnv('onboardingCallPrompt.enabled'),
executionMode: config.getEnv('executions.mode'),
pushBackend: config.getEnv('push.backend'),
communityNodesEnabled: config.getEnv('nodes.communityPackages.enabled'),
deployment: {
type: config.getEnv('deployment.type'),
@ -434,26 +417,8 @@ class Server extends AbstractServer {
// Parse cookies for easier access
this.app.use(cookieParser());
// Get push connections
this.app.use(`/${this.restEndpoint}/push`, corsMiddleware, async (req, res, next) => {
const { sessionId } = req.query;
if (sessionId === undefined) {
next(new Error('The query parameter "sessionId" is missing!'));
return;
}
if (isUserManagementEnabled()) {
try {
const authCookie = req.cookies?.[AUTH_COOKIE_NAME] ?? '';
await resolveJwt(authCookie);
} catch (error) {
res.status(401).send('Unauthorized');
return;
}
}
this.push.add(sessionId as string, req, res);
});
const { restEndpoint, app } = this;
setupPushHandler(restEndpoint, app, isUserManagementEnabled());
// Make sure that Vue history mode works properly
this.app.use(
@ -1324,6 +1289,11 @@ class Server extends AbstractServer {
this.app.use('/', express.static(GENERATED_STATIC_DIR));
}
}
protected setupPushServer(): void {
const { restEndpoint, server, app } = this;
setupPushServer(restEndpoint, server, app);
}
}
export async function start(): Promise<void> {

View file

@ -14,14 +14,15 @@ import type {
WorkflowExecuteMode,
} from 'n8n-workflow';
import type { IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import * as Push from '@/Push';
import type { Push } from '@/push';
import { getPushInstance } from '@/push';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
const WEBHOOK_TEST_UNREGISTERED_HINT =
"Click the 'Execute workflow' button on the canvas, then try again. (In test mode, the webhook only works for one call after you click this button)";
export class TestWebhooks {
class TestWebhooks {
private testWebhookData: {
[key: string]: {
sessionId?: string;
@ -32,18 +33,14 @@ export class TestWebhooks {
};
} = {};
private activeWebhooks: ActiveWebhooks | null = null;
constructor() {
this.activeWebhooks = new ActiveWebhooks();
this.activeWebhooks.testWebhooks = true;
constructor(private activeWebhooks: ActiveWebhooks, private push: Push) {
activeWebhooks.testWebhooks = true;
}
/**
* Executes a test-webhook and returns the data. It also makes sure that the
* data gets additionally send to the UI. After the request got handled it
* automatically remove the test-webhook.
*
*/
async callTestWebhook(
httpMethod: WebhookHttpMethod,
@ -59,14 +56,16 @@ export class TestWebhooks {
path = path.slice(0, -1);
}
let webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path);
const { activeWebhooks, push, testWebhookData } = this;
let webhookData: IWebhookData | undefined = activeWebhooks.get(httpMethod, path);
// check if path is dynamic
if (webhookData === undefined) {
const pathElements = path.split('/');
const webhookId = pathElements.shift();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
webhookData = this.activeWebhooks!.get(httpMethod, pathElements.join('/'), webhookId);
webhookData = activeWebhooks.get(httpMethod, pathElements.join('/'), webhookId);
if (webhookData === undefined) {
// The requested webhook is not registered
throw new ResponseHelper.NotFoundError(
@ -85,14 +84,15 @@ export class TestWebhooks {
});
}
const webhookKey = `${this.activeWebhooks!.getWebhookKey(
const { workflowId } = webhookData;
const webhookKey = `${activeWebhooks.getWebhookKey(
webhookData.httpMethod,
webhookData.path,
webhookData.webhookId,
)}|${webhookData.workflowId}`;
)}|${workflowId}`;
// TODO: Clean that duplication up one day and improve code generally
if (this.testWebhookData[webhookKey] === undefined) {
if (testWebhookData[webhookKey] === undefined) {
// The requested webhook is not registered
throw new ResponseHelper.NotFoundError(
`The requested webhook "${httpMethod} ${path}" is not registered.`,
@ -100,7 +100,8 @@ export class TestWebhooks {
);
}
const { workflow } = this.testWebhookData[webhookKey];
const { destinationNode, sessionId, workflow, workflowData, timeout } =
testWebhookData[webhookKey];
// Get the node which has the webhook defined to know where to start from and to
// get additional data
@ -116,61 +117,46 @@ export class TestWebhooks {
const executionId = await WebhookHelpers.executeWebhook(
workflow,
webhookData!,
this.testWebhookData[webhookKey].workflowData,
workflowData,
workflowStartNode,
executionMode,
this.testWebhookData[webhookKey].sessionId,
sessionId,
undefined,
undefined,
request,
response,
(error: Error | null, data: IResponseCallbackData) => {
if (error !== null) {
return reject(error);
}
resolve(data);
if (error !== null) reject(error);
else resolve(data);
},
this.testWebhookData[webhookKey].destinationNode,
destinationNode,
);
if (executionId === undefined) {
// The workflow did not run as the request was probably setup related
// or a ping so do not resolve the promise and wait for the real webhook
// request instead.
return;
}
// The workflow did not run as the request was probably setup related
// or a ping so do not resolve the promise and wait for the real webhook
// request instead.
if (executionId === undefined) return;
// Inform editor-ui that webhook got received
if (this.testWebhookData[webhookKey].sessionId !== undefined) {
const pushInstance = Push.getInstance();
pushInstance.send(
'testWebhookReceived',
{ workflowId: webhookData!.workflowId, executionId },
this.testWebhookData[webhookKey].sessionId,
);
if (sessionId !== undefined) {
push.send('testWebhookReceived', { workflowId, executionId }, sessionId);
}
} catch (error) {
} finally {
// Delete webhook also if an error is thrown
}
if (timeout) clearTimeout(timeout);
delete testWebhookData[webhookKey];
// Remove the webhook
if (this.testWebhookData[webhookKey]) {
clearTimeout(this.testWebhookData[webhookKey].timeout);
delete this.testWebhookData[webhookKey];
await activeWebhooks.removeWorkflow(workflow);
}
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.activeWebhooks!.removeWorkflow(workflow);
});
}
/**
* Gets all request methods associated with a single test webhook
* @param path webhook path
*/
async getWebhookMethods(path: string): Promise<string[]> {
const webhookMethods: string[] = this.activeWebhooks!.getWebhookMethods(path);
if (webhookMethods === undefined) {
const webhookMethods = this.activeWebhooks.getWebhookMethods(path);
if (!webhookMethods.length) {
// The requested webhook is not registered
throw new ResponseHelper.NotFoundError(
`The requested webhook "${path}" is not registered.`,
@ -182,10 +168,8 @@ export class TestWebhooks {
}
/**
* Checks if it has to wait for webhook data to execute the workflow. If yes it waits
* for it and resolves with the result of the workflow if not it simply resolves
* with undefined
*
* Checks if it has to wait for webhook data to execute the workflow.
* If yes it waits for it and resolves with the result of the workflow if not it simply resolves with undefined
*/
async needsWebhookData(
workflowData: IWorkflowDb,
@ -216,11 +200,13 @@ export class TestWebhooks {
this.cancelTestWebhook(workflowData.id);
}, 120000);
const { activeWebhooks, testWebhookData } = this;
let key: string;
const activatedKey: string[] = [];
// eslint-disable-next-line no-restricted-syntax
for (const webhookData of webhooks) {
key = `${this.activeWebhooks!.getWebhookKey(
key = `${activeWebhooks.getWebhookKey(
webhookData.httpMethod,
webhookData.path,
webhookData.webhookId,
@ -228,7 +214,7 @@ export class TestWebhooks {
activatedKey.push(key);
this.testWebhookData[key] = {
testWebhookData[key] = {
sessionId,
timeout,
workflow,
@ -238,11 +224,11 @@ export class TestWebhooks {
try {
// eslint-disable-next-line no-await-in-loop
await this.activeWebhooks!.add(workflow, webhookData, mode, activation);
await activeWebhooks.add(workflow, webhookData, mode, activation);
} catch (error) {
activatedKey.forEach((deleteKey) => delete this.testWebhookData[deleteKey]);
activatedKey.forEach((deleteKey) => delete testWebhookData[deleteKey]);
// eslint-disable-next-line no-await-in-loop
await this.activeWebhooks!.removeWorkflow(workflow);
await activeWebhooks.removeWorkflow(workflow);
throw error;
}
}
@ -256,40 +242,34 @@ export class TestWebhooks {
*/
cancelTestWebhook(workflowId: string): boolean {
let foundWebhook = false;
const { activeWebhooks, push, testWebhookData } = this;
// eslint-disable-next-line no-restricted-syntax
for (const webhookKey of Object.keys(this.testWebhookData)) {
const webhookData = this.testWebhookData[webhookKey];
for (const webhookKey of Object.keys(testWebhookData)) {
const { sessionId, timeout, workflow, workflowData } = testWebhookData[webhookKey];
if (webhookData.workflowData.id !== workflowId) {
if (workflowData.id !== workflowId) {
// eslint-disable-next-line no-continue
continue;
}
clearTimeout(this.testWebhookData[webhookKey].timeout);
clearTimeout(timeout);
// Inform editor-ui that webhook got received
if (this.testWebhookData[webhookKey].sessionId !== undefined) {
if (sessionId !== undefined) {
try {
const pushInstance = Push.getInstance();
pushInstance.send(
'testWebhookDeleted',
{ workflowId },
this.testWebhookData[webhookKey].sessionId,
);
} catch (error) {
push.send('testWebhookDeleted', { workflowId }, sessionId);
} catch {
// Could not inform editor, probably is not connected anymore. So simply go on.
}
}
const { workflow } = this.testWebhookData[webhookKey];
// Remove the webhook
delete this.testWebhookData[webhookKey];
delete testWebhookData[webhookKey];
if (!foundWebhook) {
// As it removes all webhooks of the workflow execute only once
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.activeWebhooks!.removeWorkflow(workflow);
activeWebhooks.removeWorkflow(workflow);
}
foundWebhook = true;
@ -302,18 +282,7 @@ export class TestWebhooks {
* Removes all the currently active test webhooks
*/
async removeAll(): Promise<void> {
if (this.activeWebhooks === null) {
return;
}
let workflow: Workflow;
const workflows: Workflow[] = [];
// eslint-disable-next-line no-restricted-syntax
for (const webhookKey of Object.keys(this.testWebhookData)) {
workflow = this.testWebhookData[webhookKey].workflow;
workflows.push(workflow);
}
const workflows = Object.values(this.testWebhookData).map(({ workflow }) => workflow);
return this.activeWebhooks.removeAll(workflows);
}
}
@ -322,7 +291,7 @@ let testWebhooksInstance: TestWebhooks | undefined;
export function getInstance(): TestWebhooks {
if (testWebhooksInstance === undefined) {
testWebhooksInstance = new TestWebhooks();
testWebhooksInstance = new TestWebhooks(new ActiveWebhooks(), getPushInstance());
}
return testWebhooksInstance;

View file

@ -60,7 +60,7 @@ import type {
} from '@/Interfaces';
import { InternalHooksManager } from '@/InternalHooksManager';
import { NodeTypes } from '@/NodeTypes';
import * as Push from '@/Push';
import { getPushInstance } from '@/push';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
@ -250,76 +250,67 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
return {
nodeExecuteBefore: [
async function (this: WorkflowHooks, nodeName: string): Promise<void> {
const { sessionId, executionId } = this;
// Push data to session which started workflow before each
// node which starts rendering
if (this.sessionId === undefined) {
if (sessionId === undefined) {
return;
}
Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId: this.executionId,
sessionId: this.sessionId,
executionId,
sessionId,
workflowId: this.workflowData.id,
});
const pushInstance = Push.getInstance();
pushInstance.send(
'nodeExecuteBefore',
{
executionId: this.executionId,
nodeName,
},
this.sessionId,
);
const pushInstance = getPushInstance();
pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId);
},
],
nodeExecuteAfter: [
async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise<void> {
const { sessionId, executionId } = this;
// Push data to session which started workflow after each rendered node
if (this.sessionId === undefined) {
if (sessionId === undefined) {
return;
}
Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
executionId: this.executionId,
sessionId: this.sessionId,
executionId,
sessionId,
workflowId: this.workflowData.id,
});
const pushInstance = Push.getInstance();
pushInstance.send(
'nodeExecuteAfter',
{
executionId: this.executionId,
nodeName,
data,
},
this.sessionId,
);
const pushInstance = getPushInstance();
pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId);
},
],
workflowExecuteBefore: [
async function (this: WorkflowHooks): Promise<void> {
const { sessionId, executionId } = this;
const { id: workflowId, name: workflowName } = this.workflowData;
Logger.debug('Executing hook (hookFunctionsPush)', {
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
executionId,
sessionId,
workflowId,
});
// Push data to session which started the workflow
if (this.sessionId === undefined) {
if (sessionId === undefined) {
return;
}
const pushInstance = Push.getInstance();
const pushInstance = getPushInstance();
pushInstance.send(
'executionStarted',
{
executionId: this.executionId,
executionId,
mode: this.mode,
startedAt: new Date(),
retryOf: this.retryOf,
workflowId: this.workflowData.id,
sessionId: this.sessionId,
workflowName: this.workflowData.name,
workflowId,
sessionId,
workflowName,
},
this.sessionId,
sessionId,
);
},
],
@ -329,13 +320,15 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
fullRunData: IRun,
newStaticData: IDataObject,
): Promise<void> {
const { sessionId, executionId, retryOf } = this;
const { id: workflowId } = this.workflowData;
Logger.debug('Executing hook (hookFunctionsPush)', {
executionId: this.executionId,
sessionId: this.sessionId,
workflowId: this.workflowData.id,
executionId,
sessionId,
workflowId,
});
// Push data to session which started the workflow
if (this.sessionId === undefined) {
if (sessionId === undefined) {
return;
}
@ -354,19 +347,19 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
};
// Push data to editor-ui once workflow finished
Logger.debug(`Save execution progress to database for execution ID ${this.executionId} `, {
executionId: this.executionId,
workflowId: this.workflowData.id,
Logger.debug(`Save execution progress to database for execution ID ${executionId} `, {
executionId,
workflowId,
});
// TODO: Look at this again
const sendData: IPushDataExecutionFinished = {
executionId: this.executionId,
executionId,
data: pushRunData,
retryOf: this.retryOf,
retryOf,
};
const pushInstance = Push.getInstance();
pushInstance.send('executionFinished', sendData, this.sessionId);
const pushInstance = getPushInstance();
pushInstance.send('executionFinished', sendData, sessionId);
},
],
};
@ -1041,11 +1034,11 @@ async function executeWorkflow(
if (data.finished === true) {
// Workflow did finish successfully
await ActiveExecutions.getInstance().remove(executionId, data);
ActiveExecutions.getInstance().remove(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
}
await ActiveExecutions.getInstance().remove(executionId, data);
ActiveExecutions.getInstance().remove(executionId, data);
// Workflow did fail
const { error } = data.data.resultData;
// eslint-disable-next-line @typescript-eslint/no-throw-literal
@ -1057,20 +1050,21 @@ async function executeWorkflow(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function sendMessageToUI(source: string, messages: any[]) {
if (this.sessionId === undefined) {
const { sessionId } = this;
if (sessionId === undefined) {
return;
}
// Push data to session which started workflow
try {
const pushInstance = Push.getInstance();
const pushInstance = getPushInstance();
pushInstance.send(
'sendConsoleMessage',
{
source: `[Node: "${source}"]`,
messages,
},
this.sessionId,
sessionId,
);
} catch (error) {
Logger.warn(`There was a problem sending message to UI: ${error.message}`);

View file

@ -44,7 +44,6 @@ import type {
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import * as Push from '@/Push';
import * as Queue from '@/Queue';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
@ -54,16 +53,18 @@ import { InternalHooksManager } from '@/InternalHooksManager';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import type { Push } from '@/push';
import { getPushInstance } from '@/push';
export class WorkflowRunner {
activeExecutions: ActiveExecutions.ActiveExecutions;
push: Push.Push;
push: Push;
jobQueue: Queue.JobQueue;
constructor() {
this.push = Push.getInstance();
this.push = getPushInstance();
this.activeExecutions = ActiveExecutions.getInstance();
}

View file

@ -4,7 +4,6 @@ import type { PublicInstalledPackage } from 'n8n-workflow';
import config from '@/config';
import { InternalHooksManager } from '@/InternalHooksManager';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import * as Push from '@/Push';
import * as ResponseHelper from '@/ResponseHelper';
import {
@ -34,6 +33,7 @@ import { isAuthenticatedRequest } from '@/UserManagement/UserManagementHelper';
import type { InstalledPackages } from '@db/entities/InstalledPackages';
import type { CommunityPackages } from '@/Interfaces';
import type { NodeRequest } from '@/requests';
import { getPushInstance } from '@/push';
const { PACKAGE_NOT_INSTALLED, PACKAGE_NAME_NOT_PROVIDED } = RESPONSE_ERROR_MESSAGES;
@ -141,7 +141,7 @@ nodesController.post(
if (!hasLoaded) removePackageFromMissingList(name);
const pushInstance = Push.getInstance();
const pushInstance = getPushInstance();
// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
@ -248,7 +248,7 @@ nodesController.delete(
throw new ResponseHelper.InternalServerError(message);
}
const pushInstance = Push.getInstance();
const pushInstance = getPushInstance();
// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
@ -295,7 +295,7 @@ nodesController.patch(
previouslyInstalledPackage,
);
const pushInstance = Push.getInstance();
const pushInstance = getPushInstance();
// broadcast to connected frontends that node list has been updated
previouslyInstalledPackage.installedNodes.forEach((node) => {
@ -325,7 +325,7 @@ nodesController.patch(
return newInstalledPackage;
} catch (error) {
previouslyInstalledPackage.installedNodes.forEach((node) => {
const pushInstance = Push.getInstance();
const pushInstance = getPushInstance();
pushInstance.send('removeNodeType', {
name: node.type,
version: node.latestVersion,

View file

@ -925,6 +925,15 @@ export const schema = {
},
},
push: {
backend: {
format: ['sse', 'websocket'] as const,
default: 'sse',
env: 'N8N_PUSH_BACKEND',
doc: 'Backend to use for push notifications',
},
},
binaryDataManager: {
availableModes: {
format: String,

View file

@ -16,7 +16,6 @@ import * as ActiveExecutions from './ActiveExecutions';
import * as ActiveWorkflowRunner from './ActiveWorkflowRunner';
import * as Db from './Db';
import * as GenericHelpers from './GenericHelpers';
import * as Push from './Push';
import * as ResponseHelper from './ResponseHelper';
import * as Server from './Server';
import * as TestWebhooks from './TestWebhooks';
@ -30,7 +29,6 @@ export {
ActiveWorkflowRunner,
Db,
GenericHelpers,
Push,
ResponseHelper,
Server,
TestWebhooks,

View file

@ -0,0 +1,49 @@
import { LoggerProxy as Logger } from 'n8n-workflow';
import type { IPushDataType } from '@/Interfaces';
export abstract class AbstractPush<T> {
protected connections: Record<string, T> = {};
protected abstract close(connection: T): void;
protected abstract sendToOne(connection: T, data: string): void;
protected add(sessionId: string, connection: T): void {
const { connections } = this;
Logger.debug('Add editor-UI session', { sessionId });
const existingConnection = connections[sessionId];
if (existingConnection) {
// Make sure to remove existing connection with the same id
this.close(existingConnection);
}
connections[sessionId] = connection;
}
protected remove(sessionId?: string): void {
if (sessionId !== undefined) {
Logger.debug('Remove editor-UI session', { sessionId });
delete this.connections[sessionId];
}
}
send<D>(type: IPushDataType, data: D, sessionId: string | undefined = undefined) {
const { connections } = this;
if (sessionId !== undefined && connections[sessionId] === undefined) {
Logger.error(`The session "${sessionId}" is not registered.`, { sessionId });
return;
}
Logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId });
const sendData = JSON.stringify({ type, data });
if (sessionId === undefined) {
// Send to all connected clients
Object.values(connections).forEach((connection) => this.sendToOne(connection, sendData));
} else {
// Send only to a specific client
this.sendToOne(connections[sessionId], sendData);
}
}
}

View file

@ -0,0 +1,105 @@
import { ServerResponse } from 'http';
import type { Server } from 'http';
import type { Socket } from 'net';
import type { Application, RequestHandler } from 'express';
import { Server as WSServer } from 'ws';
import { parse as parseUrl } from 'url';
import config from '@/config';
import { resolveJwt } from '@/auth/jwt';
import { AUTH_COOKIE_NAME } from '@/constants';
import { SSEPush } from './sse.push';
import { WebSocketPush } from './websocket.push';
import type { Push, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
export type { Push } from './types';
const useWebSockets = config.getEnv('push.backend') === 'websocket';
let pushInstance: Push;
export const getPushInstance = () => {
if (!pushInstance) pushInstance = useWebSockets ? new WebSocketPush() : new SSEPush();
return pushInstance;
};
export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {
if (useWebSockets) {
const wsServer = new WSServer({ noServer: true });
server.on('upgrade', (request: WebSocketPushRequest, socket: Socket, head) => {
if (parseUrl(request.url).pathname === `/${restEndpoint}/push`) {
wsServer.handleUpgrade(request, socket, head, (ws) => {
request.ws = ws;
const response = new ServerResponse(request);
response.writeHead = (statusCode) => {
if (statusCode > 200) ws.close();
return response;
};
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
app.handle(request, response);
});
}
});
}
};
export const setupPushHandler = (
restEndpoint: string,
app: Application,
isUserManagementEnabled: boolean,
) => {
const push = getPushInstance();
const endpoint = `/${restEndpoint}/push`;
const pushValidationMiddleware: RequestHandler = async (
req: SSEPushRequest | WebSocketPushRequest,
res,
next,
) => {
const ws = req.ws;
const { sessionId } = req.query;
if (sessionId === undefined) {
if (ws) {
ws.send('The query parameter "sessionId" is missing!');
ws.close(400);
} else {
next(new Error('The query parameter "sessionId" is missing!'));
}
return;
}
// Handle authentication
if (isUserManagementEnabled) {
try {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access
const authCookie: string = req.cookies?.[AUTH_COOKIE_NAME] ?? '';
await resolveJwt(authCookie);
} catch (error) {
if (ws) {
ws.send(`Unauthorized: ${(error as Error).message}`);
ws.close(401);
} else {
res.status(401).send('Unauthorized');
}
return;
}
}
next();
};
app.use(
endpoint,
pushValidationMiddleware,
(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) => {
if (req.ws) {
(push as WebSocketPush).add(req.query.sessionId, req.ws);
} else if (!useWebSockets) {
(push as SSEPush).add(req.query.sessionId, { req, res });
} else {
res.status(401).send('Unauthorized');
}
},
);
};

View file

@ -0,0 +1,32 @@
import SSEChannel from 'sse-channel';
import { AbstractPush } from './abstract.push';
import type { PushRequest, PushResponse } from './types';
type Connection = { req: PushRequest; res: PushResponse };
export class SSEPush extends AbstractPush<Connection> {
readonly channel = new SSEChannel();
readonly connections: Record<string, Connection> = {};
constructor() {
super();
this.channel.on('disconnect', (channel, { req }) => {
this.remove(req?.query?.sessionId);
});
}
add(sessionId: string, connection: Connection) {
super.add(sessionId, connection);
this.channel.addClient(connection.req, connection.res);
}
protected close({ res }: Connection): void {
res.end();
this.channel.removeClient(res);
}
protected sendToOne(connection: Connection, data: string): void {
this.channel.send(data, [connection.res]);
}
}

View file

@ -0,0 +1,15 @@
import type { Request, Response } from 'express';
import type { WebSocket } from 'ws';
import type { SSEPush } from './sse.push';
import type { WebSocketPush } from './websocket.push';
// TODO: move all push related types here
export type Push = SSEPush | WebSocketPush;
export type PushRequest = Request<{}, {}, {}, { sessionId: string }>;
export type SSEPushRequest = PushRequest & { ws: undefined };
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };
export type PushResponse = Response & { req: PushRequest };

View file

@ -0,0 +1,19 @@
import type WebSocket from 'ws';
import { AbstractPush } from './abstract.push';
export class WebSocketPush extends AbstractPush<WebSocket> {
add(sessionId: string, connection: WebSocket) {
super.add(sessionId, connection);
// Makes sure to remove the session if the connection is closed
connection.once('close', () => this.remove(sessionId));
}
protected close(connection: WebSocket): void {
connection.close();
}
protected sendToOne(connection: WebSocket, data: string): void {
connection.send(data);
}
}

View file

@ -1,16 +1,16 @@
import type { Request, Response } from 'express';
import type { PushRequest, PushResponse } from './push/types';
declare module 'sse-channel' {
declare class Channel {
constructor();
on(event: string, handler: (channel: string, res: Response) => void): void;
on(event: string, handler: (channel: string, res: PushResponse) => void): void;
removeClient: (res: Response) => void;
removeClient: (res: PushResponse) => void;
addClient: (req: Request, res: Response) => void;
addClient: (req: PushRequest, res: PushResponse) => void;
send: (msg: string, clients?: Response[]) => void;
send: (msg: string, clients?: PushResponse[]) => void;
}
export = Channel;

View file

@ -2,4 +2,4 @@ jest.mock('@sentry/node');
jest.mock('@n8n_io/license-sdk');
jest.mock('@/telemetry');
jest.mock('@/eventbus/MessageEventBus/MessageEventBus');
jest.mock('@/Push');
jest.mock('@/push');

View file

@ -690,6 +690,7 @@ export interface IN8nUISettings {
host: string;
};
executionMode: string;
pushBackend: 'sse' | 'websocket';
communityNodesEnabled: boolean;
isNpmAvailable: boolean;
publicApi: {

View file

@ -24,6 +24,7 @@ import { useUIStore } from '@/stores/ui';
import { useWorkflowsStore } from '@/stores/workflows';
import { useNodeTypesStore } from '@/stores/nodeTypes';
import { useCredentialsStore } from '@/stores/credentials';
import { useSettingsStore } from '@/stores/settings';
export const pushConnection = mixins(
externalHooks,
@ -34,7 +35,7 @@ export const pushConnection = mixins(
).extend({
data() {
return {
eventSource: null as EventSource | null,
pushSource: null as WebSocket | EventSource | null,
reconnectTimeout: null as NodeJS.Timeout | null,
retryTimeout: null as NodeJS.Timeout | null,
pushMessageQueue: [] as Array<{ event: Event; retriesLeft: number }>,
@ -43,92 +44,99 @@ export const pushConnection = mixins(
};
},
computed: {
...mapStores(useCredentialsStore, useNodeTypesStore, useUIStore, useWorkflowsStore),
...mapStores(
useCredentialsStore,
useNodeTypesStore,
useUIStore,
useWorkflowsStore,
useSettingsStore,
),
sessionId(): string {
return this.rootStore.sessionId;
},
},
methods: {
pushAutomaticReconnect(): void {
if (this.reconnectTimeout !== null) {
return;
attemptReconnect() {
const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning');
if (this.connectRetries > 3 && !this.lostConnection && isWorkflowRunning) {
this.lostConnection = true;
this.workflowsStore.executingNode = null;
this.uiStore.removeActiveAction('workflowRunning');
this.$showMessage({
title: this.$locale.baseText('pushConnection.executionFailed'),
message: this.$locale.baseText('pushConnection.executionFailed.message'),
type: 'error',
duration: 0,
});
}
this.reconnectTimeout = setTimeout(() => {
this.connectRetries++;
const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning');
if (this.connectRetries > 3 && !this.lostConnection && isWorkflowRunning) {
this.lostConnection = true;
this.workflowsStore.executingNode = null;
this.uiStore.removeActiveAction('workflowRunning');
this.$showMessage({
title: this.$locale.baseText('pushConnection.executionFailed'),
message: this.$locale.baseText('pushConnection.executionFailed.message'),
type: 'error',
duration: 0,
});
}
this.pushConnect();
}, 3000);
this.pushConnect();
},
/**
* Connect to server to receive data via EventSource
* Connect to server to receive data via a WebSocket or EventSource
*/
pushConnect(): void {
// Make sure existing event-source instances get
// always removed that we do not end up with multiple ones
// always close the previous connection so that we do not end up with multiple connections
this.pushDisconnect();
const connectionUrl = `${this.rootStore.getRestUrl}/push?sessionId=${this.sessionId}`;
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.eventSource = new EventSource(connectionUrl, { withCredentials: true });
this.eventSource.addEventListener('message', this.pushMessageReceived, false);
const useWebSockets = this.settingsStore.pushBackend === 'websocket';
this.eventSource.addEventListener(
'open',
() => {
this.connectRetries = 0;
this.lostConnection = false;
const { getRestUrl: restUrl } = this.rootStore;
const url = `/push?sessionId=${this.sessionId}`;
this.rootStore.pushConnectionActive = true;
if (this.reconnectTimeout !== null) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
},
if (useWebSockets) {
const { protocol, host } = window.location;
const baseUrl = restUrl.startsWith('http')
? restUrl.replace(/^http/, 'ws')
: `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`;
this.pushSource = new WebSocket(`${baseUrl}${url}`);
} else {
this.pushSource = new EventSource(`${restUrl}${url}`, { withCredentials: true });
}
this.pushSource.addEventListener('open', this.onConnectionSuccess, false);
this.pushSource.addEventListener('message', this.pushMessageReceived, false);
this.pushSource.addEventListener(
useWebSockets ? 'close' : 'error',
this.onConnectionError,
false,
);
},
this.eventSource.addEventListener(
'error',
() => {
this.pushDisconnect();
onConnectionSuccess() {
this.connectRetries = 0;
this.lostConnection = false;
this.rootStore.pushConnectionActive = true;
this.pushSource?.removeEventListener('open', this.onConnectionSuccess);
},
if (this.reconnectTimeout !== null) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.rootStore.pushConnectionActive = false;
this.pushAutomaticReconnect();
},
false,
);
onConnectionError() {
this.pushDisconnect();
this.connectRetries++;
this.reconnectTimeout = setTimeout(this.attemptReconnect, this.connectRetries * 5000);
},
/**
* Close connection to server
*/
pushDisconnect(): void {
if (this.eventSource !== null) {
this.eventSource.close();
this.eventSource = null;
this.rootStore.pushConnectionActive = false;
if (this.pushSource !== null) {
this.pushSource.removeEventListener('error', this.onConnectionError);
this.pushSource.removeEventListener('close', this.onConnectionError);
this.pushSource.removeEventListener('message', this.pushMessageReceived);
if (this.pushSource.readyState < 2) this.pushSource.close();
this.pushSource = null;
}
this.rootStore.pushConnectionActive = false;
},
/**
@ -136,7 +144,6 @@ export const pushConnection = mixins(
* the REST API so we do not know yet what execution ID
* is currently active. So internally resend the message
* a few more times
*
*/
queuePushMessage(event: Event, retryAttempts: number) {
this.pushMessageQueue.push({ event, retriesLeft: retryAttempts });
@ -178,9 +185,6 @@ export const pushConnection = mixins(
/**
* Process a newly received message
*
* @param {Event} event The event data with the message data
* @param {boolean} [isRetry] If it is a retry
*/
pushMessageReceived(event: Event, isRetry?: boolean): boolean {
const retryAttempts = 5;

View file

@ -141,6 +141,9 @@ export const useSettingsStore = defineStore(STORES.SETTINGS, {
templatesHost(): string {
return this.settings.templates.host;
},
pushBackend(): IN8nUISettings['pushBackend'] {
return this.settings.pushBackend;
},
isCommunityNodesFeatureEnabled(): boolean {
return this.settings.communityNodesEnabled;
},

View file

@ -148,6 +148,7 @@ importers:
'@types/syslog-client': ^1.1.2
'@types/uuid': ^8.3.2
'@types/validator': ^13.7.0
'@types/ws': ^8.5.4
'@types/yamljs': ^0.2.31
axios: ^0.21.1
basic-auth: ^2.0.1
@ -236,6 +237,7 @@ importers:
uuid: ^8.3.2
validator: 13.7.0
winston: ^3.3.3
ws: ^8.12.0
yamljs: ^0.3.0
dependencies:
'@n8n_io/license-sdk': 1.8.0
@ -324,6 +326,7 @@ importers:
uuid: 8.3.2
validator: 13.7.0
winston: 3.8.2
ws: 8.12.0
yamljs: 0.3.0
devDependencies:
'@apidevtools/swagger-cli': 4.0.0
@ -364,6 +367,7 @@ importers:
'@types/syslog-client': 1.1.2
'@types/uuid': 8.3.4
'@types/validator': 13.7.7
'@types/ws': 8.5.4
'@types/yamljs': 0.2.31
chokidar: 3.5.2
concurrently: 5.3.0
@ -6482,6 +6486,12 @@ packages:
'@types/webidl-conversions': 7.0.0
dev: false
/@types/ws/8.5.4:
resolution: {integrity: sha512-zdQDHKUgcX/zBc4GrwsE/7dVdAD8JR4EuiAXiiUhhfyIJXXb2+PrGshFyeXWQPMmmZ2XxgaqclgpIC7eTXc1mg==}
dependencies:
'@types/node': 16.18.12
dev: true
/@types/xml2js/0.4.11:
resolution: {integrity: sha512-JdigeAKmCyoJUiQljjr7tQG3if9NkqGUgwEUqBvV0N7LM4HyQk7UXCnusRa1lnvXAEYJ8mw8GtZWioagNztOwA==}
dependencies:
@ -22840,7 +22850,6 @@ packages:
optional: true
utf-8-validate:
optional: true
dev: true
/x-default-browser/0.4.0:
resolution: {integrity: sha512-7LKo7RtWfoFN/rHx1UELv/2zHGMx8MkZKDq1xENmOCTkfIqZJ0zZ26NEJX8czhnPXVcqS0ARjjfJB+eJ0/5Cvw==}