From 67fb6d6fdd08d1905a9e17c14a653999afd64fc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 23 Sep 2024 12:08:57 +0200 Subject: [PATCH] refactor(core): Refactor execution post-execute promises (no-changelog) (#10809) Co-authored-by: Danny Martini --- .../src/__tests__/active-executions.test.ts | 13 ++- .../cli/src/__tests__/workflow-runner.test.ts | 5 ++ packages/cli/src/active-executions.ts | 86 +++++++------------ .../src/errors/execution-not-found-error.ts | 7 ++ packages/cli/src/interfaces.ts | 3 +- .../src/workflow-execute-additional-data.ts | 7 +- packages/cli/src/workflow-runner.ts | 38 ++------ 7 files changed, 65 insertions(+), 94 deletions(-) create mode 100644 packages/cli/src/errors/execution-not-found-error.ts diff --git a/packages/cli/src/__tests__/active-executions.test.ts b/packages/cli/src/__tests__/active-executions.test.ts index cd30aeb015..a8c7b0f18e 100644 --- a/packages/cli/src/__tests__/active-executions.test.ts +++ b/packages/cli/src/__tests__/active-executions.test.ts @@ -94,10 +94,17 @@ describe('ActiveExecutions', () => { }); test('Should remove an existing execution', async () => { + // ARRANGE const newExecution = mockExecutionData(); const executionId = await activeExecutions.add(newExecution); - activeExecutions.remove(executionId); + // ACT + activeExecutions.finalizeExecution(executionId); + + // Wait until the next tick to ensure that the post-execution promise has settled + await new Promise(setImmediate); + + // ASSERT expect(activeExecutions.getActiveExecutions().length).toBe(0); }); @@ -110,7 +117,7 @@ describe('ActiveExecutions', () => { setTimeout(res, 100); }); const fakeOutput = mockFullRunData(); - activeExecutions.remove(executionId, fakeOutput); + activeExecutions.finalizeExecution(executionId, fakeOutput); await expect(postExecutePromise).resolves.toEqual(fakeOutput); }); @@ -126,7 +133,7 @@ describe('ActiveExecutions', () => { const cancellablePromise = mockCancelablePromise(); cancellablePromise.cancel = cancelExecution; activeExecutions.attachWorkflowExecution(executionId, cancellablePromise); - void activeExecutions.stopExecution(executionId); + activeExecutions.stopExecution(executionId); expect(cancelExecution).toHaveBeenCalledTimes(1); }); diff --git a/packages/cli/src/__tests__/workflow-runner.test.ts b/packages/cli/src/__tests__/workflow-runner.test.ts index 9a3fcd67b4..f828880c74 100644 --- a/packages/cli/src/__tests__/workflow-runner.test.ts +++ b/packages/cli/src/__tests__/workflow-runner.test.ts @@ -1,6 +1,7 @@ import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow'; import Container from 'typedi'; +import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import type { User } from '@/databases/entities/user'; import { Telemetry } from '@/telemetry'; @@ -72,6 +73,10 @@ test('processError should process error', async () => { }, workflow, ); + await Container.get(ActiveExecutions).add( + { executionMode: 'webhook', workflowData: workflow }, + execution.id, + ); config.set('executions.mode', 'regular'); await runner.processError( new Error('test') as ExecutionError, diff --git a/packages/cli/src/active-executions.ts b/packages/cli/src/active-executions.ts index 2e8b42edc5..8f7661925b 100644 --- a/packages/cli/src/active-executions.ts +++ b/packages/cli/src/active-executions.ts @@ -5,17 +5,13 @@ import type { ExecutionStatus, IWorkflowExecutionDataProcess, } from 'n8n-workflow'; -import { - ApplicationError, - createDeferredPromise, - ExecutionCancelledError, - sleep, -} from 'n8n-workflow'; +import { createDeferredPromise, ExecutionCancelledError, sleep } from 'n8n-workflow'; import { strict as assert } from 'node:assert'; import type PCancelable from 'p-cancelable'; import { Service } from 'typedi'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import { ExecutionNotFoundError } from '@/errors/execution-not-found-error'; import type { ExecutionPayload, IExecutingWorkflowData, @@ -95,13 +91,29 @@ export class ActiveExecutions { await this.executionRepository.updateExistingExecution(executionId, execution); } + const postExecutePromise = createDeferredPromise(); + this.activeExecutions[executionId] = { executionData, startedAt: new Date(), - postExecutePromises: [], + postExecutePromise, status: executionStatus, }; + // Automatically remove execution once the postExecutePromise settles + void postExecutePromise.promise + .catch((error) => { + if (error instanceof ExecutionCancelledError) return; + throw error; + }) + .finally(() => { + this.concurrencyControl.release({ mode: executionData.executionMode }); + delete this.activeExecutions[executionId]; + this.logger.debug('Execution removed', { executionId }); + }); + + this.logger.debug('Execution added', { executionId }); + return executionId; } @@ -125,68 +137,30 @@ export class ActiveExecutions { execution?.responsePromise?.resolve(response); } - getPostExecutePromiseCount(executionId: string): number { - return this.activeExecutions[executionId]?.postExecutePromises.length ?? 0; - } - - /** - * Remove an active execution - */ - remove(executionId: string, fullRunData?: IRun): void { - const execution = this.activeExecutions[executionId]; - if (execution === undefined) { - return; - } - - // Resolve all the waiting promises - for (const promise of execution.postExecutePromises) { - promise.resolve(fullRunData); - } - - this.postExecuteCleanup(executionId); - } - - /** - * Forces an execution to stop - */ + /** Cancel the execution promise and reject its post-execution promise. */ stopExecution(executionId: string): void { const execution = this.activeExecutions[executionId]; if (execution === undefined) { // There is no execution running with that id return; } - - execution.workflowExecution!.cancel(); - - // Reject all the waiting promises - const reason = new ExecutionCancelledError(executionId); - for (const promise of execution.postExecutePromises) { - promise.reject(reason); - } - - this.postExecuteCleanup(executionId); + execution.workflowExecution?.cancel(); + execution.postExecutePromise.reject(new ExecutionCancelledError(executionId)); + this.logger.debug('Execution cancelled', { executionId }); } - private postExecuteCleanup(executionId: string) { - const execution = this.activeExecutions[executionId]; - if (execution === undefined) { - return; - } - - // Remove from the list of active executions - delete this.activeExecutions[executionId]; - - this.concurrencyControl.release({ mode: execution.executionData.executionMode }); + /** Resolve the post-execution promise in an execution. */ + finalizeExecution(executionId: string, fullRunData?: IRun) { + const execution = this.getExecution(executionId); + execution.postExecutePromise.resolve(fullRunData); + this.logger.debug('Execution finalized', { executionId }); } /** * Returns a promise which will resolve with the data of the execution with the given id */ async getPostExecutePromise(executionId: string): Promise { - // Create the promise which will be resolved when the execution finished - const waitPromise = createDeferredPromise(); - this.getExecution(executionId).postExecutePromises.push(waitPromise); - return await waitPromise.promise; + return await this.getExecution(executionId).postExecutePromise.promise; } /** @@ -252,7 +226,7 @@ export class ActiveExecutions { private getExecution(executionId: string): IExecutingWorkflowData { const execution = this.activeExecutions[executionId]; if (!execution) { - throw new ApplicationError('No active execution found', { extra: { executionId } }); + throw new ExecutionNotFoundError(executionId); } return execution; } diff --git a/packages/cli/src/errors/execution-not-found-error.ts b/packages/cli/src/errors/execution-not-found-error.ts new file mode 100644 index 0000000000..45e0f02033 --- /dev/null +++ b/packages/cli/src/errors/execution-not-found-error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class ExecutionNotFoundError extends ApplicationError { + constructor(executionId: string) { + super('No active execution found', { extra: { executionId } }); + } +} diff --git a/packages/cli/src/interfaces.ts b/packages/cli/src/interfaces.ts index 02da5a7c77..629925e8dc 100644 --- a/packages/cli/src/interfaces.ts +++ b/packages/cli/src/interfaces.ts @@ -193,7 +193,8 @@ export interface IExecutionsCurrentSummary { export interface IExecutingWorkflowData { executionData: IWorkflowExecutionDataProcess; startedAt: Date; - postExecutePromises: Array>; + /** This promise rejects when the execution is stopped. When the execution finishes (successfully or not), the promise resolves. */ + postExecutePromise: IDeferredPromise; responsePromise?: IDeferredPromise; workflowExecution?: PCancelable; status: ExecutionStatus; diff --git a/packages/cli/src/workflow-execute-additional-data.ts b/packages/cli/src/workflow-execute-additional-data.ts index 040a56b7a3..4d3bd7a223 100644 --- a/packages/cli/src/workflow-execute-additional-data.ts +++ b/packages/cli/src/workflow-execute-additional-data.ts @@ -879,8 +879,7 @@ async function executeWorkflow( fullExecutionData.workflowId = workflowData.id; } - // remove execution from active executions - activeExecutions.remove(executionId, fullRunData); + activeExecutions.finalizeExecution(executionId, fullRunData); await executionRepository.updateExistingExecution(executionId, fullExecutionData); throw objectToError( @@ -906,11 +905,11 @@ async function executeWorkflow( if (data.finished === true || data.status === 'waiting') { // Workflow did finish successfully - activeExecutions.remove(executionId, data); + activeExecutions.finalizeExecution(executionId, data); const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data); return returnData!.data!.main; } - activeExecutions.remove(executionId, data); + activeExecutions.finalizeExecution(executionId, data); // Workflow did fail const { error } = data.data.resultData; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index c27baa5ba1..a4dd344b62 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -26,7 +26,6 @@ import { ActiveExecutions } from '@/active-executions'; import config from '@/config'; import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExternalHooks } from '@/external-hooks'; -import type { IExecutionResponse } from '@/interfaces'; import { Logger } from '@/logger'; import { NodeTypes } from '@/node-types'; import type { ScalingService } from '@/scaling/scaling.service'; @@ -102,7 +101,7 @@ export class WorkflowRunner { // Remove from active execution with empty data. That will // set the execution to failed. - this.activeExecutions.remove(executionId, fullRunData); + this.activeExecutions.finalizeExecution(executionId, fullRunData); if (hooks) { await hooks.executeHookFunctions('workflowExecuteAfter', [fullRunData]); @@ -132,7 +131,7 @@ export class WorkflowRunner { await workflowHooks.executeHookFunctions('workflowExecuteBefore', []); await workflowHooks.executeHookFunctions('workflowExecuteAfter', [runData]); responsePromise?.reject(error); - this.activeExecutions.remove(executionId); + this.activeExecutions.finalizeExecution(executionId); return executionId; } @@ -336,7 +335,7 @@ export class WorkflowRunner { fullRunData.finished = false; } fullRunData.status = this.activeExecutions.getStatus(executionId); - this.activeExecutions.remove(executionId, fullRunData); + this.activeExecutions.finalizeExecution(executionId, fullRunData); }) .catch( async (error) => @@ -505,45 +504,24 @@ export class WorkflowRunner { reject(error); } - // optimization: only pull and unflatten execution data from the Db when it is needed - const executionHasPostExecutionPromises = - this.activeExecutions.getPostExecutePromiseCount(executionId) > 0; - - if (executionHasPostExecutionPromises) { - this.logger.debug( - `Reading execution data for execution ${executionId} from db for PostExecutionPromise.`, - ); - } else { - this.logger.debug( - `Skipping execution data for execution ${executionId} since there are no PostExecutionPromise.`, - ); - } - const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { - includeData: executionHasPostExecutionPromises, - unflattenData: executionHasPostExecutionPromises, + includeData: true, + unflattenData: true, }); if (!fullExecutionData) { return reject(new Error(`Could not find execution with id "${executionId}"`)); } const runData: IRun = { - data: {}, finished: fullExecutionData.finished, mode: fullExecutionData.mode, startedAt: fullExecutionData.startedAt, stoppedAt: fullExecutionData.stoppedAt, status: fullExecutionData.status, - } as IRun; + data: fullExecutionData.data, + }; - if (executionHasPostExecutionPromises) { - runData.data = (fullExecutionData as IExecutionResponse).data; - } - - // NOTE: due to the optimization of not loading the execution data from the db when no post execution promises are present, - // the execution data in runData.data MAY not be available here. - // This means that any function expecting with runData has to check if the runData.data defined from this point - this.activeExecutions.remove(executionId, runData); + this.activeExecutions.finalizeExecution(executionId, runData); // Normally also static data should be supplied here but as it only used for sending // data to editor-UI is not needed.