diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index 6b0f8fb454..a8c7b0f18e 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -99,7 +99,7 @@ describe('ActiveExecutions', () => { const executionId = await activeExecutions.add(newExecution); // ACT - activeExecutions.finishExecution(executionId); + activeExecutions.finalizeExecution(executionId); // Wait until the next tick to ensure that the post-execution promise has settled await new Promise(setImmediate); @@ -117,7 +117,7 @@ describe('ActiveExecutions', () => { setTimeout(res, 100); }); const fakeOutput = mockFullRunData(); - activeExecutions.finishExecution(executionId, fakeOutput); + activeExecutions.finalizeExecution(executionId, fakeOutput); await expect(postExecutePromise).resolves.toEqual(fakeOutput); }); diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index e289e4dee5..515043e905 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -5,17 +5,13 @@ import type { ExecutionStatus, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; -import { - ApplicationError, - createDeferredPromise, - ExecutionCancelledError, - sleep, -} from 'n8n-workflow'; +import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow'; import { strict as assert } from 'node:assert'; import type PCancelable from 'p-cancelable'; import { Service } from 'typedi'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import type { ExecutionPayload, IExecutingWorkflowData, @@ -107,14 +103,16 @@ export class ActiveExecutions { // Automatically remove execution once the postExecutePromise settles void postExecutePromise.promise .catch((error) => { - // rethrow the error unless it's ExecutionCancelledError - if (!(error instanceof ExecutionCancelledError)) throw error; + if (error instanceof ExecutionCancelledError) return; + throw error; }) .finally(() => { this.concurrencyControl.release({ mode: executionData.executionMode }); delete this.activeExecutions[executionId]; }); + this.logger.debug('Execution added', { executionId }); + return executionId; } @@ -138,17 +136,19 @@ export class ActiveExecutions { execution?.responsePromise?.resolve(response); } - /** Forces an execution to stop */ + /** Cancel the execution promise and reject its post-execution promise. */ stopExecution(executionId: string): void { const execution = this.getExecution(executionId); execution.workflowExecution?.cancel(); execution.postExecutePromise.reject(new ExecutionCancelledError(executionId)); + this.logger.debug('Execution cancelled', { executionId }); } - /** Mark an execution as completed */ - finishExecution(executionId: string, fullRunData?: IRun) { + /** Resolve the post-execution promise in an execution. */ + finalizeExecution(executionId: string, fullRunData?: IRun) { const execution = this.getExecution(executionId); execution.postExecutePromise.resolve(fullRunData); + this.logger.debug('Execution finalized', { executionId }); } /** @@ -221,7 +221,7 @@ export class ActiveExecutions { private getExecution(executionId: string): IExecutingWorkflowData { const execution = this.activeExecutions[executionId]; if (!execution) { - throw new ApplicationError('No active execution found', { extra: { executionId } }); + throw new ExecutionNotFoundError(executionId); } return execution; } diff --git a/packages/cli/src/errors/execution-not-found-error.ts b/packages/cli/src/errors/execution-not-found-error.ts new file mode 100644 index 0000000000..45e0f02033 --- /dev/null +++ b/packages/cli/src/errors/execution-not-found-error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class ExecutionNotFoundError extends ApplicationError { + constructor(executionId: string) { + super('No active execution found', { extra: { executionId } }); + } +} diff --git a/packages/cli/src/interfaces.ts b/packages/cli/src/interfaces.ts index 09b2a9e7c4..629925e8dc 100644 --- a/packages/cli/src/interfaces.ts +++ b/packages/cli/src/interfaces.ts @@ -193,6 +193,7 @@ export interface IExecutionsCurrentSummary { export interface IExecutingWorkflowData { executionData: IWorkflowExecutionDataProcess; startedAt: Date; + /** This promise rejects when the execution is stopped. When the execution finishes (successfully or not), the promise resolves. */ postExecutePromise: IDeferredPromise; responsePromise?: IDeferredPromise; workflowExecution?: PCancelable; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 7319e7cea5..4d3bd7a223 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -879,8 +879,7 @@ async function executeWorkflow( fullExecutionData.workflowId = workflowData.id; } - // remove execution from active executions - activeExecutions.finishExecution(executionId, fullRunData); + activeExecutions.finalizeExecution(executionId, fullRunData); await executionRepository.updateExistingExecution(executionId, fullExecutionData); throw objectToError( @@ -906,11 +905,11 @@ async function executeWorkflow( if (data.finished === true || data.status === 'waiting') { // Workflow did finish successfully - activeExecutions.finishExecution(executionId, data); + activeExecutions.finalizeExecution(executionId, data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); return returnData!.data!.main; } - activeExecutions.finishExecution(executionId, data); + activeExecutions.finalizeExecution(executionId, data); // Workflow did fail const { error } = data.data.resultData; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 980c8f57e6..a4dd344b62 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -101,7 +101,7 @@ export class WorkflowRunner { // Remove from active execution with empty data. That will // set the execution to failed. - this.activeExecutions.finishExecution(executionId, fullRunData); + this.activeExecutions.finalizeExecution(executionId, fullRunData); if (hooks) { await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]); @@ -131,7 +131,7 @@ export class WorkflowRunner { await workflowHooks.executeHookFunctions('workflowExecuteBefore', []); await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); responsePromise?.reject(error); - this.activeExecutions.finishExecution(executionId); + this.activeExecutions.finalizeExecution(executionId); return executionId; } @@ -335,7 +335,7 @@ export class WorkflowRunner { fullRunData.finished = false; } fullRunData.status = this.activeExecutions.getStatus(executionId); - this.activeExecutions.finishExecution(executionId, fullRunData); + this.activeExecutions.finalizeExecution(executionId, fullRunData); }) .catch( async (error) => @@ -521,10 +521,7 @@ export class WorkflowRunner { data: fullExecutionData.data, }; - // NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present, - // the execution data in runData.data MAY not be available here. - // This means that any function expecting with runData has to check if the runData.data defined from this point - this.activeExecutions.finishExecution(executionId, runData); + this.activeExecutions.finalizeExecution(executionId, runData); // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed.