mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
refactor(core): Add an option to use simple recovery process by default (#7097)
This commit is contained in:
parent
ee36f2d20b
commit
cffda65b33
|
@ -1124,6 +1124,12 @@ export const schema = {
|
||||||
env: 'N8N_EVENTBUS_LOGWRITER_LOGBASENAME',
|
env: 'N8N_EVENTBUS_LOGWRITER_LOGBASENAME',
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
crashRecoveryMode: {
|
||||||
|
doc: 'Should n8n try to recover execution details after a crash, or just mark pending executions as crashed',
|
||||||
|
format: ['simple', 'extensive'] as const,
|
||||||
|
default: 'extensive',
|
||||||
|
env: 'N8N_EVENTBUS_RECOVERY_MODE',
|
||||||
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
redis: {
|
redis: {
|
||||||
|
|
|
@ -216,6 +216,16 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
|
||||||
return newExecution;
|
return newExecution;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async markAsCrashed(executionIds: string[]) {
|
||||||
|
await this.update(
|
||||||
|
{ id: In(executionIds) },
|
||||||
|
{
|
||||||
|
status: 'crashed',
|
||||||
|
stoppedAt: new Date(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
|
async updateExistingExecution(executionId: string, execution: Partial<IExecutionResponse>) {
|
||||||
// Se isolate startedAt because it must be set when the execution starts and should never change.
|
// Se isolate startedAt because it must be set when the execution starts and should never change.
|
||||||
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
|
// So we prevent updating it, if it's sent (it usually is and causes problems to executions that
|
||||||
|
|
|
@ -126,18 +126,15 @@ export class MessageEventBus extends EventEmitter {
|
||||||
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
|
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (this.logWriter?.isRecoveryProcessRunning()) {
|
|
||||||
|
const recoveryAlreadyAttempted = this.logWriter?.isRecoveryProcessRunning();
|
||||||
|
if (recoveryAlreadyAttempted || config.getEnv('eventBus.crashRecoveryMode') === 'simple') {
|
||||||
|
await Container.get(ExecutionRepository).markAsCrashed(unfinishedExecutionIds);
|
||||||
// if we end up here, it means that the previous recovery process did not finish
|
// if we end up here, it means that the previous recovery process did not finish
|
||||||
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
|
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
|
||||||
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
|
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
|
||||||
LoggerProxy.warn('Skipping recover process since it previously failed.');
|
if (recoveryAlreadyAttempted)
|
||||||
for (const executionId of unfinishedExecutionIds) {
|
LoggerProxy.warn('Skipped recovery process since it previously failed.');
|
||||||
LoggerProxy.info(`Setting status of execution ${executionId} to crashed`);
|
|
||||||
await Container.get(ExecutionRepository).updateExistingExecution(executionId, {
|
|
||||||
status: 'crashed',
|
|
||||||
stoppedAt: new Date(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// start actual recovery process and write recovery process flag file
|
// start actual recovery process and write recovery process flag file
|
||||||
this.logWriter?.startRecoveryProcess();
|
this.logWriter?.startRecoveryProcess();
|
||||||
|
|
Loading…
Reference in a new issue