From e26b406665e20761279c4e315d04501350427de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 30 Dec 2024 13:17:55 +0100 Subject: [PATCH] fix(core): Fix execution cancellation issues in scaling mode (#12343) --- .../executions/__tests__/execution.service.test.ts | 8 ++++---- packages/cli/src/executions/execution.service.ts | 14 +------------- packages/cli/src/workflow-runner.ts | 2 +- packages/core/src/error-reporter.ts | 3 ++- 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts index b91836578c..101773a0f7 100644 --- a/packages/cli/src/executions/__tests__/execution.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -242,8 +242,8 @@ describe('ExecutionService', () => { */ expect(waitTracker.stopExecution).not.toHaveBeenCalled(); expect(activeExecutions.stopExecution).toHaveBeenCalled(); - expect(scalingService.findJobsByStatus).toHaveBeenCalled(); - expect(scalingService.stopJob).toHaveBeenCalled(); + expect(scalingService.findJobsByStatus).not.toHaveBeenCalled(); + expect(scalingService.stopJob).not.toHaveBeenCalled(); expect(executionRepository.stopDuringRun).toHaveBeenCalled(); }); @@ -268,8 +268,8 @@ describe('ExecutionService', () => { * Assert */ expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id); - expect(scalingService.findJobsByStatus).toHaveBeenCalled(); - expect(scalingService.stopJob).toHaveBeenCalled(); + expect(scalingService.findJobsByStatus).not.toHaveBeenCalled(); + expect(scalingService.stopJob).not.toHaveBeenCalled(); expect(executionRepository.stopDuringRun).toHaveBeenCalled(); }); }); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index ffddb27164..86338da924 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -16,7 +16,7 @@ import { Workflow, WorkflowOperationError, } from 'n8n-workflow'; -import { Container, Service } from 'typedi'; +import { Service } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; @@ -477,18 +477,6 @@ export class ExecutionService { this.waitTracker.stopExecution(execution.id); } - const { ScalingService } = await import('@/scaling/scaling.service'); - const scalingService = Container.get(ScalingService); - const jobs = await scalingService.findJobsByStatus(['active', 'waiting']); - - const job = jobs.find(({ data }) => data.executionId === execution.id); - - if (job) { - await scalingService.stopJob(job); - } else { - this.logger.debug('Job to stop not in queue', { executionId: execution.id }); - } - return await this.executionRepository.stopDuringRun(execution); } diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 30cd50d6f0..bb27fca216 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -66,7 +66,7 @@ export class WorkflowRunner { // // FIXME: This is a quick fix. The proper fix would be to not remove // the execution from the active executions while it's still running. - if (error instanceof ExecutionNotFoundError) { + if (error instanceof ExecutionNotFoundError || error instanceof ExecutionCancelledError) { return; } diff --git a/packages/core/src/error-reporter.ts b/packages/core/src/error-reporter.ts index b52fd6d2f6..cd20dc9f9a 100644 --- a/packages/core/src/error-reporter.ts +++ b/packages/core/src/error-reporter.ts @@ -2,7 +2,7 @@ import type { NodeOptions } from '@sentry/node'; import { close } from '@sentry/node'; import type { ErrorEvent, EventHint } from '@sentry/types'; import { AxiosError } from 'axios'; -import { ApplicationError, type ReportingOptions } from 'n8n-workflow'; +import { ApplicationError, ExecutionCancelledError, type ReportingOptions } from 'n8n-workflow'; import { createHash } from 'node:crypto'; import { Service } from 'typedi'; @@ -143,6 +143,7 @@ export class ErrorReporter { } error(e: unknown, options?: ReportingOptions) { + if (e instanceof ExecutionCancelledError) return; const toReport = this.wrap(e); if (toReport) this.report(toReport, options); }