diff --git a/packages/cli/package.json b/packages/cli/package.json index 9bf7460c16..88298c03af 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -99,6 +99,7 @@ "@types/syslog-client": "^1.1.2", "@types/uuid": "^8.3.2", "@types/validator": "^13.7.0", + "@types/ws": "^8.5.4", "@types/yamljs": "^0.2.31", "chokidar": "^3.5.2", "concurrently": "^5.1.0", @@ -196,6 +197,7 @@ "uuid": "^8.3.2", "validator": "13.7.0", "winston": "^3.3.3", + "ws": "^8.12.0", "yamljs": "^0.3.0" } } diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index cb7098848e..ee01a639a1 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -30,6 +30,8 @@ import { WEBHOOK_METHODS } from '@/WebhookHelpers'; const emptyBuffer = Buffer.alloc(0); export abstract class AbstractServer { + protected server: Server; + protected app: express.Application; protected externalHooks: IExternalHooksClass; @@ -73,7 +75,7 @@ export abstract class AbstractServer { this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance(); } - private async setupCommonMiddlewares() { + private async setupErrorHandlers() { const { app } = this; // Augment errors sent to Sentry @@ -82,6 +84,10 @@ export abstract class AbstractServer { } = await import('@sentry/node'); app.use(requestHandler()); app.use(errorHandler()); + } + + private async setupCommonMiddlewares() { + const { app } = this; // Compress the response data app.use(compression()); @@ -147,6 +153,8 @@ export abstract class AbstractServer { this.app.use(corsMiddleware); } + protected setupPushServer() {} + private async setupHealthCheck() { this.app.use((req, res, next) => { if (!Db.isInitialized) { @@ -392,10 +400,9 @@ export abstract class AbstractServer { async start(): Promise { const { app, externalHooks, protocol, sslKey, sslCert } = this; - let server: Server; if (protocol === 'https' && sslKey && sslCert) { const https = await import('https'); - server = https.createServer( + this.server = https.createServer( { key: await readFile(this.sslKey, 'utf8'), cert: await readFile(this.sslCert, 'utf8'), @@ -404,13 +411,13 @@ export abstract class AbstractServer { ); } else { const http = await import('http'); - server = http.createServer(app); + this.server = http.createServer(app); } const PORT = config.getEnv('port'); const ADDRESS = config.getEnv('listen_address'); - server.on('error', (error: Error & { code: string }) => { + this.server.on('error', (error: Error & { code: string }) => { if (error.code === 'EADDRINUSE') { console.log( `n8n's port ${PORT} is already in use. Do you have another instance of n8n running already?`, @@ -419,8 +426,10 @@ export abstract class AbstractServer { } }); - await new Promise((resolve) => server.listen(PORT, ADDRESS, () => resolve())); + await new Promise((resolve) => this.server.listen(PORT, ADDRESS, () => resolve())); + await this.setupErrorHandlers(); + this.setupPushServer(); await this.setupCommonMiddlewares(); if (inDevelopment) { this.setupDevMiddlewares(); diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index 86150c1acd..145500b9be 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -449,56 +449,6 @@ export interface IInternalHooksClass { onApiKeyDeleted(apiKeyDeletedData: { user: User; public_api: boolean }): Promise; } -export interface IN8nConfig { - database: IN8nConfigDatabase; - endpoints: IN8nConfigEndpoints; - executions: IN8nConfigExecutions; - generic: IN8nConfigGeneric; - host: string; - nodes: IN8nConfigNodes; - port: number; - protocol: 'http' | 'https'; -} - -export interface IN8nConfigDatabase { - type: DatabaseType; - postgresdb: { - host: string; - password: string; - port: number; - user: string; - }; -} - -export interface IN8nConfigEndpoints { - rest: string; - webhook: string; - webhookTest: string; -} - -// eslint-disable-next-line import/export -export interface IN8nConfigExecutions { - saveDataOnError: SaveExecutionDataType; - saveDataOnSuccess: SaveExecutionDataType; - saveDataManualExecutions: boolean; -} - -// eslint-disable-next-line import/export -export interface IN8nConfigExecutions { - saveDataOnError: SaveExecutionDataType; - saveDataOnSuccess: SaveExecutionDataType; - saveDataManualExecutions: boolean; -} - -export interface IN8nConfigGeneric { - timezone: string; -} - -export interface IN8nConfigNodes { - errorTriggerType: string; - exclude: string[]; -} - export interface IVersionNotificationSettings { enabled: boolean; endpoint: string; @@ -550,6 +500,7 @@ export interface IN8nUISettings { onboardingCallPromptEnabled: boolean; missingPackages?: boolean; executionMode: 'regular' | 'queue'; + pushBackend: 'sse' | 'websocket'; communityNodesEnabled: boolean; deployment: { type: string; diff --git a/packages/cli/src/Push.ts b/packages/cli/src/Push.ts deleted file mode 100644 index 74a399e037..0000000000 --- a/packages/cli/src/Push.ts +++ /dev/null @@ -1,83 +0,0 @@ -import SSEChannel from 'sse-channel'; -import type { Request, Response } from 'express'; - -import { LoggerProxy as Logger } from 'n8n-workflow'; -import type { IPushData, IPushDataType } from '@/Interfaces'; - -export class Push { - private channel = new SSEChannel(); - - private connections: Record = {}; - - constructor() { - this.channel.on('disconnect', (channel: string, res: Response) => { - if (res.req !== undefined) { - const { sessionId } = res.req.query; - Logger.debug('Remove editor-UI session', { sessionId }); - delete this.connections[sessionId as string]; - } - }); - } - - /** - * Adds a new push connection - * - * @param {string} sessionId The id of the session - * @param {Request} req The request - * @param {Response} res The response - */ - add(sessionId: string, req: Request, res: Response) { - Logger.debug('Add editor-UI session', { sessionId }); - - if (this.connections[sessionId] !== undefined) { - // Make sure to remove existing connection with the same session - // id if one exists already - this.connections[sessionId].end(); - this.channel.removeClient(this.connections[sessionId]); - } - - this.connections[sessionId] = res; - this.channel.addClient(req, res); - } - - /** - * Sends data to the client which is connected via a specific session - * - * @param {string} sessionId The session id of client to send data to - * @param {string} type Type of data to send - */ - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - send(type: IPushDataType, data: any, sessionId?: string) { - if (sessionId !== undefined && this.connections[sessionId] === undefined) { - Logger.error(`The session "${sessionId}" is not registered.`, { sessionId }); - return; - } - - Logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId }); - - const sendData: IPushData = { - type, - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - data, - }; - - if (sessionId === undefined) { - // Send to all connected clients - this.channel.send(JSON.stringify(sendData)); - } else { - // Send only to a specific client - this.channel.send(JSON.stringify(sendData), [this.connections[sessionId]]); - } - } -} - -let activePushInstance: Push | undefined; - -export function getInstance(): Push { - if (activePushInstance === undefined) { - activePushInstance = new Push(); - } - - return activePushInstance; -} diff --git a/packages/cli/src/ReloadNodesAndCredentials.ts b/packages/cli/src/ReloadNodesAndCredentials.ts index 1be112037c..d7cc36c05e 100644 --- a/packages/cli/src/ReloadNodesAndCredentials.ts +++ b/packages/cli/src/ReloadNodesAndCredentials.ts @@ -3,7 +3,7 @@ import { realpath } from 'fs/promises'; import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials'; import type { NodeTypesClass } from '@/NodeTypes'; -import type { Push } from '@/Push'; +import type { Push } from '@/push'; export const reloadNodesAndCredentials = async ( loadNodesAndCredentials: LoadNodesAndCredentialsClass, diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index e166ce6a6e..b898f62a79 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1,31 +1,15 @@ /* eslint-disable @typescript-eslint/no-unsafe-argument */ /* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ /* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */ -/* eslint-disable @typescript-eslint/no-use-before-define */ -/* eslint-disable @typescript-eslint/await-thenable */ -/* eslint-disable new-cap */ /* eslint-disable prefer-const */ /* eslint-disable @typescript-eslint/no-invalid-void-type */ -/* eslint-disable no-return-assign */ -/* eslint-disable no-param-reassign */ -/* eslint-disable consistent-return */ /* eslint-disable @typescript-eslint/restrict-template-expressions */ -/* eslint-disable @typescript-eslint/no-non-null-assertion */ -/* eslint-disable id-denylist */ -/* eslint-disable no-console */ -/* eslint-disable global-require */ /* eslint-disable @typescript-eslint/no-var-requires */ /* eslint-disable @typescript-eslint/no-shadow */ -/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-return */ /* eslint-disable @typescript-eslint/no-unused-vars */ -/* eslint-disable no-continue */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ -/* eslint-disable @typescript-eslint/no-explicit-any */ -/* eslint-disable no-restricted-syntax */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -/* eslint-disable import/no-dynamic-require */ -/* eslint-disable no-await-in-loop */ import { exec as callbackExec } from 'child_process'; import { access as fsAccess } from 'fs/promises'; @@ -60,7 +44,6 @@ import type { INodeTypeNameVersion, ITelemetrySettings, WorkflowExecuteMode, - INodeTypes, ICredentialTypes, } from 'n8n-workflow'; import { LoggerProxy, jsonParse } from 'n8n-workflow'; @@ -78,7 +61,6 @@ import { getSharedWorkflowIds } from '@/WorkflowHelpers'; import { nodesController } from '@/api/nodes.api'; import { workflowsController } from '@/workflows/workflows.controller'; import { - AUTH_COOKIE_NAME, EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR, inDevelopment, @@ -106,7 +88,6 @@ import { PasswordResetController, UsersController, } from '@/controllers'; -import { resolveJwt } from '@/auth/jwt'; import { executionsController } from '@/executions/executions.controller'; import { nodeTypesController } from '@/api/nodeTypes.api'; @@ -143,7 +124,6 @@ import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials'; import type { NodeTypesClass } from '@/NodeTypes'; import { NodeTypes } from '@/NodeTypes'; -import * as Push from '@/Push'; import * as ResponseHelper from '@/ResponseHelper'; import type { WaitTrackerClass } from '@/WaitTracker'; import { WaitTracker } from '@/WaitTracker'; @@ -155,7 +135,9 @@ import { eventBusRouter } from '@/eventbus/eventBusRoutes'; import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper'; import { getLicense } from '@/License'; import { licenseController } from './license/license.controller'; -import { corsMiddleware, setupAuthMiddlewares } from './middlewares'; +import type { Push } from '@/push'; +import { getPushInstance, setupPushServer, setupPushHandler } from '@/push'; +import { setupAuthMiddlewares } from './middlewares'; import { initEvents } from './events'; import { ldapController } from './Ldap/routes/ldap.controller.ee'; import { getLdapLoginLabel, isLdapEnabled, isLdapLoginEnabled } from './Ldap/helpers'; @@ -183,7 +165,7 @@ class Server extends AbstractServer { credentialTypes: ICredentialTypes; - push: Push.Push; + push: Push; constructor() { super(); @@ -198,12 +180,12 @@ class Server extends AbstractServer { this.presetCredentialsLoaded = false; this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint'); + this.push = getPushInstance(); + if (process.env.E2E_TESTS === 'true') { this.app.use('/e2e', require('./api/e2e.api').e2eController); } - this.push = Push.getInstance(); - const urlBaseWebhook = WebhookHelpers.getWebhookBaseUrl(); const telemetrySettings: ITelemetrySettings = { enabled: config.getEnv('diagnostics.enabled'), @@ -280,6 +262,7 @@ class Server extends AbstractServer { }, onboardingCallPromptEnabled: config.getEnv('onboardingCallPrompt.enabled'), executionMode: config.getEnv('executions.mode'), + pushBackend: config.getEnv('push.backend'), communityNodesEnabled: config.getEnv('nodes.communityPackages.enabled'), deployment: { type: config.getEnv('deployment.type'), @@ -434,26 +417,8 @@ class Server extends AbstractServer { // Parse cookies for easier access this.app.use(cookieParser()); - // Get push connections - this.app.use(`/${this.restEndpoint}/push`, corsMiddleware, async (req, res, next) => { - const { sessionId } = req.query; - if (sessionId === undefined) { - next(new Error('The query parameter "sessionId" is missing!')); - return; - } - - if (isUserManagementEnabled()) { - try { - const authCookie = req.cookies?.[AUTH_COOKIE_NAME] ?? ''; - await resolveJwt(authCookie); - } catch (error) { - res.status(401).send('Unauthorized'); - return; - } - } - - this.push.add(sessionId as string, req, res); - }); + const { restEndpoint, app } = this; + setupPushHandler(restEndpoint, app, isUserManagementEnabled()); // Make sure that Vue history mode works properly this.app.use( @@ -1324,6 +1289,11 @@ class Server extends AbstractServer { this.app.use('/', express.static(GENERATED_STATIC_DIR)); } } + + protected setupPushServer(): void { + const { restEndpoint, server, app } = this; + setupPushServer(restEndpoint, server, app); + } } export async function start(): Promise { diff --git a/packages/cli/src/TestWebhooks.ts b/packages/cli/src/TestWebhooks.ts index fe3b454229..aee337ec74 100644 --- a/packages/cli/src/TestWebhooks.ts +++ b/packages/cli/src/TestWebhooks.ts @@ -14,14 +14,15 @@ import type { WorkflowExecuteMode, } from 'n8n-workflow'; import type { IResponseCallbackData, IWorkflowDb } from '@/Interfaces'; -import * as Push from '@/Push'; +import type { Push } from '@/push'; +import { getPushInstance } from '@/push'; import * as ResponseHelper from '@/ResponseHelper'; import * as WebhookHelpers from '@/WebhookHelpers'; const WEBHOOK_TEST_UNREGISTERED_HINT = "Click the 'Execute workflow' button on the canvas, then try again. (In test mode, the webhook only works for one call after you click this button)"; -export class TestWebhooks { +class TestWebhooks { private testWebhookData: { [key: string]: { sessionId?: string; @@ -32,18 +33,14 @@ export class TestWebhooks { }; } = {}; - private activeWebhooks: ActiveWebhooks | null = null; - - constructor() { - this.activeWebhooks = new ActiveWebhooks(); - this.activeWebhooks.testWebhooks = true; + constructor(private activeWebhooks: ActiveWebhooks, private push: Push) { + activeWebhooks.testWebhooks = true; } /** * Executes a test-webhook and returns the data. It also makes sure that the * data gets additionally send to the UI. After the request got handled it * automatically remove the test-webhook. - * */ async callTestWebhook( httpMethod: WebhookHttpMethod, @@ -59,14 +56,16 @@ export class TestWebhooks { path = path.slice(0, -1); } - let webhookData: IWebhookData | undefined = this.activeWebhooks!.get(httpMethod, path); + const { activeWebhooks, push, testWebhookData } = this; + + let webhookData: IWebhookData | undefined = activeWebhooks.get(httpMethod, path); // check if path is dynamic if (webhookData === undefined) { const pathElements = path.split('/'); const webhookId = pathElements.shift(); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - webhookData = this.activeWebhooks!.get(httpMethod, pathElements.join('/'), webhookId); + webhookData = activeWebhooks.get(httpMethod, pathElements.join('/'), webhookId); if (webhookData === undefined) { // The requested webhook is not registered throw new ResponseHelper.NotFoundError( @@ -85,14 +84,15 @@ export class TestWebhooks { }); } - const webhookKey = `${this.activeWebhooks!.getWebhookKey( + const { workflowId } = webhookData; + const webhookKey = `${activeWebhooks.getWebhookKey( webhookData.httpMethod, webhookData.path, webhookData.webhookId, - )}|${webhookData.workflowId}`; + )}|${workflowId}`; // TODO: Clean that duplication up one day and improve code generally - if (this.testWebhookData[webhookKey] === undefined) { + if (testWebhookData[webhookKey] === undefined) { // The requested webhook is not registered throw new ResponseHelper.NotFoundError( `The requested webhook "${httpMethod} ${path}" is not registered.`, @@ -100,7 +100,8 @@ export class TestWebhooks { ); } - const { workflow } = this.testWebhookData[webhookKey]; + const { destinationNode, sessionId, workflow, workflowData, timeout } = + testWebhookData[webhookKey]; // Get the node which has the webhook defined to know where to start from and to // get additional data @@ -116,61 +117,46 @@ export class TestWebhooks { const executionId = await WebhookHelpers.executeWebhook( workflow, webhookData!, - this.testWebhookData[webhookKey].workflowData, + workflowData, workflowStartNode, executionMode, - this.testWebhookData[webhookKey].sessionId, + sessionId, undefined, undefined, request, response, (error: Error | null, data: IResponseCallbackData) => { - if (error !== null) { - return reject(error); - } - resolve(data); + if (error !== null) reject(error); + else resolve(data); }, - this.testWebhookData[webhookKey].destinationNode, + destinationNode, ); - if (executionId === undefined) { - // The workflow did not run as the request was probably setup related - // or a ping so do not resolve the promise and wait for the real webhook - // request instead. - return; - } + // The workflow did not run as the request was probably setup related + // or a ping so do not resolve the promise and wait for the real webhook + // request instead. + if (executionId === undefined) return; // Inform editor-ui that webhook got received - if (this.testWebhookData[webhookKey].sessionId !== undefined) { - const pushInstance = Push.getInstance(); - pushInstance.send( - 'testWebhookReceived', - { workflowId: webhookData!.workflowId, executionId }, - this.testWebhookData[webhookKey].sessionId, - ); + if (sessionId !== undefined) { + push.send('testWebhookReceived', { workflowId, executionId }, sessionId); } - } catch (error) { + } finally { // Delete webhook also if an error is thrown - } + if (timeout) clearTimeout(timeout); + delete testWebhookData[webhookKey]; - // Remove the webhook - if (this.testWebhookData[webhookKey]) { - clearTimeout(this.testWebhookData[webhookKey].timeout); - delete this.testWebhookData[webhookKey]; + await activeWebhooks.removeWorkflow(workflow); } - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.activeWebhooks!.removeWorkflow(workflow); }); } /** * Gets all request methods associated with a single test webhook - * @param path webhook path */ async getWebhookMethods(path: string): Promise { - const webhookMethods: string[] = this.activeWebhooks!.getWebhookMethods(path); - - if (webhookMethods === undefined) { + const webhookMethods = this.activeWebhooks.getWebhookMethods(path); + if (!webhookMethods.length) { // The requested webhook is not registered throw new ResponseHelper.NotFoundError( `The requested webhook "${path}" is not registered.`, @@ -182,10 +168,8 @@ export class TestWebhooks { } /** - * Checks if it has to wait for webhook data to execute the workflow. If yes it waits - * for it and resolves with the result of the workflow if not it simply resolves - * with undefined - * + * Checks if it has to wait for webhook data to execute the workflow. + * If yes it waits for it and resolves with the result of the workflow if not it simply resolves with undefined */ async needsWebhookData( workflowData: IWorkflowDb, @@ -216,11 +200,13 @@ export class TestWebhooks { this.cancelTestWebhook(workflowData.id); }, 120000); + const { activeWebhooks, testWebhookData } = this; + let key: string; const activatedKey: string[] = []; // eslint-disable-next-line no-restricted-syntax for (const webhookData of webhooks) { - key = `${this.activeWebhooks!.getWebhookKey( + key = `${activeWebhooks.getWebhookKey( webhookData.httpMethod, webhookData.path, webhookData.webhookId, @@ -228,7 +214,7 @@ export class TestWebhooks { activatedKey.push(key); - this.testWebhookData[key] = { + testWebhookData[key] = { sessionId, timeout, workflow, @@ -238,11 +224,11 @@ export class TestWebhooks { try { // eslint-disable-next-line no-await-in-loop - await this.activeWebhooks!.add(workflow, webhookData, mode, activation); + await activeWebhooks.add(workflow, webhookData, mode, activation); } catch (error) { - activatedKey.forEach((deleteKey) => delete this.testWebhookData[deleteKey]); + activatedKey.forEach((deleteKey) => delete testWebhookData[deleteKey]); // eslint-disable-next-line no-await-in-loop - await this.activeWebhooks!.removeWorkflow(workflow); + await activeWebhooks.removeWorkflow(workflow); throw error; } } @@ -256,40 +242,34 @@ export class TestWebhooks { */ cancelTestWebhook(workflowId: string): boolean { let foundWebhook = false; + const { activeWebhooks, push, testWebhookData } = this; // eslint-disable-next-line no-restricted-syntax - for (const webhookKey of Object.keys(this.testWebhookData)) { - const webhookData = this.testWebhookData[webhookKey]; + for (const webhookKey of Object.keys(testWebhookData)) { + const { sessionId, timeout, workflow, workflowData } = testWebhookData[webhookKey]; - if (webhookData.workflowData.id !== workflowId) { + if (workflowData.id !== workflowId) { // eslint-disable-next-line no-continue continue; } - clearTimeout(this.testWebhookData[webhookKey].timeout); + clearTimeout(timeout); // Inform editor-ui that webhook got received - if (this.testWebhookData[webhookKey].sessionId !== undefined) { + if (sessionId !== undefined) { try { - const pushInstance = Push.getInstance(); - pushInstance.send( - 'testWebhookDeleted', - { workflowId }, - this.testWebhookData[webhookKey].sessionId, - ); - } catch (error) { + push.send('testWebhookDeleted', { workflowId }, sessionId); + } catch { // Could not inform editor, probably is not connected anymore. So simply go on. } } - const { workflow } = this.testWebhookData[webhookKey]; - // Remove the webhook - delete this.testWebhookData[webhookKey]; + delete testWebhookData[webhookKey]; if (!foundWebhook) { // As it removes all webhooks of the workflow execute only once // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.activeWebhooks!.removeWorkflow(workflow); + activeWebhooks.removeWorkflow(workflow); } foundWebhook = true; @@ -302,18 +282,7 @@ export class TestWebhooks { * Removes all the currently active test webhooks */ async removeAll(): Promise { - if (this.activeWebhooks === null) { - return; - } - - let workflow: Workflow; - const workflows: Workflow[] = []; - // eslint-disable-next-line no-restricted-syntax - for (const webhookKey of Object.keys(this.testWebhookData)) { - workflow = this.testWebhookData[webhookKey].workflow; - workflows.push(workflow); - } - + const workflows = Object.values(this.testWebhookData).map(({ workflow }) => workflow); return this.activeWebhooks.removeAll(workflows); } } @@ -322,7 +291,7 @@ let testWebhooksInstance: TestWebhooks | undefined; export function getInstance(): TestWebhooks { if (testWebhooksInstance === undefined) { - testWebhooksInstance = new TestWebhooks(); + testWebhooksInstance = new TestWebhooks(new ActiveWebhooks(), getPushInstance()); } return testWebhooksInstance; diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index c81d08c691..9f220dd164 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -60,7 +60,7 @@ import type { } from '@/Interfaces'; import { InternalHooksManager } from '@/InternalHooksManager'; import { NodeTypes } from '@/NodeTypes'; -import * as Push from '@/Push'; +import { getPushInstance } from '@/push'; import * as ResponseHelper from '@/ResponseHelper'; import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowHelpers from '@/WorkflowHelpers'; @@ -250,76 +250,67 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { return { nodeExecuteBefore: [ async function (this: WorkflowHooks, nodeName: string): Promise { + const { sessionId, executionId } = this; // Push data to session which started workflow before each // node which starts rendering - if (this.sessionId === undefined) { + if (sessionId === undefined) { return; } + Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId: this.executionId, - sessionId: this.sessionId, + executionId, + sessionId, workflowId: this.workflowData.id, }); - const pushInstance = Push.getInstance(); - pushInstance.send( - 'nodeExecuteBefore', - { - executionId: this.executionId, - nodeName, - }, - this.sessionId, - ); + const pushInstance = getPushInstance(); + pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId); }, ], nodeExecuteAfter: [ async function (this: WorkflowHooks, nodeName: string, data: ITaskData): Promise { + const { sessionId, executionId } = this; // Push data to session which started workflow after each rendered node - if (this.sessionId === undefined) { + if (sessionId === undefined) { return; } + Logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, { - executionId: this.executionId, - sessionId: this.sessionId, + executionId, + sessionId, workflowId: this.workflowData.id, }); - const pushInstance = Push.getInstance(); - pushInstance.send( - 'nodeExecuteAfter', - { - executionId: this.executionId, - nodeName, - data, - }, - this.sessionId, - ); + const pushInstance = getPushInstance(); + pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId); }, ], workflowExecuteBefore: [ async function (this: WorkflowHooks): Promise { + const { sessionId, executionId } = this; + const { id: workflowId, name: workflowName } = this.workflowData; Logger.debug('Executing hook (hookFunctionsPush)', { - executionId: this.executionId, - sessionId: this.sessionId, - workflowId: this.workflowData.id, + executionId, + sessionId, + workflowId, }); // Push data to session which started the workflow - if (this.sessionId === undefined) { + if (sessionId === undefined) { return; } - const pushInstance = Push.getInstance(); + const pushInstance = getPushInstance(); pushInstance.send( 'executionStarted', { - executionId: this.executionId, + executionId, mode: this.mode, startedAt: new Date(), retryOf: this.retryOf, - workflowId: this.workflowData.id, - sessionId: this.sessionId, - workflowName: this.workflowData.name, + workflowId, + sessionId, + workflowName, }, - this.sessionId, + sessionId, ); }, ], @@ -329,13 +320,15 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { fullRunData: IRun, newStaticData: IDataObject, ): Promise { + const { sessionId, executionId, retryOf } = this; + const { id: workflowId } = this.workflowData; Logger.debug('Executing hook (hookFunctionsPush)', { - executionId: this.executionId, - sessionId: this.sessionId, - workflowId: this.workflowData.id, + executionId, + sessionId, + workflowId, }); // Push data to session which started the workflow - if (this.sessionId === undefined) { + if (sessionId === undefined) { return; } @@ -354,19 +347,19 @@ function hookFunctionsPush(): IWorkflowExecuteHooks { }; // Push data to editor-ui once workflow finished - Logger.debug(`Save execution progress to database for execution ID ${this.executionId} `, { - executionId: this.executionId, - workflowId: this.workflowData.id, + Logger.debug(`Save execution progress to database for execution ID ${executionId} `, { + executionId, + workflowId, }); // TODO: Look at this again const sendData: IPushDataExecutionFinished = { - executionId: this.executionId, + executionId, data: pushRunData, - retryOf: this.retryOf, + retryOf, }; - const pushInstance = Push.getInstance(); - pushInstance.send('executionFinished', sendData, this.sessionId); + const pushInstance = getPushInstance(); + pushInstance.send('executionFinished', sendData, sessionId); }, ], }; @@ -1041,11 +1034,11 @@ async function executeWorkflow( if (data.finished === true) { // Workflow did finish successfully - await ActiveExecutions.getInstance().remove(executionId, data); + ActiveExecutions.getInstance().remove(executionId, data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); return returnData!.data!.main; } - await ActiveExecutions.getInstance().remove(executionId, data); + ActiveExecutions.getInstance().remove(executionId, data); // Workflow did fail const { error } = data.data.resultData; // eslint-disable-next-line @typescript-eslint/no-throw-literal @@ -1057,20 +1050,21 @@ async function executeWorkflow( // eslint-disable-next-line @typescript-eslint/no-explicit-any export function sendMessageToUI(source: string, messages: any[]) { - if (this.sessionId === undefined) { + const { sessionId } = this; + if (sessionId === undefined) { return; } // Push data to session which started workflow try { - const pushInstance = Push.getInstance(); + const pushInstance = getPushInstance(); pushInstance.send( 'sendConsoleMessage', { source: `[Node: "${source}"]`, messages, }, - this.sessionId, + sessionId, ); } catch (error) { Logger.warn(`There was a problem sending message to UI: ${error.message}`); diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 30fa45970c..0e0f90bbd4 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -44,7 +44,6 @@ import type { IWorkflowExecutionDataProcessWithExecution, } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; -import * as Push from '@/Push'; import * as Queue from '@/Queue'; import * as ResponseHelper from '@/ResponseHelper'; import * as WebhookHelpers from '@/WebhookHelpers'; @@ -54,16 +53,18 @@ import { InternalHooksManager } from '@/InternalHooksManager'; import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { initErrorHandling } from '@/ErrorReporting'; import { PermissionChecker } from '@/UserManagement/PermissionChecker'; +import type { Push } from '@/push'; +import { getPushInstance } from '@/push'; export class WorkflowRunner { activeExecutions: ActiveExecutions.ActiveExecutions; - push: Push.Push; + push: Push; jobQueue: Queue.JobQueue; constructor() { - this.push = Push.getInstance(); + this.push = getPushInstance(); this.activeExecutions = ActiveExecutions.getInstance(); } diff --git a/packages/cli/src/api/nodes.api.ts b/packages/cli/src/api/nodes.api.ts index b68480be44..05e5ede065 100644 --- a/packages/cli/src/api/nodes.api.ts +++ b/packages/cli/src/api/nodes.api.ts @@ -4,7 +4,6 @@ import type { PublicInstalledPackage } from 'n8n-workflow'; import config from '@/config'; import { InternalHooksManager } from '@/InternalHooksManager'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import * as Push from '@/Push'; import * as ResponseHelper from '@/ResponseHelper'; import { @@ -34,6 +33,7 @@ import { isAuthenticatedRequest } from '@/UserManagement/UserManagementHelper'; import type { InstalledPackages } from '@db/entities/InstalledPackages'; import type { CommunityPackages } from '@/Interfaces'; import type { NodeRequest } from '@/requests'; +import { getPushInstance } from '@/push'; const { PACKAGE_NOT_INSTALLED, PACKAGE_NAME_NOT_PROVIDED } = RESPONSE_ERROR_MESSAGES; @@ -141,7 +141,7 @@ nodesController.post( if (!hasLoaded) removePackageFromMissingList(name); - const pushInstance = Push.getInstance(); + const pushInstance = getPushInstance(); // broadcast to connected frontends that node list has been updated installedPackage.installedNodes.forEach((node) => { @@ -248,7 +248,7 @@ nodesController.delete( throw new ResponseHelper.InternalServerError(message); } - const pushInstance = Push.getInstance(); + const pushInstance = getPushInstance(); // broadcast to connected frontends that node list has been updated installedPackage.installedNodes.forEach((node) => { @@ -295,7 +295,7 @@ nodesController.patch( previouslyInstalledPackage, ); - const pushInstance = Push.getInstance(); + const pushInstance = getPushInstance(); // broadcast to connected frontends that node list has been updated previouslyInstalledPackage.installedNodes.forEach((node) => { @@ -325,7 +325,7 @@ nodesController.patch( return newInstalledPackage; } catch (error) { previouslyInstalledPackage.installedNodes.forEach((node) => { - const pushInstance = Push.getInstance(); + const pushInstance = getPushInstance(); pushInstance.send('removeNodeType', { name: node.type, version: node.latestVersion, diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index cf1091bdae..0c453e202f 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -925,6 +925,15 @@ export const schema = { }, }, + push: { + backend: { + format: ['sse', 'websocket'] as const, + default: 'sse', + env: 'N8N_PUSH_BACKEND', + doc: 'Backend to use for push notifications', + }, + }, + binaryDataManager: { availableModes: { format: String, diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index f9609a77c0..152ae625dc 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -16,7 +16,6 @@ import * as ActiveExecutions from './ActiveExecutions'; import * as ActiveWorkflowRunner from './ActiveWorkflowRunner'; import * as Db from './Db'; import * as GenericHelpers from './GenericHelpers'; -import * as Push from './Push'; import * as ResponseHelper from './ResponseHelper'; import * as Server from './Server'; import * as TestWebhooks from './TestWebhooks'; @@ -30,7 +29,6 @@ export { ActiveWorkflowRunner, Db, GenericHelpers, - Push, ResponseHelper, Server, TestWebhooks, diff --git a/packages/cli/src/push/abstract.push.ts b/packages/cli/src/push/abstract.push.ts new file mode 100644 index 0000000000..dbbbe63b73 --- /dev/null +++ b/packages/cli/src/push/abstract.push.ts @@ -0,0 +1,49 @@ +import { LoggerProxy as Logger } from 'n8n-workflow'; +import type { IPushDataType } from '@/Interfaces'; + +export abstract class AbstractPush { + protected connections: Record = {}; + + protected abstract close(connection: T): void; + protected abstract sendToOne(connection: T, data: string): void; + + protected add(sessionId: string, connection: T): void { + const { connections } = this; + Logger.debug('Add editor-UI session', { sessionId }); + + const existingConnection = connections[sessionId]; + if (existingConnection) { + // Make sure to remove existing connection with the same id + this.close(existingConnection); + } + + connections[sessionId] = connection; + } + + protected remove(sessionId?: string): void { + if (sessionId !== undefined) { + Logger.debug('Remove editor-UI session', { sessionId }); + delete this.connections[sessionId]; + } + } + + send(type: IPushDataType, data: D, sessionId: string | undefined = undefined) { + const { connections } = this; + if (sessionId !== undefined && connections[sessionId] === undefined) { + Logger.error(`The session "${sessionId}" is not registered.`, { sessionId }); + return; + } + + Logger.debug(`Send data of type "${type}" to editor-UI`, { dataType: type, sessionId }); + + const sendData = JSON.stringify({ type, data }); + + if (sessionId === undefined) { + // Send to all connected clients + Object.values(connections).forEach((connection) => this.sendToOne(connection, sendData)); + } else { + // Send only to a specific client + this.sendToOne(connections[sessionId], sendData); + } + } +} diff --git a/packages/cli/src/push/index.ts b/packages/cli/src/push/index.ts new file mode 100644 index 0000000000..54675b1ef8 --- /dev/null +++ b/packages/cli/src/push/index.ts @@ -0,0 +1,105 @@ +import { ServerResponse } from 'http'; +import type { Server } from 'http'; +import type { Socket } from 'net'; +import type { Application, RequestHandler } from 'express'; +import { Server as WSServer } from 'ws'; +import { parse as parseUrl } from 'url'; +import config from '@/config'; +import { resolveJwt } from '@/auth/jwt'; +import { AUTH_COOKIE_NAME } from '@/constants'; +import { SSEPush } from './sse.push'; +import { WebSocketPush } from './websocket.push'; +import type { Push, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types'; +export type { Push } from './types'; + +const useWebSockets = config.getEnv('push.backend') === 'websocket'; + +let pushInstance: Push; +export const getPushInstance = () => { + if (!pushInstance) pushInstance = useWebSockets ? new WebSocketPush() : new SSEPush(); + return pushInstance; +}; + +export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => { + if (useWebSockets) { + const wsServer = new WSServer({ noServer: true }); + server.on('upgrade', (request: WebSocketPushRequest, socket: Socket, head) => { + if (parseUrl(request.url).pathname === `/${restEndpoint}/push`) { + wsServer.handleUpgrade(request, socket, head, (ws) => { + request.ws = ws; + + const response = new ServerResponse(request); + response.writeHead = (statusCode) => { + if (statusCode > 200) ws.close(); + return response; + }; + + // @ts-ignore + // eslint-disable-next-line @typescript-eslint/no-unsafe-call + app.handle(request, response); + }); + } + }); + } +}; + +export const setupPushHandler = ( + restEndpoint: string, + app: Application, + isUserManagementEnabled: boolean, +) => { + const push = getPushInstance(); + const endpoint = `/${restEndpoint}/push`; + + const pushValidationMiddleware: RequestHandler = async ( + req: SSEPushRequest | WebSocketPushRequest, + res, + next, + ) => { + const ws = req.ws; + + const { sessionId } = req.query; + if (sessionId === undefined) { + if (ws) { + ws.send('The query parameter "sessionId" is missing!'); + ws.close(400); + } else { + next(new Error('The query parameter "sessionId" is missing!')); + } + return; + } + + // Handle authentication + if (isUserManagementEnabled) { + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access + const authCookie: string = req.cookies?.[AUTH_COOKIE_NAME] ?? ''; + await resolveJwt(authCookie); + } catch (error) { + if (ws) { + ws.send(`Unauthorized: ${(error as Error).message}`); + ws.close(401); + } else { + res.status(401).send('Unauthorized'); + } + return; + } + } + + next(); + }; + + app.use( + endpoint, + pushValidationMiddleware, + (req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) => { + if (req.ws) { + (push as WebSocketPush).add(req.query.sessionId, req.ws); + } else if (!useWebSockets) { + (push as SSEPush).add(req.query.sessionId, { req, res }); + } else { + res.status(401).send('Unauthorized'); + } + }, + ); +}; diff --git a/packages/cli/src/push/sse.push.ts b/packages/cli/src/push/sse.push.ts new file mode 100644 index 0000000000..e3bae7dd1f --- /dev/null +++ b/packages/cli/src/push/sse.push.ts @@ -0,0 +1,32 @@ +import SSEChannel from 'sse-channel'; +import { AbstractPush } from './abstract.push'; +import type { PushRequest, PushResponse } from './types'; + +type Connection = { req: PushRequest; res: PushResponse }; + +export class SSEPush extends AbstractPush { + readonly channel = new SSEChannel(); + + readonly connections: Record = {}; + + constructor() { + super(); + this.channel.on('disconnect', (channel, { req }) => { + this.remove(req?.query?.sessionId); + }); + } + + add(sessionId: string, connection: Connection) { + super.add(sessionId, connection); + this.channel.addClient(connection.req, connection.res); + } + + protected close({ res }: Connection): void { + res.end(); + this.channel.removeClient(res); + } + + protected sendToOne(connection: Connection, data: string): void { + this.channel.send(data, [connection.res]); + } +} diff --git a/packages/cli/src/push/types.ts b/packages/cli/src/push/types.ts new file mode 100644 index 0000000000..531ab60478 --- /dev/null +++ b/packages/cli/src/push/types.ts @@ -0,0 +1,15 @@ +import type { Request, Response } from 'express'; +import type { WebSocket } from 'ws'; +import type { SSEPush } from './sse.push'; +import type { WebSocketPush } from './websocket.push'; + +// TODO: move all push related types here + +export type Push = SSEPush | WebSocketPush; + +export type PushRequest = Request<{}, {}, {}, { sessionId: string }>; + +export type SSEPushRequest = PushRequest & { ws: undefined }; +export type WebSocketPushRequest = PushRequest & { ws: WebSocket }; + +export type PushResponse = Response & { req: PushRequest }; diff --git a/packages/cli/src/push/websocket.push.ts b/packages/cli/src/push/websocket.push.ts new file mode 100644 index 0000000000..93c9a24220 --- /dev/null +++ b/packages/cli/src/push/websocket.push.ts @@ -0,0 +1,19 @@ +import type WebSocket from 'ws'; +import { AbstractPush } from './abstract.push'; + +export class WebSocketPush extends AbstractPush { + add(sessionId: string, connection: WebSocket) { + super.add(sessionId, connection); + + // Makes sure to remove the session if the connection is closed + connection.once('close', () => this.remove(sessionId)); + } + + protected close(connection: WebSocket): void { + connection.close(); + } + + protected sendToOne(connection: WebSocket, data: string): void { + connection.send(data); + } +} diff --git a/packages/cli/src/sse-channel.d.ts b/packages/cli/src/sse-channel.d.ts index 2dc8ef5e56..6ea435b361 100644 --- a/packages/cli/src/sse-channel.d.ts +++ b/packages/cli/src/sse-channel.d.ts @@ -1,16 +1,16 @@ -import type { Request, Response } from 'express'; +import type { PushRequest, PushResponse } from './push/types'; declare module 'sse-channel' { declare class Channel { constructor(); - on(event: string, handler: (channel: string, res: Response) => void): void; + on(event: string, handler: (channel: string, res: PushResponse) => void): void; - removeClient: (res: Response) => void; + removeClient: (res: PushResponse) => void; - addClient: (req: Request, res: Response) => void; + addClient: (req: PushRequest, res: PushResponse) => void; - send: (msg: string, clients?: Response[]) => void; + send: (msg: string, clients?: PushResponse[]) => void; } export = Channel; diff --git a/packages/cli/test/setup-mocks.ts b/packages/cli/test/setup-mocks.ts index 4e116fbea4..1a9bb4e9c0 100644 --- a/packages/cli/test/setup-mocks.ts +++ b/packages/cli/test/setup-mocks.ts @@ -2,4 +2,4 @@ jest.mock('@sentry/node'); jest.mock('@n8n_io/license-sdk'); jest.mock('@/telemetry'); jest.mock('@/eventbus/MessageEventBus/MessageEventBus'); -jest.mock('@/Push'); +jest.mock('@/push'); diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index d3dad5022e..9580abc990 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -690,6 +690,7 @@ export interface IN8nUISettings { host: string; }; executionMode: string; + pushBackend: 'sse' | 'websocket'; communityNodesEnabled: boolean; isNpmAvailable: boolean; publicApi: { diff --git a/packages/editor-ui/src/mixins/pushConnection.ts b/packages/editor-ui/src/mixins/pushConnection.ts index 1a08f10154..7ef18364fb 100644 --- a/packages/editor-ui/src/mixins/pushConnection.ts +++ b/packages/editor-ui/src/mixins/pushConnection.ts @@ -24,6 +24,7 @@ import { useUIStore } from '@/stores/ui'; import { useWorkflowsStore } from '@/stores/workflows'; import { useNodeTypesStore } from '@/stores/nodeTypes'; import { useCredentialsStore } from '@/stores/credentials'; +import { useSettingsStore } from '@/stores/settings'; export const pushConnection = mixins( externalHooks, @@ -34,7 +35,7 @@ export const pushConnection = mixins( ).extend({ data() { return { - eventSource: null as EventSource | null, + pushSource: null as WebSocket | EventSource | null, reconnectTimeout: null as NodeJS.Timeout | null, retryTimeout: null as NodeJS.Timeout | null, pushMessageQueue: [] as Array<{ event: Event; retriesLeft: number }>, @@ -43,92 +44,99 @@ export const pushConnection = mixins( }; }, computed: { - ...mapStores(useCredentialsStore, useNodeTypesStore, useUIStore, useWorkflowsStore), + ...mapStores( + useCredentialsStore, + useNodeTypesStore, + useUIStore, + useWorkflowsStore, + useSettingsStore, + ), sessionId(): string { return this.rootStore.sessionId; }, }, methods: { - pushAutomaticReconnect(): void { - if (this.reconnectTimeout !== null) { - return; + attemptReconnect() { + const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning'); + if (this.connectRetries > 3 && !this.lostConnection && isWorkflowRunning) { + this.lostConnection = true; + + this.workflowsStore.executingNode = null; + this.uiStore.removeActiveAction('workflowRunning'); + + this.$showMessage({ + title: this.$locale.baseText('pushConnection.executionFailed'), + message: this.$locale.baseText('pushConnection.executionFailed.message'), + type: 'error', + duration: 0, + }); } - this.reconnectTimeout = setTimeout(() => { - this.connectRetries++; - const isWorkflowRunning = this.uiStore.isActionActive('workflowRunning'); - if (this.connectRetries > 3 && !this.lostConnection && isWorkflowRunning) { - this.lostConnection = true; - - this.workflowsStore.executingNode = null; - this.uiStore.removeActiveAction('workflowRunning'); - - this.$showMessage({ - title: this.$locale.baseText('pushConnection.executionFailed'), - message: this.$locale.baseText('pushConnection.executionFailed.message'), - type: 'error', - duration: 0, - }); - } - this.pushConnect(); - }, 3000); + this.pushConnect(); }, /** - * Connect to server to receive data via EventSource + * Connect to server to receive data via a WebSocket or EventSource */ pushConnect(): void { - // Make sure existing event-source instances get - // always removed that we do not end up with multiple ones + // always close the previous connection so that we do not end up with multiple connections this.pushDisconnect(); - const connectionUrl = `${this.rootStore.getRestUrl}/push?sessionId=${this.sessionId}`; + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } - this.eventSource = new EventSource(connectionUrl, { withCredentials: true }); - this.eventSource.addEventListener('message', this.pushMessageReceived, false); + const useWebSockets = this.settingsStore.pushBackend === 'websocket'; - this.eventSource.addEventListener( - 'open', - () => { - this.connectRetries = 0; - this.lostConnection = false; + const { getRestUrl: restUrl } = this.rootStore; + const url = `/push?sessionId=${this.sessionId}`; - this.rootStore.pushConnectionActive = true; - if (this.reconnectTimeout !== null) { - clearTimeout(this.reconnectTimeout); - this.reconnectTimeout = null; - } - }, + if (useWebSockets) { + const { protocol, host } = window.location; + const baseUrl = restUrl.startsWith('http') + ? restUrl.replace(/^http/, 'ws') + : `${protocol === 'https:' ? 'wss' : 'ws'}://${host + restUrl}`; + this.pushSource = new WebSocket(`${baseUrl}${url}`); + } else { + this.pushSource = new EventSource(`${restUrl}${url}`, { withCredentials: true }); + } + + this.pushSource.addEventListener('open', this.onConnectionSuccess, false); + this.pushSource.addEventListener('message', this.pushMessageReceived, false); + this.pushSource.addEventListener( + useWebSockets ? 'close' : 'error', + this.onConnectionError, false, ); + }, - this.eventSource.addEventListener( - 'error', - () => { - this.pushDisconnect(); + onConnectionSuccess() { + this.connectRetries = 0; + this.lostConnection = false; + this.rootStore.pushConnectionActive = true; + this.pushSource?.removeEventListener('open', this.onConnectionSuccess); + }, - if (this.reconnectTimeout !== null) { - clearTimeout(this.reconnectTimeout); - this.reconnectTimeout = null; - } - - this.rootStore.pushConnectionActive = false; - this.pushAutomaticReconnect(); - }, - false, - ); + onConnectionError() { + this.pushDisconnect(); + this.connectRetries++; + this.reconnectTimeout = setTimeout(this.attemptReconnect, this.connectRetries * 5000); }, /** * Close connection to server */ pushDisconnect(): void { - if (this.eventSource !== null) { - this.eventSource.close(); - this.eventSource = null; - - this.rootStore.pushConnectionActive = false; + if (this.pushSource !== null) { + this.pushSource.removeEventListener('error', this.onConnectionError); + this.pushSource.removeEventListener('close', this.onConnectionError); + this.pushSource.removeEventListener('message', this.pushMessageReceived); + if (this.pushSource.readyState < 2) this.pushSource.close(); + this.pushSource = null; } + + this.rootStore.pushConnectionActive = false; }, /** @@ -136,7 +144,6 @@ export const pushConnection = mixins( * the REST API so we do not know yet what execution ID * is currently active. So internally resend the message * a few more times - * */ queuePushMessage(event: Event, retryAttempts: number) { this.pushMessageQueue.push({ event, retriesLeft: retryAttempts }); @@ -178,9 +185,6 @@ export const pushConnection = mixins( /** * Process a newly received message - * - * @param {Event} event The event data with the message data - * @param {boolean} [isRetry] If it is a retry */ pushMessageReceived(event: Event, isRetry?: boolean): boolean { const retryAttempts = 5; diff --git a/packages/editor-ui/src/stores/settings.ts b/packages/editor-ui/src/stores/settings.ts index e86c9abc61..9fca2f63f9 100644 --- a/packages/editor-ui/src/stores/settings.ts +++ b/packages/editor-ui/src/stores/settings.ts @@ -141,6 +141,9 @@ export const useSettingsStore = defineStore(STORES.SETTINGS, { templatesHost(): string { return this.settings.templates.host; }, + pushBackend(): IN8nUISettings['pushBackend'] { + return this.settings.pushBackend; + }, isCommunityNodesFeatureEnabled(): boolean { return this.settings.communityNodesEnabled; }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1dd27c55e5..9e9ac46b9a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -148,6 +148,7 @@ importers: '@types/syslog-client': ^1.1.2 '@types/uuid': ^8.3.2 '@types/validator': ^13.7.0 + '@types/ws': ^8.5.4 '@types/yamljs': ^0.2.31 axios: ^0.21.1 basic-auth: ^2.0.1 @@ -236,6 +237,7 @@ importers: uuid: ^8.3.2 validator: 13.7.0 winston: ^3.3.3 + ws: ^8.12.0 yamljs: ^0.3.0 dependencies: '@n8n_io/license-sdk': 1.8.0 @@ -324,6 +326,7 @@ importers: uuid: 8.3.2 validator: 13.7.0 winston: 3.8.2 + ws: 8.12.0 yamljs: 0.3.0 devDependencies: '@apidevtools/swagger-cli': 4.0.0 @@ -364,6 +367,7 @@ importers: '@types/syslog-client': 1.1.2 '@types/uuid': 8.3.4 '@types/validator': 13.7.7 + '@types/ws': 8.5.4 '@types/yamljs': 0.2.31 chokidar: 3.5.2 concurrently: 5.3.0 @@ -6482,6 +6486,12 @@ packages: '@types/webidl-conversions': 7.0.0 dev: false + /@types/ws/8.5.4: + resolution: {integrity: sha512-zdQDHKUgcX/zBc4GrwsE/7dVdAD8JR4EuiAXiiUhhfyIJXXb2+PrGshFyeXWQPMmmZ2XxgaqclgpIC7eTXc1mg==} + dependencies: + '@types/node': 16.18.12 + dev: true + /@types/xml2js/0.4.11: resolution: {integrity: sha512-JdigeAKmCyoJUiQljjr7tQG3if9NkqGUgwEUqBvV0N7LM4HyQk7UXCnusRa1lnvXAEYJ8mw8GtZWioagNztOwA==} dependencies: @@ -22840,7 +22850,6 @@ packages: optional: true utf-8-validate: optional: true - dev: true /x-default-browser/0.4.0: resolution: {integrity: sha512-7LKo7RtWfoFN/rHx1UELv/2zHGMx8MkZKDq1xENmOCTkfIqZJ0zZ26NEJX8czhnPXVcqS0ARjjfJB+eJ0/5Cvw==}