From c84d3c3bbf661ae561c526d7a399d6efe3c677bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Thu, 28 Dec 2023 10:04:32 +0100 Subject: [PATCH] refactor(core): Extract ActiveWebhooks out of ActiveWorkflowRunner (no-changelog) (#8171) ## Summary This PR continues refactoring webhooks code for better modularity. Continued from #8069 to bring back `ActiveWebhooks`, but this time actually handling active webhook calls in this class. ## Review / Merge checklist - [x] PR title and summary are descriptive --- packages/cli/src/AbstractServer.ts | 14 +- packages/cli/src/ActiveWebhooks.ts | 161 +++++++++++++++++++++++ packages/cli/src/ActiveWorkflowRunner.ts | 148 +-------------------- packages/cli/test/unit/webhooks.test.ts | 6 +- 4 files changed, 170 insertions(+), 159 deletions(-) create mode 100644 packages/cli/src/ActiveWebhooks.ts diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index c12bc3c424..71b4c75ea9 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -7,7 +7,6 @@ import isbot from 'isbot'; import config from '@/config'; import { N8N_VERSION, inDevelopment, inTest } from '@/constants'; -import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import * as Db from '@/Db'; import { N8nInstanceType } from '@/Interfaces'; import { ExternalHooks } from '@/ExternalHooks'; @@ -21,6 +20,7 @@ import { generateHostInstanceId } from './databases/utils/generators'; import { Logger } from '@/Logger'; import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error'; import { OnShutdown } from '@/decorators/OnShutdown'; +import { ActiveWebhooks } from '@/ActiveWebhooks'; @Service() export abstract class AbstractServer { @@ -32,8 +32,6 @@ export abstract class AbstractServer { protected externalHooks: ExternalHooks; - protected activeWorkflowRunner: ActiveWorkflowRunner; - protected protocol: string; protected sslKey: string; @@ -162,7 +160,6 @@ export abstract class AbstractServer { await new Promise((resolve) => this.server.listen(PORT, ADDRESS, () => resolve())); this.externalHooks = Container.get(ExternalHooks); - this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); await this.setupHealthCheck(); @@ -179,16 +176,13 @@ export abstract class AbstractServer { // Setup webhook handlers before bodyParser, to let the Webhook node handle binary data in requests if (this.webhooksEnabled) { - const activeWorkflowRunner = Container.get(ActiveWorkflowRunner); + const activeWebhooks = Container.get(ActiveWebhooks); // Register a handler for active forms - this.app.all(`/${this.endpointForm}/:path(*)`, webhookRequestHandler(activeWorkflowRunner)); + this.app.all(`/${this.endpointForm}/:path(*)`, webhookRequestHandler(activeWebhooks)); // Register a handler for active webhooks - this.app.all( - `/${this.endpointWebhook}/:path(*)`, - webhookRequestHandler(activeWorkflowRunner), - ); + this.app.all(`/${this.endpointWebhook}/:path(*)`, webhookRequestHandler(activeWebhooks)); // Register a handler for waiting forms this.app.all( diff --git a/packages/cli/src/ActiveWebhooks.ts b/packages/cli/src/ActiveWebhooks.ts new file mode 100644 index 0000000000..77102eb247 --- /dev/null +++ b/packages/cli/src/ActiveWebhooks.ts @@ -0,0 +1,161 @@ +import { Service } from 'typedi'; +import type { Response } from 'express'; +import { Workflow, NodeHelpers } from 'n8n-workflow'; +import type { INode, IWebhookData, IHttpRequestMethods } from 'n8n-workflow'; + +import { WorkflowRepository } from '@db/repositories/workflow.repository'; +import type { + IResponseCallbackData, + IWebhookManager, + WebhookAccessControlOptions, + WebhookRequest, +} from '@/Interfaces'; +import { Logger } from '@/Logger'; +import { NodeTypes } from '@/NodeTypes'; +import { WebhookService } from '@/services/webhook.service'; +import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error'; +import { NotFoundError } from '@/errors/response-errors/not-found.error'; +import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; +import * as WebhookHelpers from '@/WebhookHelpers'; +import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; + +@Service() +export class ActiveWebhooks implements IWebhookManager { + constructor( + private readonly logger: Logger, + private readonly nodeTypes: NodeTypes, + private readonly webhookService: WebhookService, + private readonly workflowRepository: WorkflowRepository, + private readonly workflowStaticDataService: WorkflowStaticDataService, + ) {} + + async getWebhookMethods(path: string) { + return this.webhookService.getWebhookMethods(path); + } + + async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) { + const webhook = await this.findWebhook(path, httpMethod); + + const workflowData = await this.workflowRepository.findOne({ + where: { id: webhook.workflowId }, + select: ['nodes'], + }); + + const nodes = workflowData?.nodes; + const webhookNode = nodes?.find( + ({ type, parameters, typeVersion }) => + parameters?.path === path && + (parameters?.httpMethod ?? 'GET') === httpMethod && + 'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion), + ); + return webhookNode?.parameters?.options as WebhookAccessControlOptions; + } + + /** + * Checks if a webhook for the given method and path exists and executes the workflow. + */ + async executeWebhook( + request: WebhookRequest, + response: Response, + ): Promise { + const httpMethod = request.method; + const path = request.params.path; + + this.logger.debug(`Received webhook "${httpMethod}" for path "${path}"`); + + // Reset request parameters + request.params = {} as WebhookRequest['params']; + + const webhook = await this.findWebhook(path, httpMethod); + + if (webhook.isDynamic) { + const pathElements = path.split('/').slice(1); + + // extracting params from path + // @ts-ignore + webhook.webhookPath.split('/').forEach((ele, index) => { + if (ele.startsWith(':')) { + // write params to req.params + // @ts-ignore + request.params[ele.slice(1)] = pathElements[index]; + } + }); + } + + const workflowData = await this.workflowRepository.findOne({ + where: { id: webhook.workflowId }, + relations: ['shared', 'shared.user', 'shared.user.globalRole'], + }); + + if (workflowData === null) { + throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`); + } + + const workflow = new Workflow({ + id: webhook.workflowId, + name: workflowData.name, + nodes: workflowData.nodes, + connections: workflowData.connections, + active: workflowData.active, + nodeTypes: this.nodeTypes, + staticData: workflowData.staticData, + settings: workflowData.settings, + }); + + const additionalData = await WorkflowExecuteAdditionalData.getBase( + workflowData.shared[0].user.id, + ); + + const webhookData = NodeHelpers.getNodeWebhooks( + workflow, + workflow.getNode(webhook.node) as INode, + additionalData, + ).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData; + + // Get the node which has the webhook defined to know where to start from and to + // get additional data + const workflowStartNode = workflow.getNode(webhookData.node); + + if (workflowStartNode === null) { + throw new NotFoundError('Could not find node to process webhook.'); + } + + return new Promise((resolve, reject) => { + const executionMode = 'webhook'; + void WebhookHelpers.executeWebhook( + workflow, + webhookData, + workflowData, + workflowStartNode, + executionMode, + undefined, + undefined, + undefined, + request, + response, + async (error: Error | null, data: object) => { + if (error !== null) { + return reject(error); + } + // Save static data if it changed + await this.workflowStaticDataService.saveStaticData(workflow); + resolve(data); + }, + ); + }); + } + + private async findWebhook(path: string, httpMethod: IHttpRequestMethods) { + // Remove trailing slash + if (path.endsWith('/')) { + path = path.slice(0, -1); + } + + const webhook = await this.webhookService.findWebhook(httpMethod, path); + if (webhook === null) { + throw new WebhookNotFoundError({ path, httpMethod }, { hint: 'production' }); + } + + return webhook; + } +} diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index d15dd9026d..d00056bb4b 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -20,11 +20,8 @@ import type { WorkflowActivateMode, WorkflowExecuteMode, INodeType, - IWebhookData, - IHttpRequestMethods, } from 'n8n-workflow'; import { - NodeHelpers, Workflow, WorkflowActivationError, ErrorReporterProxy as ErrorReporter, @@ -32,16 +29,7 @@ import { ApplicationError, } from 'n8n-workflow'; -import type express from 'express'; - -import type { - IResponseCallbackData, - IWebhookManager, - IWorkflowDb, - IWorkflowExecutionDataProcess, - WebhookAccessControlOptions, - WebhookRequest, -} from '@/Interfaces'; +import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces'; import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; @@ -56,13 +44,11 @@ import { import { NodeTypes } from '@/NodeTypes'; import { WorkflowRunner } from '@/WorkflowRunner'; import { ExternalHooks } from '@/ExternalHooks'; -import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error'; import { WebhookService } from './services/webhook.service'; import { Logger } from './Logger'; import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee'; import { ActivationErrorsService } from '@/ActivationErrors.service'; -import { NotFoundError } from './errors/response-errors/not-found.error'; import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; import { OnShutdown } from '@/decorators/OnShutdown'; @@ -75,7 +61,7 @@ interface QueuedActivation { } @Service() -export class ActiveWorkflowRunner implements IWebhookManager { +export class ActiveWorkflowRunner { private queuedActivations: { [workflowId: string]: QueuedActivation } = {}; constructor( @@ -128,136 +114,6 @@ export class ActiveWorkflowRunner implements IWebhookManager { await Promise.all(removePromises); } - /** - * Checks if a webhook for the given method and path exists and executes the workflow. - */ - async executeWebhook( - request: WebhookRequest, - response: express.Response, - ): Promise { - const httpMethod = request.method; - const path = request.params.path; - - this.logger.debug(`Received webhook "${httpMethod}" for path "${path}"`); - - // Reset request parameters - request.params = {} as WebhookRequest['params']; - - const webhook = await this.findWebhook(path, httpMethod); - - if (webhook.isDynamic) { - const pathElements = path.split('/').slice(1); - - // extracting params from path - // @ts-ignore - webhook.webhookPath.split('/').forEach((ele, index) => { - if (ele.startsWith(':')) { - // write params to req.params - // @ts-ignore - request.params[ele.slice(1)] = pathElements[index]; - } - }); - } - - const workflowData = await this.workflowRepository.findOne({ - where: { id: webhook.workflowId }, - relations: ['shared', 'shared.user', 'shared.user.globalRole'], - }); - - if (workflowData === null) { - throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`); - } - - const workflow = new Workflow({ - id: webhook.workflowId, - name: workflowData.name, - nodes: workflowData.nodes, - connections: workflowData.connections, - active: workflowData.active, - nodeTypes: this.nodeTypes, - staticData: workflowData.staticData, - settings: workflowData.settings, - }); - - const additionalData = await WorkflowExecuteAdditionalData.getBase( - workflowData.shared[0].user.id, - ); - - const webhookData = NodeHelpers.getNodeWebhooks( - workflow, - workflow.getNode(webhook.node) as INode, - additionalData, - ).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData; - - // Get the node which has the webhook defined to know where to start from and to - // get additional data - const workflowStartNode = workflow.getNode(webhookData.node); - - if (workflowStartNode === null) { - throw new NotFoundError('Could not find node to process webhook.'); - } - - return new Promise((resolve, reject) => { - const executionMode = 'webhook'; - void WebhookHelpers.executeWebhook( - workflow, - webhookData, - workflowData, - workflowStartNode, - executionMode, - undefined, - undefined, - undefined, - request, - response, - async (error: Error | null, data: object) => { - if (error !== null) { - return reject(error); - } - // Save static data if it changed - await this.workflowStaticDataService.saveStaticData(workflow); - resolve(data); - }, - ); - }); - } - - async getWebhookMethods(path: string) { - return this.webhookService.getWebhookMethods(path); - } - - async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) { - const webhook = await this.findWebhook(path, httpMethod); - - const workflowData = await this.workflowRepository.findOne({ - where: { id: webhook.workflowId }, - select: ['nodes'], - }); - - const nodes = workflowData?.nodes; - const webhookNode = nodes?.find( - ({ type, parameters, typeVersion }) => - parameters?.path === path && - (parameters?.httpMethod ?? 'GET') === httpMethod && - 'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion), - ); - return webhookNode?.parameters?.options as WebhookAccessControlOptions; - } - - private async findWebhook(path: string, httpMethod: IHttpRequestMethods) { - // Remove trailing slash - if (path.endsWith('/')) { - path = path.slice(0, -1); - } - - const webhook = await this.webhookService.findWebhook(httpMethod, path); - if (webhook === null) { - throw new WebhookNotFoundError({ path, httpMethod }, { hint: 'production' }); - } - - return webhook; - } - /** * Returns the ids of the currently active workflows from memory. */ diff --git a/packages/cli/test/unit/webhooks.test.ts b/packages/cli/test/unit/webhooks.test.ts index b2b8207697..3070a5d478 100644 --- a/packages/cli/test/unit/webhooks.test.ts +++ b/packages/cli/test/unit/webhooks.test.ts @@ -4,7 +4,7 @@ import { mock } from 'jest-mock-extended'; import config from '@/config'; import { AbstractServer } from '@/AbstractServer'; -import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; +import { ActiveWebhooks } from '@/ActiveWebhooks'; import { ExternalHooks } from '@/ExternalHooks'; import { InternalHooks } from '@/InternalHooks'; import { TestWebhooks } from '@/TestWebhooks'; @@ -22,7 +22,7 @@ describe('WebhookServer', () => { describe('CORS', () => { const corsOrigin = 'https://example.com'; - const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); + const activeWebhooks = mockInstance(ActiveWebhooks); const testWebhooks = mockInstance(TestWebhooks); mockInstance(WaitingWebhooks); mockInstance(WaitingForms); @@ -36,7 +36,7 @@ describe('WebhookServer', () => { }); const tests = [ - ['webhook', activeWorkflowRunner], + ['webhook', activeWebhooks], ['webhookTest', testWebhooks], // TODO: enable webhookWaiting & waitingForms after CORS support is added // ['webhookWaiting', waitingWebhooks],