diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts
index 966cf2b7ba..a0ca581d04 100644
--- a/packages/cli/src/config/schema.ts
+++ b/packages/cli/src/config/schema.ts
@@ -1124,6 +1124,12 @@ export const schema = {
 				env: 'N8N_EVENTBUS_LOGWRITER_LOGBASENAME',
 			},
 		},
+		crashRecoveryMode: {
+			doc: 'Should n8n try to recover execution details after a crash, or just mark pending executions as crashed',
+			format: ['simple', 'extensive'] as const,
+			default: 'extensive',
+			env: 'N8N_EVENTBUS_RECOVERY_MODE',
+		},
 	},
 
 	redis: {
diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts
index 07215d4701..aee597c2cd 100644
--- a/packages/cli/src/databases/repositories/execution.repository.ts
+++ b/packages/cli/src/databases/repositories/execution.repository.ts
@@ -216,6 +216,16 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
 		return newExecution;
 	}
 
+	async markAsCrashed(executionIds: string[]) {
+		await this.update(
+			{ id: In(executionIds) },
+			{
+				status: 'crashed',
+				stoppedAt: new Date(),
+			},
+		);
+	}
+
 	async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
 		// Se isolate startedAt because it must be set when the execution starts and should never change.
 		// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
index 60da7fb29b..c6d14bf9a3 100644
--- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
+++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
@@ -126,18 +126,15 @@ export class MessageEventBus extends EventEmitter {
 					LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
 				}
 			}
-			if (this.logWriter?.isRecoveryProcessRunning()) {
+
+			const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
+			if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
+				await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
 				// if we end up here, it means that the previous recovery process did not finish
 				// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
 				// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
-				LoggerProxy.warn('Skipping recover process since it previously failed.');
-				for (const executionId of unfinishedExecutionIds) {
-					LoggerProxy.info(`Setting status of execution ${executionId} to crashed`);
-					await Container.get(ExecutionRepository).updateExistingExecution(executionId, {
-						status: 'crashed',
-						stoppedAt: new Date(),
-					});
-				}
+				if (recoveryAlreadyAttempted)
+					LoggerProxy.warn('Skipped recovery process since it previously failed.');
 			} else {
 				// start actual recovery process and write recovery process flag file
 				this.logWriter?.startRecoveryProcess();
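
For context, a minimal standalone sketch of the decision this change introduces; the names CrashRecoveryMode, RecoveryDecision, and decideRecoveryAction below are illustrative only and do not appear in the diff. Recovery is skipped, and the unfinished executions are bulk-marked as crashed, either when a previous recovery attempt left its flag file behind (isRecoveryProcessRunning) or when the operator sets N8N_EVENTBUS_RECOVERY_MODE=simple.

// Illustrative sketch only: these names are hypothetical and not part of the diff.
type CrashRecoveryMode = 'simple' | 'extensive';

interface RecoveryDecision {
	markExecutionsAsCrashed: boolean; // bulk-update unfinished executions to status 'crashed'
	startRecoveryProcess: boolean; // write the flag file and attempt full recovery
	warnAboutPreviousFailure: boolean; // log that an earlier recovery attempt did not finish
}

function decideRecoveryAction(
	recoveryAlreadyAttempted: boolean,
	mode: CrashRecoveryMode,
): RecoveryDecision {
	if (recoveryAlreadyAttempted || mode === 'simple') {
		return {
			markExecutionsAsCrashed: true,
			startRecoveryProcess: false,
			warnAboutPreviousFailure: recoveryAlreadyAttempted,
		};
	}
	return {
		markExecutionsAsCrashed: false,
		startRecoveryProcess: true,
		warnAboutPreviousFailure: false,
	};
}

With the default 'extensive' mode, n8n keeps the existing behaviour of writing the recovery flag file and attempting to recreate the execution data, which is the step that can itself fail (e.g. with an OOM error) and is why the flag-file check exists.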