From 36b314d0311ef84f275efbc20997c6a77db81b31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 16 Jul 2024 19:25:20 +0200 Subject: [PATCH] fix(core): Stopping an execution should reject any response promises (#9992) --- packages/cli/src/ActiveExecutions.ts | 40 +++++++++++++------ packages/cli/src/WaitTracker.ts | 2 +- packages/cli/src/WorkflowRunner.ts | 5 ++- .../repositories/execution.repository.ts | 9 +++-- .../cli/src/executions/execution.service.ts | 8 ++-- .../execution.service.integration.test.ts | 1 + packages/workflow/src/Interfaces.ts | 2 + .../src/errors/execution-cancelled.error.ts | 10 +++++ packages/workflow/src/errors/index.ts | 1 + 9 files changed, 55 insertions(+), 23 deletions(-) create mode 100644 packages/workflow/src/errors/execution-cancelled.error.ts diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index 03949e7ea5..97313d5cb2 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -6,7 +6,12 @@ import type { IRun, ExecutionStatus, } from 'n8n-workflow'; -import { ApplicationError, createDeferredPromise, sleep } from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + ExecutionCancelledError, + sleep, +} from 'n8n-workflow'; import type { ExecutionPayload, @@ -138,16 +143,13 @@ export class ActiveExecutions { promise.resolve(fullRunData); } - // Remove from the list of active executions - delete this.activeExecutions[executionId]; - - this.concurrencyControl.release({ mode: execution.executionData.executionMode }); + this.postExecuteCleanup(executionId); } /** * Forces an execution to stop */ - async stopExecution(executionId: string): Promise { + stopExecution(executionId: string): void { const execution = this.activeExecutions[executionId]; if (execution === undefined) { // There is no execution running with that id @@ -156,7 +158,25 @@ export class ActiveExecutions { execution.workflowExecution!.cancel(); - return await this.getPostExecutePromise(executionId); + // Reject all the waiting promises + const reason = new ExecutionCancelledError(executionId); + for (const promise of execution.postExecutePromises) { + promise.reject(reason); + } + + this.postExecuteCleanup(executionId); + } + + private postExecuteCleanup(executionId: string) { + const execution = this.activeExecutions[executionId]; + if (execution === undefined) { + return; + } + + // Remove from the list of active executions + delete this.activeExecutions[executionId]; + + this.concurrencyControl.release({ mode: execution.executionData.executionMode }); } /** @@ -215,11 +235,7 @@ export class ActiveExecutions { await this.concurrencyControl.removeAll(this.activeExecutions); } - const stopPromises = executionIds.map( - async (executionId) => await this.stopExecution(executionId), - ); - - await Promise.allSettled(stopPromises); + executionIds.forEach((executionId) => this.stopExecution(executionId)); } let count = 0; diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index 36fb4f9900..5050d80ba9 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -86,7 +86,7 @@ export class WaitTracker { } } - async stopExecution(executionId: string) { + stopExecution(executionId: string) { if (!this.waitingExecutions[executionId]) return; clearTimeout(this.waitingExecutions[executionId].timer); diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index fd92d1b8ac..402da960f0 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -16,8 +16,8 @@ import type { } from 'n8n-workflow'; import { ErrorReporterProxy as ErrorReporter, + ExecutionCancelledError, Workflow, - WorkflowOperationError, } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; @@ -188,6 +188,7 @@ export class WorkflowRunner { } }) .catch((error) => { + if (error instanceof ExecutionCancelledError) return; ErrorReporter.error(error); this.logger.error( 'There was a problem running internal hook "onWorkflowPostExecute"', @@ -426,7 +427,7 @@ export class WorkflowRunner { { retryOf: data.retryOf ? data.retryOf.toString() : undefined }, ); - const error = new WorkflowOperationError('Workflow-Execution has been canceled!'); + const error = new ExecutionCancelledError(executionId); await this.processError(error, new Date(), data.executionMode, executionId, hooksWorker); reject(error); diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 7722d4a819..1ebb22d8eb 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -20,14 +20,16 @@ import type { SelectQueryBuilder, } from '@n8n/typeorm'; import { parse, stringify } from 'flatted'; +import { GlobalConfig } from '@n8n/config'; import { ApplicationError, - WorkflowOperationError, type ExecutionStatus, type ExecutionSummary, type IRunExecutionData, } from 'n8n-workflow'; import { BinaryDataService } from 'n8n-core'; +import { ExecutionCancelledError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; + import type { ExecutionPayload, IExecutionBase, @@ -43,9 +45,7 @@ import { ExecutionDataRepository } from './executionData.repository'; import { Logger } from '@/Logger'; import type { ExecutionSummaries } from '@/executions/execution.types'; import { PostgresLiveRowsRetrievalError } from '@/errors/postgres-live-rows-retrieval.error'; -import { GlobalConfig } from '@n8n/config'; import { separate } from '@/utils'; -import { ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; export interface IGetExecutionsQueryFilter { id?: FindOperator | string; @@ -641,8 +641,9 @@ export class ExecutionRepository extends Repository { } async stopDuringRun(execution: IExecutionResponse) { - const error = new WorkflowOperationError('Workflow-Execution has been canceled!'); + const error = new ExecutionCancelledError(execution.id); + execution.data ??= { resultData: { runData: {} } }; execution.data.resultData.error = { ...error, message: error.message, diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 853dbb64ff..bb8650e99f 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -444,11 +444,11 @@ export class ExecutionService { } if (this.activeExecutions.has(execution.id)) { - await this.activeExecutions.stopExecution(execution.id); + this.activeExecutions.stopExecution(execution.id); } if (this.waitTracker.has(execution.id)) { - await this.waitTracker.stopExecution(execution.id); + this.waitTracker.stopExecution(execution.id); } return await this.executionRepository.stopDuringRun(execution); @@ -461,11 +461,11 @@ export class ExecutionService { } if (this.activeExecutions.has(execution.id)) { - await this.activeExecutions.stopExecution(execution.id); + this.activeExecutions.stopExecution(execution.id); } if (this.waitTracker.has(execution.id)) { - await this.waitTracker.stopExecution(execution.id); + this.waitTracker.stopExecution(execution.id); } const job = await this.queue.findRunningJobBy({ executionId: execution.id }); diff --git a/packages/cli/test/integration/execution.service.integration.test.ts b/packages/cli/test/integration/execution.service.integration.test.ts index 7b46655d23..e30d55602a 100644 --- a/packages/cli/test/integration/execution.service.integration.test.ts +++ b/packages/cli/test/integration/execution.service.integration.test.ts @@ -29,6 +29,7 @@ describe('ExecutionService', () => { mock(), mock(), mock(), + mock(), ); }); diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 68dd5bea06..5d3b573e63 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -19,6 +19,7 @@ import type { Workflow } from './Workflow'; import type { WorkflowActivationError } from './errors/workflow-activation.error'; import type { WorkflowOperationError } from './errors/workflow-operation.error'; import type { WorkflowHooks } from './WorkflowHooks'; +import type { ExecutionCancelledError } from './errors'; import type { NodeOperationError } from './errors/node-operation.error'; import type { NodeApiError } from './errors/node-api.error'; import type { AxiosProxyConfig } from 'axios'; @@ -80,6 +81,7 @@ export type ExecutionError = | ExpressionError | WorkflowActivationError | WorkflowOperationError + | ExecutionCancelledError | NodeOperationError | NodeApiError; diff --git a/packages/workflow/src/errors/execution-cancelled.error.ts b/packages/workflow/src/errors/execution-cancelled.error.ts new file mode 100644 index 0000000000..bc625e0f3a --- /dev/null +++ b/packages/workflow/src/errors/execution-cancelled.error.ts @@ -0,0 +1,10 @@ +import { ExecutionBaseError } from './abstract/execution-base.error'; + +export class ExecutionCancelledError extends ExecutionBaseError { + constructor(executionId: string) { + super('The execution was cancelled', { + level: 'warning', + extra: { executionId }, + }); + } +} diff --git a/packages/workflow/src/errors/index.ts b/packages/workflow/src/errors/index.ts index b23367e91d..b48fecb3bc 100644 --- a/packages/workflow/src/errors/index.ts +++ b/packages/workflow/src/errors/index.ts @@ -1,6 +1,7 @@ export { ApplicationError } from './application.error'; export { ExpressionError } from './expression.error'; export { CredentialAccessError } from './credential-access-error'; +export { ExecutionCancelledError } from './execution-cancelled.error'; export { NodeApiError } from './node-api.error'; export { NodeOperationError } from './node-operation.error'; export { NodeSslError } from './node-ssl.error';