From c152a3ac56f140a39eea4771a94f5a3082118df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 29 Oct 2024 08:51:55 +0100 Subject: [PATCH] fix(core): Ensure job processor does not reprocess amended executions (#11438) --- .../__tests__/job-processor.service.test.ts | 21 +++++++++++++++++++ packages/cli/src/scaling/job-processor.ts | 7 +++++++ packages/cli/src/workflow-runner.ts | 12 ----------- 3 files changed, 28 insertions(+), 12 deletions(-) create mode 100644 packages/cli/src/scaling/__tests__/job-processor.service.test.ts diff --git a/packages/cli/src/scaling/__tests__/job-processor.service.test.ts b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts new file mode 100644 index 0000000000..6a3fa5caa4 --- /dev/null +++ b/packages/cli/src/scaling/__tests__/job-processor.service.test.ts @@ -0,0 +1,21 @@ +import { mock } from 'jest-mock-extended'; + +import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { IExecutionResponse } from '@/interfaces'; + +import { JobProcessor } from '../job-processor'; +import type { Job } from '../scaling.types'; + +describe('JobProcessor', () => { + it('should refrain from processing a crashed execution', async () => { + const executionRepository = mock(); + executionRepository.findSingleExecution.mockResolvedValue( + mock({ status: 'crashed' }), + ); + const jobProcessor = new JobProcessor(mock(), executionRepository, mock(), mock(), mock()); + + const result = await jobProcessor.processJob(mock()); + + expect(result).toEqual({ success: false }); + }); +}); diff --git a/packages/cli/src/scaling/job-processor.ts b/packages/cli/src/scaling/job-processor.ts index 9a531d3039..6bf2524304 100644 --- a/packages/cli/src/scaling/job-processor.ts +++ b/packages/cli/src/scaling/job-processor.ts @@ -58,6 +58,13 @@ export class JobProcessor { ); } + /** + * Bull's implicit retry mechanism and n8n's execution recovery mechanism may + * cause a crashed execution to be enqueued. We refrain from processing it, + * until we have reworked both mechanisms to prevent this scenario. + */ + if (execution.status === 'crashed') return { success: false }; + const workflowId = execution.workflowData.id; this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, { diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 4dd5e08714..02e0b94afd 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -14,7 +14,6 @@ import type { IWorkflowExecutionDataProcess, } from 'n8n-workflow'; import { - ApplicationError, ErrorReporterProxy as ErrorReporter, ExecutionCancelledError, Workflow, @@ -381,17 +380,6 @@ export class WorkflowRunner { let job: Job; let hooks: WorkflowHooks; try { - // check to help diagnose PAY-2100 - if ( - data.executionData?.executionData?.nodeExecutionStack?.length === 0 && - config.getEnv('deployment.type') === 'internal' - ) { - await this.executionRepository.setRunning(executionId); // set `startedAt` so we display it correctly in UI - throw new ApplicationError('Execution to enqueue has empty node execution stack', { - extra: { executionData: data.executionData }, - }); - } - job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 }); hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(