mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 12:57:29 -08:00
fix(core): Ensure job processor does not reprocess amended executions (#11438)
This commit is contained in:
parent
ad292350b3
commit
c152a3ac56
|
@ -0,0 +1,21 @@
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
|
||||||
|
import type { ExecutionRepository } from '@/databases/repositories/execution.repository';
|
||||||
|
import type { IExecutionResponse } from '@/interfaces';
|
||||||
|
|
||||||
|
import { JobProcessor } from '../job-processor';
|
||||||
|
import type { Job } from '../scaling.types';
|
||||||
|
|
||||||
|
describe('JobProcessor', () => {
|
||||||
|
it('should refrain from processing a crashed execution', async () => {
|
||||||
|
const executionRepository = mock<ExecutionRepository>();
|
||||||
|
executionRepository.findSingleExecution.mockResolvedValue(
|
||||||
|
mock<IExecutionResponse>({ status: 'crashed' }),
|
||||||
|
);
|
||||||
|
const jobProcessor = new JobProcessor(mock(), executionRepository, mock(), mock(), mock());
|
||||||
|
|
||||||
|
const result = await jobProcessor.processJob(mock<Job>());
|
||||||
|
|
||||||
|
expect(result).toEqual({ success: false });
|
||||||
|
});
|
||||||
|
});
|
|
@ -58,6 +58,13 @@ export class JobProcessor {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bull's implicit retry mechanism and n8n's execution recovery mechanism may
|
||||||
|
* cause a crashed execution to be enqueued. We refrain from processing it,
|
||||||
|
* until we have reworked both mechanisms to prevent this scenario.
|
||||||
|
*/
|
||||||
|
if (execution.status === 'crashed') return { success: false };
|
||||||
|
|
||||||
const workflowId = execution.workflowData.id;
|
const workflowId = execution.workflowData.id;
|
||||||
|
|
||||||
this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, {
|
this.logger.info(`Worker started execution ${executionId} (job ${job.id})`, {
|
||||||
|
|
|
@ -14,7 +14,6 @@ import type {
|
||||||
IWorkflowExecutionDataProcess,
|
IWorkflowExecutionDataProcess,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import {
|
import {
|
||||||
ApplicationError,
|
|
||||||
ErrorReporterProxy as ErrorReporter,
|
ErrorReporterProxy as ErrorReporter,
|
||||||
ExecutionCancelledError,
|
ExecutionCancelledError,
|
||||||
Workflow,
|
Workflow,
|
||||||
|
@ -381,17 +380,6 @@ export class WorkflowRunner {
|
||||||
let job: Job;
|
let job: Job;
|
||||||
let hooks: WorkflowHooks;
|
let hooks: WorkflowHooks;
|
||||||
try {
|
try {
|
||||||
// check to help diagnose PAY-2100
|
|
||||||
if (
|
|
||||||
data.executionData?.executionData?.nodeExecutionStack?.length === 0 &&
|
|
||||||
config.getEnv('deployment.type') === 'internal'
|
|
||||||
) {
|
|
||||||
await this.executionRepository.setRunning(executionId); // set `startedAt` so we display it correctly in UI
|
|
||||||
throw new ApplicationError('Execution to enqueue has empty node execution stack', {
|
|
||||||
extra: { executionData: data.executionData },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
|
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
|
||||||
|
|
||||||
hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(
|
hooks = WorkflowExecuteAdditionalData.getWorkflowHooksWorkerMain(
|
||||||
|
|
Loading…
Reference in a new issue