refactor(core): Extract ActiveWebhooks out of ActiveWorkflowRunner (no-changelog) (#8171)

## Summary
This PR continues refactoring webhooks code for better modularity.
Continued from #8069 to bring back `ActiveWebhooks`, but this time
actually handling active webhook calls in this class.

## Review / Merge checklist
- [x] PR title and summary are descriptive
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-12-28 10:04:32 +01:00 committed by GitHub
parent 68cff4c59e
commit c84d3c3bbf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 170 additions and 159 deletions

View file

@ -7,7 +7,6 @@ import isbot from 'isbot';
import config from '@/config';
import { N8N_VERSION, inDevelopment, inTest } from '@/constants';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import { N8nInstanceType } from '@/Interfaces';
import { ExternalHooks } from '@/ExternalHooks';
@ -21,6 +20,7 @@ import { generateHostInstanceId } from './databases/utils/generators';
import { Logger } from '@/Logger';
import { ServiceUnavailableError } from './errors/response-errors/service-unavailable.error';
import { OnShutdown } from '@/decorators/OnShutdown';
import { ActiveWebhooks } from '@/ActiveWebhooks';
@Service()
export abstract class AbstractServer {
@ -32,8 +32,6 @@ export abstract class AbstractServer {
protected externalHooks: ExternalHooks;
protected activeWorkflowRunner: ActiveWorkflowRunner;
protected protocol: string;
protected sslKey: string;
@ -162,7 +160,6 @@ export abstract class AbstractServer {
await new Promise<void>((resolve) => this.server.listen(PORT, ADDRESS, () => resolve()));
this.externalHooks = Container.get(ExternalHooks);
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
await this.setupHealthCheck();
@ -179,16 +176,13 @@ export abstract class AbstractServer {
// Setup webhook handlers before bodyParser, to let the Webhook node handle binary data in requests
if (this.webhooksEnabled) {
const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
const activeWebhooks = Container.get(ActiveWebhooks);
// Register a handler for active forms
this.app.all(`/${this.endpointForm}/:path(*)`, webhookRequestHandler(activeWorkflowRunner));
this.app.all(`/${this.endpointForm}/:path(*)`, webhookRequestHandler(activeWebhooks));
// Register a handler for active webhooks
this.app.all(
`/${this.endpointWebhook}/:path(*)`,
webhookRequestHandler(activeWorkflowRunner),
);
this.app.all(`/${this.endpointWebhook}/:path(*)`, webhookRequestHandler(activeWebhooks));
// Register a handler for waiting forms
this.app.all(

View file

@ -0,0 +1,161 @@
import { Service } from 'typedi';
import type { Response } from 'express';
import { Workflow, NodeHelpers } from 'n8n-workflow';
import type { INode, IWebhookData, IHttpRequestMethods } from 'n8n-workflow';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import type {
IResponseCallbackData,
IWebhookManager,
WebhookAccessControlOptions,
WebhookRequest,
} from '@/Interfaces';
import { Logger } from '@/Logger';
import { NodeTypes } from '@/NodeTypes';
import { WebhookService } from '@/services/webhook.service';
import { WebhookNotFoundError } from '@/errors/response-errors/webhook-not-found.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import * as WebhookHelpers from '@/WebhookHelpers';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
@Service()
export class ActiveWebhooks implements IWebhookManager {
constructor(
private readonly logger: Logger,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly workflowStaticDataService: WorkflowStaticDataService,
) {}
async getWebhookMethods(path: string) {
return this.webhookService.getWebhookMethods(path);
}
async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) {
const webhook = await this.findWebhook(path, httpMethod);
const workflowData = await this.workflowRepository.findOne({
where: { id: webhook.workflowId },
select: ['nodes'],
});
const nodes = workflowData?.nodes;
const webhookNode = nodes?.find(
({ type, parameters, typeVersion }) =>
parameters?.path === path &&
(parameters?.httpMethod ?? 'GET') === httpMethod &&
'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion),
);
return webhookNode?.parameters?.options as WebhookAccessControlOptions;
}
/**
* Checks if a webhook for the given method and path exists and executes the workflow.
*/
async executeWebhook(
request: WebhookRequest,
response: Response,
): Promise<IResponseCallbackData> {
const httpMethod = request.method;
const path = request.params.path;
this.logger.debug(`Received webhook "${httpMethod}" for path "${path}"`);
// Reset request parameters
request.params = {} as WebhookRequest['params'];
const webhook = await this.findWebhook(path, httpMethod);
if (webhook.isDynamic) {
const pathElements = path.split('/').slice(1);
// extracting params from path
// @ts-ignore
webhook.webhookPath.split('/').forEach((ele, index) => {
if (ele.startsWith(':')) {
// write params to req.params
// @ts-ignore
request.params[ele.slice(1)] = pathElements[index];
}
});
}
const workflowData = await this.workflowRepository.findOne({
where: { id: webhook.workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
});
if (workflowData === null) {
throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`);
}
const workflow = new Workflow({
id: webhook.workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
const additionalData = await WorkflowExecuteAdditionalData.getBase(
workflowData.shared[0].user.id,
);
const webhookData = NodeHelpers.getNodeWebhooks(
workflow,
workflow.getNode(webhook.node) as INode,
additionalData,
).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new NotFoundError('Could not find node to process webhook.');
}
return new Promise((resolve, reject) => {
const executionMode = 'webhook';
void WebhookHelpers.executeWebhook(
workflow,
webhookData,
workflowData,
workflowStartNode,
executionMode,
undefined,
undefined,
undefined,
request,
response,
async (error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
}
// Save static data if it changed
await this.workflowStaticDataService.saveStaticData(workflow);
resolve(data);
},
);
});
}
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
// Remove trailing slash
if (path.endsWith('/')) {
path = path.slice(0, -1);
}
const webhook = await this.webhookService.findWebhook(httpMethod, path);
if (webhook === null) {
throw new WebhookNotFoundError({ path, httpMethod }, { hint: 'production' });
}
return webhook;
}
}

View file

@ -20,11 +20,8 @@ import type {
WorkflowActivateMode,
WorkflowExecuteMode,
INodeType,
IWebhookData,
IHttpRequestMethods,
} from 'n8n-workflow';
import {
NodeHelpers,
Workflow,
WorkflowActivationError,
ErrorReporterProxy as ErrorReporter,
@ -32,16 +29,7 @@ import {
ApplicationError,
} from 'n8n-workflow';
import type express from 'express';
import type {
IResponseCallbackData,
IWebhookManager,
IWorkflowDb,
IWorkflowExecutionDataProcess,
WebhookAccessControlOptions,
WebhookRequest,
} from '@/Interfaces';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
@ -56,13 +44,11 @@ import {
import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error';
import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { ActivationErrorsService } from '@/ActivationErrors.service';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { OnShutdown } from '@/decorators/OnShutdown';
@ -75,7 +61,7 @@ interface QueuedActivation {
}
@Service()
export class ActiveWorkflowRunner implements IWebhookManager {
export class ActiveWorkflowRunner {
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};
constructor(
@ -128,136 +114,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
await Promise.all(removePromises);
}
/**
* Checks if a webhook for the given method and path exists and executes the workflow.
*/
async executeWebhook(
request: WebhookRequest,
response: express.Response,
): Promise<IResponseCallbackData> {
const httpMethod = request.method;
const path = request.params.path;
this.logger.debug(`Received webhook "${httpMethod}" for path "${path}"`);
// Reset request parameters
request.params = {} as WebhookRequest['params'];
const webhook = await this.findWebhook(path, httpMethod);
if (webhook.isDynamic) {
const pathElements = path.split('/').slice(1);
// extracting params from path
// @ts-ignore
webhook.webhookPath.split('/').forEach((ele, index) => {
if (ele.startsWith(':')) {
// write params to req.params
// @ts-ignore
request.params[ele.slice(1)] = pathElements[index];
}
});
}
const workflowData = await this.workflowRepository.findOne({
where: { id: webhook.workflowId },
relations: ['shared', 'shared.user', 'shared.user.globalRole'],
});
if (workflowData === null) {
throw new NotFoundError(`Could not find workflow with id "${webhook.workflowId}"`);
}
const workflow = new Workflow({
id: webhook.workflowId,
name: workflowData.name,
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
staticData: workflowData.staticData,
settings: workflowData.settings,
});
const additionalData = await WorkflowExecuteAdditionalData.getBase(
workflowData.shared[0].user.id,
);
const webhookData = NodeHelpers.getNodeWebhooks(
workflow,
workflow.getNode(webhook.node) as INode,
additionalData,
).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData;
// Get the node which has the webhook defined to know where to start from and to
// get additional data
const workflowStartNode = workflow.getNode(webhookData.node);
if (workflowStartNode === null) {
throw new NotFoundError('Could not find node to process webhook.');
}
return new Promise((resolve, reject) => {
const executionMode = 'webhook';
void WebhookHelpers.executeWebhook(
workflow,
webhookData,
workflowData,
workflowStartNode,
executionMode,
undefined,
undefined,
undefined,
request,
response,
async (error: Error | null, data: object) => {
if (error !== null) {
return reject(error);
}
// Save static data if it changed
await this.workflowStaticDataService.saveStaticData(workflow);
resolve(data);
},
);
});
}
async getWebhookMethods(path: string) {
return this.webhookService.getWebhookMethods(path);
}
async findAccessControlOptions(path: string, httpMethod: IHttpRequestMethods) {
const webhook = await this.findWebhook(path, httpMethod);
const workflowData = await this.workflowRepository.findOne({
where: { id: webhook.workflowId },
select: ['nodes'],
});
const nodes = workflowData?.nodes;
const webhookNode = nodes?.find(
({ type, parameters, typeVersion }) =>
parameters?.path === path &&
(parameters?.httpMethod ?? 'GET') === httpMethod &&
'webhook' in this.nodeTypes.getByNameAndVersion(type, typeVersion),
);
return webhookNode?.parameters?.options as WebhookAccessControlOptions;
}
private async findWebhook(path: string, httpMethod: IHttpRequestMethods) {
// Remove trailing slash
if (path.endsWith('/')) {
path = path.slice(0, -1);
}
const webhook = await this.webhookService.findWebhook(httpMethod, path);
if (webhook === null) {
throw new WebhookNotFoundError({ path, httpMethod }, { hint: 'production' });
}
return webhook;
}
/**
* Returns the ids of the currently active workflows from memory.
*/

View file

@ -4,7 +4,7 @@ import { mock } from 'jest-mock-extended';
import config from '@/config';
import { AbstractServer } from '@/AbstractServer';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWebhooks } from '@/ActiveWebhooks';
import { ExternalHooks } from '@/ExternalHooks';
import { InternalHooks } from '@/InternalHooks';
import { TestWebhooks } from '@/TestWebhooks';
@ -22,7 +22,7 @@ describe('WebhookServer', () => {
describe('CORS', () => {
const corsOrigin = 'https://example.com';
const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner);
const activeWebhooks = mockInstance(ActiveWebhooks);
const testWebhooks = mockInstance(TestWebhooks);
mockInstance(WaitingWebhooks);
mockInstance(WaitingForms);
@ -36,7 +36,7 @@ describe('WebhookServer', () => {
});
const tests = [
['webhook', activeWorkflowRunner],
['webhook', activeWebhooks],
['webhookTest', testWebhooks],
// TODO: enable webhookWaiting & waitingForms after CORS support is added
// ['webhookWaiting', waitingWebhooks],