fix(core): Handle max stalled count error better (#12824)

This commit is contained in:
Iván Ovejero 2025-01-27 13:44:20 +01:00 committed by GitHub
parent 648c6f9315
commit eabf160957
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 17 additions and 9 deletions

View file

@@ -5,9 +5,12 @@ import { ApplicationError } from 'n8n-workflow';
*/ */
export class MaxStalledCountError extends ApplicationError { export class MaxStalledCountError extends ApplicationError {
constructor(cause: Error) { constructor(cause: Error) {
super('The execution has reached the maximum number of attempts and will no longer retry.', { super(
level: 'warning', 'This execution failed to be processed too many times and will no longer retry. To allow this execution to complete, please break down your workflow or scale up your workers or adjust your worker settings.',
cause, {
}); level: 'warning',
cause,
},
);
} }
} }

View file

@@ -17,7 +17,6 @@ import config from '@/config';
import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants'; import { HIGHEST_SHUTDOWN_PRIORITY, Time } from '@/constants';
import { ExecutionRepository } from '@/databases/repositories/execution.repository'; import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import { OnShutdown } from '@/decorators/on-shutdown'; import { OnShutdown } from '@/decorators/on-shutdown';
import { MaxStalledCountError } from '@/errors/max-stalled-count.error';
import { EventService } from '@/events/event.service'; import { EventService } from '@/events/event.service';
import { OrchestrationService } from '@/services/orchestration.service'; import { OrchestrationService } from '@/services/orchestration.service';
import { assertNever } from '@/utils'; import { assertNever } from '@/utils';
@@ -271,10 +270,6 @@ export class ScalingService {
this.queue.on('error', (error: Error) => { this.queue.on('error', (error: Error) => {
if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy if ('code' in error && error.code === 'ECONNREFUSED') return; // handled by RedisClientService.retryStrategy
if (error.message.includes('job stalled more than maxStalledCount')) {
throw new MaxStalledCountError(error);
}
/** /**
* Non-recoverable error on worker start with Redis unavailable. * Non-recoverable error on worker start with Redis unavailable.
* Even if Redis recovers, worker will remain unable to process jobs. * Even if Redis recovers, worker will remain unable to process jobs.

View file

@@ -37,6 +37,8 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da
import { generateFailedExecutionFromError } from '@/workflow-helpers'; import { generateFailedExecutionFromError } from '@/workflow-helpers';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { MaxStalledCountError } from './errors/max-stalled-count.error';
@Service() @Service()
export class WorkflowRunner { export class WorkflowRunner {
private scalingService: ScalingService; private scalingService: ScalingService;
@@ -416,6 +418,13 @@ export class WorkflowRunner {
try { try {
await job.finished(); await job.finished();
} catch (error) { } catch (error) {
if (
error instanceof Error &&
error.message.includes('job stalled more than maxStalledCount')
) {
error = new MaxStalledCountError(error);
}
// We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the // We use "getWorkflowHooksWorkerExecuter" as "getWorkflowHooksWorkerMain" does not contain the
// "workflowExecuteAfter" which we require. // "workflowExecuteAfter" which we require.
const hooks = getWorkflowHooksWorkerExecuter( const hooks = getWorkflowHooksWorkerExecuter(
@@ -424,6 +433,7 @@ export class WorkflowRunner {
data.workflowData, data.workflowData,
{ retryOf: data.retryOf ? data.retryOf.toString() : undefined }, { retryOf: data.retryOf ? data.retryOf.toString() : undefined },
); );
await this.processError(error, new Date(), data.executionMode, executionId, hooks); await this.processError(error, new Date(), data.executionMode, executionId, hooks);
reject(error); reject(error);