diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 94d0aa80b6..f9f27f6e50 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -1,6 +1,6 @@ -/* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */ - -import { Container, Service } from 'typedi'; +import { Service } from 'typedi'; +import type { ChildProcess } from 'child_process'; +import type PCancelable from 'p-cancelable'; import type { IDeferredPromise, IExecuteResponsePromiseData, @@ -9,8 +9,6 @@ import type { } from 'n8n-workflow'; import { ApplicationError, WorkflowOperationError, createDeferredPromise } from 'n8n-workflow'; -import type { ChildProcess } from 'child_process'; -import type PCancelable from 'p-cancelable'; import type { ExecutionPayload, IExecutingWorkflowData, @@ -28,7 +26,10 @@ export class ActiveExecutions { [index: string]: IExecutingWorkflowData; } = {}; - constructor(private readonly logger: Logger) {} + constructor( + private readonly logger: Logger, + private readonly executionRepository: ExecutionRepository, + ) {} /** * Add a new active execution @@ -61,7 +62,7 @@ export class ActiveExecutions { fullExecutionData.workflowId = workflowId; } - executionId = await Container.get(ExecutionRepository).createNewExecution(fullExecutionData); + executionId = await this.executionRepository.createNewExecution(fullExecutionData); if (executionId === undefined) { throw new ApplicationError('There was an issue assigning an execution id to the execution'); } @@ -76,7 +77,7 @@ export class ActiveExecutions { status: executionStatus, }; - await Container.get(ExecutionRepository).updateExistingExecution(executionId, execution); + await this.executionRepository.updateExistingExecution(executionId, execution); } this.activeExecutions[executionId] = { @@ -96,34 +97,33 @@ export class ActiveExecutions { */ attachWorkflowExecution(executionId: string, workflowExecution: PCancelable) { - if (this.activeExecutions[executionId] === undefined) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { throw new ApplicationError('No active execution found to attach to workflow execution to', { extra: { executionId }, }); } - this.activeExecutions[executionId].workflowExecution = workflowExecution; + execution.workflowExecution = workflowExecution; } attachResponsePromise( executionId: string, responsePromise: IDeferredPromise, ): void { - if (this.activeExecutions[executionId] === undefined) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { throw new ApplicationError('No active execution found to attach to workflow execution to', { extra: { executionId }, }); } - this.activeExecutions[executionId].responsePromise = responsePromise; + execution.responsePromise = responsePromise; } resolveResponsePromise(executionId: string, response: IExecuteResponsePromiseData): void { - if (this.activeExecutions[executionId] === undefined) { - return; - } - - this.activeExecutions[executionId].responsePromise?.resolve(response); + const execution = this.activeExecutions[executionId]; + execution?.responsePromise?.resolve(response); } getPostExecutePromiseCount(executionId: string): number { @@ -135,13 +135,14 @@ export class ActiveExecutions { * */ remove(executionId: string, fullRunData?: IRun): void { - if (this.activeExecutions[executionId] === undefined) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { return; } // Resolve all the waiting promises - for (const promise of this.activeExecutions[executionId].postExecutePromises) { + for (const promise of execution.postExecutePromises) { promise.resolve(fullRunData); } @@ -156,26 +157,27 @@ export class ActiveExecutions { * @param {string} timeout String 'timeout' given if stop due to timeout */ async stopExecution(executionId: string, timeout?: string): Promise { - if (this.activeExecutions[executionId] === undefined) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { // There is no execution running with that id return; } // In case something goes wrong make sure that promise gets first // returned that it gets then also resolved correctly. - if (this.activeExecutions[executionId].process !== undefined) { + if (execution.process !== undefined) { // Workflow is running in subprocess - if (this.activeExecutions[executionId].process!.connected) { + if (execution.process.connected) { setTimeout(() => { // execute on next event loop tick; - this.activeExecutions[executionId].process!.send({ + execution.process!.send({ type: timeout || 'stopExecution', }); }, 1); } } else { // Workflow is running in current process - this.activeExecutions[executionId].workflowExecution!.cancel(); + execution.workflowExecution!.cancel(); } return await this.getPostExecutePromise(executionId); @@ -188,14 +190,15 @@ export class ActiveExecutions { * @param {string} executionId The id of the execution to wait for */ async getPostExecutePromise(executionId: string): Promise { - if (this.activeExecutions[executionId] === undefined) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { throw new WorkflowOperationError(`There is no active execution with id "${executionId}".`); } // Create the promise which will be resolved when the execution finished const waitPromise = await createDeferredPromise(); - this.activeExecutions[executionId].postExecutePromises.push(waitPromise); + execution.postExecutePromises.push(waitPromise); return await waitPromise.promise(); } @@ -213,10 +216,10 @@ export class ActiveExecutions { data = this.activeExecutions[id]; returnData.push({ id, - retryOf: data.executionData.retryOf as string | undefined, + retryOf: data.executionData.retryOf, startedAt: data.startedAt, mode: data.executionData.executionMode, - workflowId: data.executionData.workflowData.id! as string, + workflowId: data.executionData.workflowData.id, status: data.status, }); } @@ -225,21 +228,19 @@ export class ActiveExecutions { } async setStatus(executionId: string, status: ExecutionStatus): Promise { - if (this.activeExecutions[executionId] === undefined) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { this.logger.debug( `There is no active execution with id "${executionId}", can't update status to ${status}.`, ); return; } - this.activeExecutions[executionId].status = status; + execution.status = status; } getStatus(executionId: string): ExecutionStatus { - if (this.activeExecutions[executionId] === undefined) { - return 'unknown'; - } - - return this.activeExecutions[executionId].status; + const execution = this.activeExecutions[executionId]; + return execution?.status ?? 'unknown'; } } diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index ad19b09d18..ffeb8876d3 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -7,16 +7,14 @@ import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; import type { ExecutionError, IDeferredPromise, - IExecuteData, IExecuteResponsePromiseData, IGetExecutePollFunctions, IGetExecuteTriggerFunctions, INode, INodeExecutionData, IRun, - IRunExecutionData, IWorkflowBase, - IWorkflowExecuteAdditionalData as IWorkflowExecuteAdditionalDataWorkflow, + IWorkflowExecuteAdditionalData, WorkflowActivateMode, WorkflowExecuteMode, INodeType, @@ -29,7 +27,7 @@ import { ApplicationError, } from 'n8n-workflow'; -import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces'; +import type { IWorkflowDb } from '@/Interfaces'; import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; @@ -42,7 +40,6 @@ import { WORKFLOW_REACTIVATE_MAX_TIMEOUT, } from '@/constants'; import { NodeTypes } from '@/NodeTypes'; -import { WorkflowRunner } from '@/WorkflowRunner'; import { ExternalHooks } from '@/ExternalHooks'; import { WebhookService } from './services/webhook.service'; import { Logger } from './Logger'; @@ -50,6 +47,7 @@ import { WorkflowRepository } from '@db/repositories/workflow.repository'; import { OrchestrationService } from '@/services/orchestration.service'; import { ActivationErrorsService } from '@/ActivationErrors.service'; import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; +import { WorkflowExecutionService } from '@/workflows/workflowExecution.service'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; import { OnShutdown } from '@/decorators/OnShutdown'; @@ -77,6 +75,7 @@ export class ActiveWorkflowRunner { private readonly executionService: ExecutionService, private readonly workflowStaticDataService: WorkflowStaticDataService, private readonly activeWorkflowsService: ActiveWorkflowsService, + private readonly workflowExecutionService: WorkflowExecutionService, ) {} async init() { @@ -141,7 +140,7 @@ export class ActiveWorkflowRunner { */ async addWebhooks( workflow: Workflow, - additionalData: IWorkflowExecuteAdditionalDataWorkflow, + additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, ) { @@ -264,57 +263,13 @@ export class ActiveWorkflowRunner { await this.webhookService.deleteWorkflowWebhooks(workflowId); } - async runWorkflow( - workflowData: IWorkflowDb, - node: INode, - data: INodeExecutionData[][], - additionalData: IWorkflowExecuteAdditionalDataWorkflow, - mode: WorkflowExecuteMode, - responsePromise?: IDeferredPromise, - ) { - const nodeExecutionStack: IExecuteData[] = [ - { - node, - data: { - main: data, - }, - source: null, - }, - ]; - - const executionData: IRunExecutionData = { - startData: {}, - resultData: { - runData: {}, - }, - executionData: { - contextData: {}, - metadata: {}, - nodeExecutionStack, - waitingExecution: {}, - waitingExecutionSource: {}, - }, - }; - - // Start the workflow - const runData: IWorkflowExecutionDataProcess = { - userId: additionalData.userId, - executionMode: mode, - executionData, - workflowData, - }; - - const workflowRunner = new WorkflowRunner(); - return await workflowRunner.run(runData, true, undefined, undefined, responsePromise); - } - /** * Return poll function which gets the global functions from n8n-core * and overwrites the emit to be able to start it in subprocess */ getExecutePollFunctions( workflowData: IWorkflowDb, - additionalData: IWorkflowExecuteAdditionalDataWorkflow, + additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, ): IGetExecutePollFunctions { @@ -333,7 +288,7 @@ export class ActiveWorkflowRunner { ): void => { this.logger.debug(`Received event to trigger execution for workflow "${workflow.name}"`); void this.workflowStaticDataService.saveStaticData(workflow); - const executePromise = this.runWorkflow( + const executePromise = this.workflowExecutionService.runWorkflow( workflowData, node, data, @@ -371,7 +326,7 @@ export class ActiveWorkflowRunner { */ getExecuteTriggerFunctions( workflowData: IWorkflowDb, - additionalData: IWorkflowExecuteAdditionalDataWorkflow, + additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, activation: WorkflowActivateMode, ): IGetExecuteTriggerFunctions { @@ -391,7 +346,7 @@ export class ActiveWorkflowRunner { this.logger.debug(`Received trigger for workflow "${workflow.name}"`); void this.workflowStaticDataService.saveStaticData(workflow); - const executePromise = this.runWorkflow( + const executePromise = this.workflowExecutionService.runWorkflow( workflowData, node, data, @@ -659,10 +614,7 @@ export class ActiveWorkflowRunner { /** * Count all triggers in the workflow, excluding Manual Trigger. */ - private countTriggers( - workflow: Workflow, - additionalData: IWorkflowExecuteAdditionalDataWorkflow, - ) { + private countTriggers(workflow: Workflow, additionalData: IWorkflowExecuteAdditionalData) { const triggerFilter = (nodeType: INodeType) => !!nodeType.trigger && !nodeType.description.name.includes('manualTrigger'); @@ -796,7 +748,7 @@ export class ActiveWorkflowRunner { }: { activationMode: WorkflowActivateMode; executionMode: WorkflowExecuteMode; - additionalData: IWorkflowExecuteAdditionalDataWorkflow; + additionalData: IWorkflowExecuteAdditionalData; }, ) { const getTriggerFunctions = this.getExecuteTriggerFunctions( diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index b39222bc63..9acb96cfc0 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -25,6 +25,7 @@ export class WaitTracker { private readonly logger: Logger, private readonly executionRepository: ExecutionRepository, private readonly ownershipService: OwnershipService, + private readonly workflowRunner: WorkflowRunner, ) { // Poll every 60 seconds a list of upcoming executions this.mainTimer = setInterval(() => { @@ -163,8 +164,7 @@ export class WaitTracker { }; // Start the execution again - const workflowRunner = new WorkflowRunner(); - await workflowRunner.run(data, false, false, executionId); + await this.workflowRunner.run(data, false, false, executionId); })().catch((error: Error) => { ErrorReporter.error(error); this.logger.error( diff --git a/packages/cli/src/WebhookHelpers.ts b/packages/cli/src/WebhookHelpers.ts index 538030ecd1..999740a19b 100644 --- a/packages/cli/src/WebhookHelpers.ts +++ b/packages/cli/src/WebhookHelpers.ts @@ -590,8 +590,7 @@ export async function executeWebhook( } // Start now to run the workflow - const workflowRunner = new WorkflowRunner(); - executionId = await workflowRunner.run( + executionId = await Container.get(WorkflowRunner).run( runData, true, !didSendResponse, diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 5c25dbff3f..ebd5a5a0cc 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -2,7 +2,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { Container } from 'typedi'; +import { Container, Service } from 'typedi'; import type { IProcessMessage } from 'n8n-core'; import { WorkflowExecute } from 'n8n-core'; @@ -46,36 +46,37 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData' import { generateFailedExecutionFromError } from '@/WorkflowHelpers'; import { initErrorHandling } from '@/ErrorReporting'; import { PermissionChecker } from '@/UserManagement/PermissionChecker'; -import { Push } from '@/push'; import { InternalHooks } from '@/InternalHooks'; import { Logger } from '@/Logger'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; +@Service() export class WorkflowRunner { - logger: Logger; + private jobQueue: Queue; - activeExecutions: ActiveExecutions; + private executionsMode = config.getEnv('executions.mode'); - push: Push; + private executionsProcess = config.getEnv('executions.process'); - jobQueue: Queue; + constructor( + private readonly logger: Logger, + private readonly activeExecutions: ActiveExecutions, + private readonly executionRepository: ExecutionRepository, + private readonly externalHooks: ExternalHooks, + private readonly workflowStaticDataService: WorkflowStaticDataService, + private readonly nodeTypes: NodeTypes, + private readonly permissionChecker: PermissionChecker, + ) {} - constructor() { - this.logger = Container.get(Logger); - this.push = Container.get(Push); - this.activeExecutions = Container.get(ActiveExecutions); - } - - /** - * The process did send a hook message so execute the appropriate hook - */ - async processHookMessage(workflowHooks: WorkflowHooks, hookData: IProcessMessageDataHook) { + /** The process did send a hook message so execute the appropriate hook */ + private async processHookMessage( + workflowHooks: WorkflowHooks, + hookData: IProcessMessageDataHook, + ) { await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters); } - /** - * The process did error - */ + /** The process did error */ async processError( error: ExecutionError, startedAt: Date, @@ -91,12 +92,9 @@ export class WorkflowRunner { // by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415 if (isQueueMode && executionMode !== 'manual') { - const executionWithoutData = await Container.get(ExecutionRepository).findSingleExecution( - executionId, - { - includeData: false, - }, - ); + const executionWithoutData = await this.executionRepository.findSingleExecution(executionId, { + includeData: false, + }); if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') { // false positive, execution was successful return; @@ -140,12 +138,9 @@ export class WorkflowRunner { } } - const executionFlattedData = await Container.get(ExecutionRepository).findSingleExecution( - executionId, - { - includeData: true, - }, - ); + const executionFlattedData = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + }); if (executionFlattedData) { void Container.get(InternalHooks).onWorkflowCrashed( @@ -169,12 +164,7 @@ export class WorkflowRunner { } } - /** - * Run the workflow - * - * @param {boolean} [loadStaticData] If set will the static data be loaded from - * the workflow and added to input data - */ + /** Run the workflow */ async run( data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, @@ -182,16 +172,13 @@ export class WorkflowRunner { executionId?: string, responsePromise?: IDeferredPromise, ): Promise { - const executionsMode = config.getEnv('executions.mode'); - const executionsProcess = config.getEnv('executions.process'); - await initErrorHandling(); - if (executionsMode === 'queue') { + if (this.executionsMode === 'queue') { this.jobQueue = Container.get(Queue); } - if (executionsMode === 'queue' && data.executionMode !== 'manual') { + if (this.executionsMode === 'queue' && data.executionMode !== 'manual') { // Do not run "manual" executions in bull because sending events to the // frontend would not be possible executionId = await this.enqueueExecution( @@ -202,7 +189,7 @@ export class WorkflowRunner { responsePromise, ); } else { - if (executionsProcess === 'main') { + if (this.executionsProcess === 'main') { executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise); } else { executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise); @@ -213,12 +200,11 @@ export class WorkflowRunner { // only run these when not in queue mode or when the execution is manual, // since these calls are now done by the worker directly if ( - executionsMode !== 'queue' || + this.executionsMode !== 'queue' || config.getEnv('generic.instanceType') === 'worker' || data.executionMode === 'manual' ) { const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId); - const externalHooks = Container.get(ExternalHooks); postExecutePromise .then(async (executionData) => { void Container.get(InternalHooks).onWorkflowPostExecute( @@ -227,9 +213,9 @@ export class WorkflowRunner { executionData, data.userId, ); - if (externalHooks.exists('workflow.postExecute')) { + if (this.externalHooks.exists('workflow.postExecute')) { try { - await externalHooks.run('workflow.postExecute', [ + await this.externalHooks.run('workflow.postExecute', [ executionData, data.workflowData, executionId, @@ -249,13 +235,8 @@ export class WorkflowRunner { return executionId; } - /** - * Run the workflow in current process - * - * @param {boolean} [loadStaticData] If set will the static data be loaded from - * the workflow and added to input data - */ - async runMainProcess( + /** Run the workflow in current process */ + private async runMainProcess( data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string, @@ -264,11 +245,9 @@ export class WorkflowRunner { const workflowId = data.workflowData.id; if (loadStaticData === true && workflowId) { data.workflowData.staticData = - await Container.get(WorkflowStaticDataService).getStaticDataById(workflowId); + await this.workflowStaticDataService.getStaticDataById(workflowId); } - const nodeTypes = Container.get(NodeTypes); - // Soft timeout to stop workflow execution after current running node // Changes were made by adding the `workflowTimeout` to the `additionalData` // So that the timeout will also work for executions with nested workflows. @@ -291,7 +270,7 @@ export class WorkflowRunner { nodes: data.workflowData.nodes, connections: data.workflowData.connections, active: data.workflowData.active, - nodeTypes, + nodeTypes: this.nodeTypes, staticData: data.workflowData.staticData, settings: workflowSettings, pinData, @@ -312,7 +291,7 @@ export class WorkflowRunner { { executionId }, ); let workflowExecution: PCancelable; - await Container.get(ExecutionRepository).updateStatus(executionId, 'running'); + await this.executionRepository.updateStatus(executionId, 'running'); try { additionalData.hooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain( @@ -322,7 +301,7 @@ export class WorkflowRunner { ); try { - await Container.get(PermissionChecker).check(workflow, data.userId); + await this.permissionChecker.check(workflow, data.userId); } catch (error) { ErrorReporter.error(error); // Create a failed execution with the data for the node @@ -439,7 +418,7 @@ export class WorkflowRunner { return executionId; } - async enqueueExecution( + private async enqueueExecution( data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, realtime?: boolean, @@ -604,13 +583,10 @@ export class WorkflowRunner { ); } - const fullExecutionData = await Container.get(ExecutionRepository).findSingleExecution( - executionId, - { - includeData: executionHasPostExecutionPromises, - unflattenData: executionHasPostExecutionPromises, - }, - ); + const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { + includeData: executionHasPostExecutionPromises, + unflattenData: executionHasPostExecutionPromises, + }); if (!fullExecutionData) { return reject(new Error(`Could not find execution with id "${executionId}"`)); } @@ -651,13 +627,8 @@ export class WorkflowRunner { return executionId; } - /** - * Run the workflow - * - * @param {boolean} [loadStaticData] If set will the static data be loaded from - * the workflow and added to input data - */ - async runSubprocess( + /** Run the workflow in a child-process */ + private async runSubprocess( data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, restartExecutionId?: string, @@ -669,7 +640,7 @@ export class WorkflowRunner { if (loadStaticData === true && workflowId) { data.workflowData.staticData = - await Container.get(WorkflowStaticDataService).getStaticDataById(workflowId); + await this.workflowStaticDataService.getStaticDataById(workflowId); } data.restartExecutionId = restartExecutionId; @@ -678,7 +649,7 @@ export class WorkflowRunner { const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId); (data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId; - await Container.get(ExecutionRepository).updateStatus(executionId, 'running'); + await this.executionRepository.updateStatus(executionId, 'running'); const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId); diff --git a/packages/cli/src/commands/execute.ts b/packages/cli/src/commands/execute.ts index 329349297f..a5c60a3e20 100644 --- a/packages/cli/src/commands/execute.ts +++ b/packages/cli/src/commands/execute.ts @@ -106,8 +106,7 @@ export class Execute extends BaseCommand { userId: user.id, }; - const workflowRunner = new WorkflowRunner(); - const executionId = await workflowRunner.run(runData); + const executionId = await Container.get(WorkflowRunner).run(runData); const activeExecutions = Container.get(ActiveExecutions); const data = await activeExecutions.getPostExecutePromise(executionId); diff --git a/packages/cli/src/commands/executeBatch.ts b/packages/cli/src/commands/executeBatch.ts index 565eb555bd..cc50809622 100644 --- a/packages/cli/src/commands/executeBatch.ts +++ b/packages/cli/src/commands/executeBatch.ts @@ -644,8 +644,7 @@ export class ExecuteBatch extends BaseCommand { userId: ExecuteBatch.instanceOwner.id, }; - const workflowRunner = new WorkflowRunner(); - const executionId = await workflowRunner.run(runData); + const executionId = await Container.get(WorkflowRunner).run(runData); const activeExecutions = Container.get(ActiveExecutions); const data = await activeExecutions.getPostExecutePromise(executionId); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 8f92538fa4..174b7ff446 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -76,6 +76,7 @@ export class ExecutionService { private readonly executionRepository: ExecutionRepository, private readonly workflowRepository: WorkflowRepository, private readonly nodeTypes: NodeTypes, + private readonly workflowRunner: WorkflowRunner, ) {} async findMany(req: ExecutionRequest.GetMany, sharedWorkflowIds: string[]) { @@ -276,8 +277,7 @@ export class ExecutionService { } } - const workflowRunner = new WorkflowRunner(); - const retriedExecutionId = await workflowRunner.run(data); + const retriedExecutionId = await this.workflowRunner.run(data); const executionData = await this.activeExecutions.getPostExecutePromise(retriedExecutionId); diff --git a/packages/cli/src/workflows/workflowExecution.service.ts b/packages/cli/src/workflows/workflowExecution.service.ts index 8ee41955a6..913a3bbc44 100644 --- a/packages/cli/src/workflows/workflowExecution.service.ts +++ b/packages/cli/src/workflows/workflowExecution.service.ts @@ -1,5 +1,15 @@ import { Service } from 'typedi'; -import type { IExecuteData, INode, IPinData, IRunExecutionData } from 'n8n-workflow'; +import type { + IDeferredPromise, + IExecuteData, + IExecuteResponsePromiseData, + INode, + INodeExecutionData, + IPinData, + IRunExecutionData, + IWorkflowExecuteAdditionalData, + WorkflowExecuteMode, +} from 'n8n-workflow'; import { SubworkflowOperationError, Workflow, @@ -34,8 +44,52 @@ export class WorkflowExecutionService { private readonly nodeTypes: NodeTypes, private readonly testWebhooks: TestWebhooks, private readonly permissionChecker: PermissionChecker, + private readonly workflowRunner: WorkflowRunner, ) {} + async runWorkflow( + workflowData: IWorkflowDb, + node: INode, + data: INodeExecutionData[][], + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + responsePromise?: IDeferredPromise, + ) { + const nodeExecutionStack: IExecuteData[] = [ + { + node, + data: { + main: data, + }, + source: null, + }, + ]; + + const executionData: IRunExecutionData = { + startData: {}, + resultData: { + runData: {}, + }, + executionData: { + contextData: {}, + metadata: {}, + nodeExecutionStack, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + + // Start the workflow + const runData: IWorkflowExecutionDataProcess = { + userId: additionalData.userId, + executionMode: mode, + executionData, + workflowData, + }; + + return await this.workflowRunner.run(runData, true, undefined, undefined, responsePromise); + } + async executeManually( { workflowData, @@ -92,8 +146,7 @@ export class WorkflowExecutionService { data.startNodes = [pinnedTrigger.name]; } - const workflowRunner = new WorkflowRunner(); - const executionId = await workflowRunner.run(data); + const executionId = await this.workflowRunner.run(data); return { executionId, @@ -230,8 +283,7 @@ export class WorkflowExecutionService { userId: runningUser.id, }; - const workflowRunner = new WorkflowRunner(); - await workflowRunner.run(runData); + await this.workflowRunner.run(runData); } catch (error) { ErrorReporter.error(error); this.logger.error( diff --git a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts index 4e6138fd96..4088e00d11 100644 --- a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts @@ -1,5 +1,4 @@ import { Container } from 'typedi'; - import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow'; @@ -12,20 +11,20 @@ import { SecretsHelper } from '@/SecretsHelpers'; import { WebhookService } from '@/services/webhook.service'; import * as WebhookHelpers from '@/WebhookHelpers'; import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; -import { WorkflowRunner } from '@/WorkflowRunner'; import type { User } from '@db/entities/User'; import type { WebhookEntity } from '@db/entities/WebhookEntity'; import { NodeTypes } from '@/NodeTypes'; -import { chooseRandomly } from './shared/random'; import { OrchestrationService } from '@/services/orchestration.service'; +import { ExecutionService } from '@/executions/execution.service'; +import { WorkflowService } from '@/workflows/workflow.service'; +import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; + import { mockInstance } from '../shared/mocking'; +import { chooseRandomly } from './shared/random'; import { setSchedulerAsLoadedNode } from './shared/utils'; import * as testDb from './shared/testDb'; import { createOwner } from './shared/db/users'; import { createWorkflow } from './shared/db/workflows'; -import { ExecutionService } from '@/executions/execution.service'; -import { WorkflowService } from '@/workflows/workflow.service'; -import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; mockInstance(ActiveExecutions); mockInstance(Push); @@ -182,26 +181,6 @@ describe('isActive()', () => { }); }); -describe('runWorkflow()', () => { - test('should call `WorkflowRunner.run()`', async () => { - const workflow = await createWorkflow({ active: true }, owner); - - await activeWorkflowRunner.init(); - - const additionalData = await AdditionalData.getBase('fake-user-id'); - - const runSpy = jest - .spyOn(WorkflowRunner.prototype, 'run') - .mockResolvedValue('fake-execution-id'); - - const [node] = workflow.nodes; - - await activeWorkflowRunner.runWorkflow(workflow, node, [[]], additionalData, 'trigger'); - - expect(runSpy).toHaveBeenCalledTimes(1); - }); -}); - describe('executeErrorWorkflow()', () => { test('should call `WorkflowExecuteAdditionalData.executeErrorWorkflow()`', async () => { const workflow = await createWorkflow({ active: true }, owner); diff --git a/packages/cli/test/integration/community-packages.api.test.ts b/packages/cli/test/integration/community-packages.api.test.ts index 1eb184a88c..03fd43f456 100644 --- a/packages/cli/test/integration/community-packages.api.test.ts +++ b/packages/cli/test/integration/community-packages.api.test.ts @@ -4,7 +4,6 @@ import type { SuperAgentTest } from 'supertest'; import type { InstalledPackages } from '@db/entities/InstalledPackages'; import type { InstalledNodes } from '@db/entities/InstalledNodes'; import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; -import { Push } from '@/push'; import { CommunityPackagesService } from '@/services/communityPackages.service'; import { mockInstance } from '../shared/mocking'; @@ -16,7 +15,6 @@ const communityPackagesService = mockInstance(CommunityPackagesService, { hasMissingPackages: false, }); mockInstance(LoadNodesAndCredentials); -mockInstance(Push); const testServer = setupTestServer({ endpointGroups: ['community-packages'] }); diff --git a/packages/cli/test/integration/executions.controller.test.ts b/packages/cli/test/integration/executions.controller.test.ts index 711b0716b5..02f8c3f25f 100644 --- a/packages/cli/test/integration/executions.controller.test.ts +++ b/packages/cli/test/integration/executions.controller.test.ts @@ -1,16 +1,17 @@ import type { User } from '@db/entities/User'; -import { Push } from '@/push'; +import { EnterpriseExecutionsService } from '@/executions/execution.service.ee'; +import { WaitTracker } from '@/WaitTracker'; + import { createSuccessfulExecution, getAllExecutions } from './shared/db/executions'; import { createOwner } from './shared/db/users'; import { createWorkflow } from './shared/db/workflows'; import * as testDb from './shared/testDb'; import { setupTestServer } from './shared/utils'; import { mockInstance } from '../shared/mocking'; -import { EnterpriseExecutionsService } from '@/executions/execution.service.ee'; mockInstance(EnterpriseExecutionsService); +mockInstance(WaitTracker); -mockInstance(Push); let testServer = setupTestServer({ endpointGroups: ['executions'] }); let owner: User; diff --git a/packages/cli/test/integration/publicApi/workflows.test.ts b/packages/cli/test/integration/publicApi/workflows.test.ts index 53aa198f09..305844453a 100644 --- a/packages/cli/test/integration/publicApi/workflows.test.ts +++ b/packages/cli/test/integration/publicApi/workflows.test.ts @@ -7,7 +7,6 @@ import type { User } from '@db/entities/User'; import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository'; import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import { Push } from '@/push'; import { ExecutionService } from '@/executions/execution.service'; import { randomApiKey } from '../shared/random'; @@ -27,7 +26,6 @@ let workflowRunner: ActiveWorkflowRunner; const testServer = utils.setupTestServer({ endpointGroups: ['publicApi'] }); const license = testServer.license; -mockInstance(Push); mockInstance(ExecutionService); beforeAll(async () => { diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index 711f98a296..584f6e0b2f 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -11,14 +11,15 @@ import { v4 as uuid } from 'uuid'; import config from '@/config'; import { WorkflowEntity } from '@db/entities/WorkflowEntity'; -import { AUTH_COOKIE_NAME } from '@/constants'; - -import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { SettingsRepository } from '@db/repositories/settings.repository'; -import { mockNodeTypesData } from '../../../unit/Helpers'; -import { OrchestrationService } from '@/services/orchestration.service'; -import { mockInstance } from '../../../shared/mocking'; +import { AUTH_COOKIE_NAME } from '@/constants'; import { ExecutionService } from '@/executions/execution.service'; +import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; +import { Push } from '@/push'; +import { OrchestrationService } from '@/services/orchestration.service'; + +import { mockNodeTypesData } from '../../../unit/Helpers'; +import { mockInstance } from '../../../shared/mocking'; export { setupTestServer } from './testServer'; @@ -30,6 +31,7 @@ export { setupTestServer } from './testServer'; * Initialize node types. */ export async function initActiveWorkflowRunner() { + mockInstance(Push); mockInstance(OrchestrationService); mockInstance(ExecutionService); diff --git a/packages/cli/test/integration/shared/utils/testServer.ts b/packages/cli/test/integration/shared/utils/testServer.ts index aea2602bdd..dfa22f173a 100644 --- a/packages/cli/test/integration/shared/utils/testServer.ts +++ b/packages/cli/test/integration/shared/utils/testServer.ts @@ -12,6 +12,7 @@ import { issueJWT } from '@/auth/jwt'; import { registerController } from '@/decorators'; import { rawBodyReader, bodyParser, setupAuthMiddlewares } from '@/middlewares'; import { PostHogClient } from '@/posthog'; +import { Push } from '@/push'; import { License } from '@/License'; import { Logger } from '@/Logger'; import { InternalHooks } from '@/InternalHooks'; @@ -78,6 +79,7 @@ export const setupTestServer = ({ mockInstance(Logger); mockInstance(InternalHooks); mockInstance(PostHogClient); + mockInstance(Push); const testServer: TestServer = { app, diff --git a/packages/cli/test/integration/workflows/workflows.controller.ee.test.ts b/packages/cli/test/integration/workflows/workflows.controller.ee.test.ts index 222ef6ebf7..e24973e013 100644 --- a/packages/cli/test/integration/workflows/workflows.controller.ee.test.ts +++ b/packages/cli/test/integration/workflows/workflows.controller.ee.test.ts @@ -6,7 +6,6 @@ import type { INode } from 'n8n-workflow'; import type { User } from '@db/entities/User'; import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import { Push } from '@/push'; import { WorkflowSharingService } from '@/workflows/workflowSharing.service'; import { mockInstance } from '../../shared/mocking'; @@ -30,7 +29,6 @@ let authAnotherMemberAgent: SuperAgentTest; let saveCredential: SaveCredentialFunction; const activeWorkflowRunner = mockInstance(ActiveWorkflowRunner); -mockInstance(Push); const sharingSpy = jest.spyOn(License.prototype, 'isSharingEnabled').mockReturnValue(true); const testServer = utils.setupTestServer({ diff --git a/packages/cli/test/integration/workflows/workflows.controller.test.ts b/packages/cli/test/integration/workflows/workflows.controller.test.ts index b02c351b4d..d493dd9d48 100644 --- a/packages/cli/test/integration/workflows/workflows.controller.test.ts +++ b/packages/cli/test/integration/workflows/workflows.controller.test.ts @@ -9,7 +9,6 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; import type { ListQuery } from '@/requests'; import { WorkflowHistoryRepository } from '@db/repositories/workflowHistory.repository'; import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import { Push } from '@/push'; import { EnterpriseWorkflowService } from '@/workflows/workflow.service.ee'; import { mockInstance } from '../../shared/mocking'; @@ -34,7 +33,6 @@ const license = testServer.license; const { objectContaining, arrayContaining, any } = expect; const activeWorkflowRunnerLike = mockInstance(ActiveWorkflowRunner); -mockInstance(Push); beforeAll(async () => { owner = await createOwner(); diff --git a/packages/cli/test/unit/ActiveExecutions.test.ts b/packages/cli/test/unit/ActiveExecutions.test.ts index 6ac1438b4b..f30fd3778a 100644 --- a/packages/cli/test/unit/ActiveExecutions.test.ts +++ b/packages/cli/test/unit/ActiveExecutions.test.ts @@ -1,11 +1,10 @@ import { ActiveExecutions } from '@/ActiveExecutions'; import PCancelable from 'p-cancelable'; import { v4 as uuid } from 'uuid'; -import { Container } from 'typedi'; import type { IExecuteResponsePromiseData, IRun } from 'n8n-workflow'; import { createDeferredPromise } from 'n8n-workflow'; import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; -import { ExecutionRepository } from '@db/repositories/execution.repository'; +import type { ExecutionRepository } from '@db/repositories/execution.repository'; import { mock } from 'jest-mock-extended'; const FAKE_EXECUTION_ID = '15'; @@ -14,7 +13,7 @@ const FAKE_SECOND_EXECUTION_ID = '20'; const updateExistingExecution = jest.fn(); const createNewExecution = jest.fn(async () => FAKE_EXECUTION_ID); -Container.set(ExecutionRepository, { +const executionRepository = mock({ updateExistingExecution, createNewExecution, }); @@ -23,7 +22,7 @@ describe('ActiveExecutions', () => { let activeExecutions: ActiveExecutions; beforeEach(() => { - activeExecutions = new ActiveExecutions(mock()); + activeExecutions = new ActiveExecutions(mock(), executionRepository); }); afterEach(() => { diff --git a/packages/cli/test/unit/WaitTracker.test.ts b/packages/cli/test/unit/WaitTracker.test.ts index f1f8f306dd..4bf43bb940 100644 --- a/packages/cli/test/unit/WaitTracker.test.ts +++ b/packages/cli/test/unit/WaitTracker.test.ts @@ -21,7 +21,7 @@ describe('WaitTracker', () => { it('should query DB for waiting executions', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([execution]); - new WaitTracker(mock(), executionRepository, mock()); + new WaitTracker(mock(), executionRepository, mock(), mock()); expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1); }); @@ -29,7 +29,7 @@ describe('WaitTracker', () => { it('if no executions to start, should do nothing', () => { executionRepository.getWaitingExecutions.mockResolvedValue([]); - new WaitTracker(mock(), executionRepository, mock()); + new WaitTracker(mock(), executionRepository, mock(), mock()); expect(executionRepository.findSingleExecution).not.toHaveBeenCalled(); }); @@ -37,7 +37,7 @@ describe('WaitTracker', () => { describe('if execution to start', () => { it('if not enough time passed, should not start execution', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([execution]); - const waitTracker = new WaitTracker(mock(), executionRepository, mock()); + const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock()); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); await waitTracker.getWaitingExecutions(); @@ -51,7 +51,7 @@ describe('WaitTracker', () => { it('if enough time passed, should start execution', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([]); - const waitTracker = new WaitTracker(mock(), executionRepository, mock()); + const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock()); executionRepository.getWaitingExecutions.mockResolvedValue([execution]); await waitTracker.getWaitingExecutions(); @@ -68,7 +68,7 @@ describe('WaitTracker', () => { describe('startExecution()', () => { it('should query for execution to start', async () => { executionRepository.getWaitingExecutions.mockResolvedValue([]); - const waitTracker = new WaitTracker(mock(), executionRepository, mock()); + const waitTracker = new WaitTracker(mock(), executionRepository, mock(), mock()); executionRepository.findSingleExecution.mockResolvedValue(execution); waitTracker.startExecution(execution.id); diff --git a/packages/cli/test/unit/WorkflowRunner.test.ts b/packages/cli/test/unit/WorkflowRunner.test.ts index 0317793459..72f3da9fd2 100644 --- a/packages/cli/test/unit/WorkflowRunner.test.ts +++ b/packages/cli/test/unit/WorkflowRunner.test.ts @@ -1,11 +1,9 @@ +import Container from 'typedi'; +import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow'; import type { User } from '@db/entities/User'; import { WorkflowRunner } from '@/WorkflowRunner'; -import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow'; -import { Push } from '@/push'; -import Container from 'typedi'; import config from '@/config'; -import { mockInstance } from '../shared/mocking'; import * as testDb from '../integration/shared/testDb'; import { setupTestServer } from '../integration/shared/utils'; import { createUser } from '../integration/shared/db/users'; @@ -26,10 +24,7 @@ const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter') beforeAll(async () => { owner = await createUser({ role: 'global:owner' }); - mockInstance(Push); - Container.set(Push, new Push()); - - runner = new WorkflowRunner(); + runner = Container.get(WorkflowRunner); hookFunctions = { workflowExecuteAfter: [watchers.workflowExecuteAfter], diff --git a/packages/cli/test/unit/workflow-execution.service.test.ts b/packages/cli/test/unit/workflow-execution.service.test.ts index fd2848a46e..567f6bf6e7 100644 --- a/packages/cli/test/unit/workflow-execution.service.test.ts +++ b/packages/cli/test/unit/workflow-execution.service.test.ts @@ -1,8 +1,11 @@ import type { INode } from 'n8n-workflow'; -import { WorkflowExecutionService } from '@/workflows/workflowExecution.service'; -import type { IWorkflowDb } from '@/Interfaces'; import { mock } from 'jest-mock-extended'; +import type { WorkflowEntity } from '@db/entities/WorkflowEntity'; +import type { IWorkflowDb } from '@/Interfaces'; +import { WorkflowExecutionService } from '@/workflows/workflowExecution.service'; +import type { WorkflowRunner } from '@/WorkflowRunner'; + const webhookNode: INode = { name: 'Webhook', type: 'n8n-nodes-base.webhook', @@ -47,17 +50,28 @@ const hackerNewsNode: INode = { }; describe('WorkflowExecutionService', () => { - let workflowExecutionService: WorkflowExecutionService; + const workflowRunner = mock(); + const workflowExecutionService = new WorkflowExecutionService( + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + workflowRunner, + ); - beforeAll(() => { - workflowExecutionService = new WorkflowExecutionService( - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - ); + describe('runWorkflow()', () => { + test('should call `WorkflowRunner.run()`', async () => { + const node = mock(); + const workflow = mock({ active: true, nodes: [node] }); + + workflowRunner.run.mockResolvedValue('fake-execution-id'); + + await workflowExecutionService.runWorkflow(workflow, node, [[]], mock(), 'trigger'); + + expect(workflowRunner.run).toHaveBeenCalledTimes(1); + }); }); describe('selectPinnedActivatorStarter()', () => {