fix(core): Add recoveryInProgress flag file (#6962)

Issue: during startup, unfinished executions trigger a recovery process
that, under certain circumstances, can itself crash the instance
(e.g. by running out of memory), resulting in an infinite recovery loop.

This PR changes this behaviour by writing a flag file when the
recovery process starts and removing it when it finishes. In the case
of a crash, this flag will persist, and upon the next attempt the
recovery will instead do the absolute minimum (marking executions as
'crashed') without attempting any 'crashable' actions.
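As an illustration of the guard described above, here is a minimal TypeScript sketch of the flag-file pattern, assuming Node's fs API; the flag path and the markExecutionCrashed/recoverExecution helpers are illustrative stand-ins, not the names used in this PR:

import { closeSync, existsSync, openSync, rmSync } from 'fs';

// Illustrative flag path; the PR derives it from the event log base path.
const RECOVERY_FLAG = '/tmp/n8n-eventlog.recoveryInProgress';

// Hypothetical stand-ins for the real repository update and recovery calls.
async function markExecutionCrashed(executionId: string): Promise<void> {
	console.log(`marking execution ${executionId} as crashed`);
}
async function recoverExecution(executionId: string): Promise<void> {
	console.log(`recovering execution ${executionId}`); // may itself crash, e.g. by OOM
}

export async function handleUnfinishedExecutions(executionIds: string[]): Promise<void> {
	if (existsSync(RECOVERY_FLAG)) {
		// The flag survived the previous startup, so that recovery attempt crashed.
		// Do only the minimal, non-crashable work.
		for (const id of executionIds) await markExecutionCrashed(id);
	} else {
		// Write the flag before recovering, so a crash mid-recovery is visible next time.
		closeSync(openSync(RECOVERY_FLAG, 'a'));
		for (const id of executionIds) await recoverExecution(id);
	}
	// Reached only if nothing crashed: clear the flag for the next startup.
	if (existsSync(RECOVERY_FLAG)) rmSync(RECOVERY_FLAG);
}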
Michael Auerswald 2023-08-18 17:12:24 +02:00 committed by GitHub
parent 4fc69b776c
commit 7b96820218
3 changed files with 93 additions and 7 deletions


@@ -27,6 +27,8 @@ import {
} from '../EventMessageClasses/EventMessageGeneric';
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';
export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';
@@ -93,6 +95,10 @@ export class MessageEventBus extends EventEmitter {
LoggerProxy.debug('Initializing event writer');
this.logWriter = await MessageEventBusLogWriter.getInstance();
if (!this.logWriter) {
LoggerProxy.warn('Could not initialize event writer');
}
// unsent event check:
// - find unsent messages in current event log(s)
// - cycle event logs and start the logging to a fresh file
@@ -105,14 +111,47 @@
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);
if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) {
for (const executionId of Object.keys(unsentAndUnfinished.unfinishedExecutions)) {
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
if (unfinishedExecutionIds.length > 0) {
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await Container.get(WorkflowRepository).find({
where: { active: true },
select: ['id', 'name'],
});
if (activeWorkflows.length > 0) {
LoggerProxy.info('Currently active workflows:');
for (const workflowData of activeWorkflows) {
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
}
}
if (this.logWriter?.isRecoveryProcessRunning()) {
// if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
LoggerProxy.warn('Skipping recover process since it previously failed.');
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.info(`Setting status of execution ${executionId} to crashed`);
await Container.get(ExecutionRepository).updateExistingExecution(executionId, {
status: 'crashed',
stoppedAt: new Date(),
});
}
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
}
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}
// if configured, run this test every n ms


@@ -98,6 +98,22 @@ export class MessageEventBusLogWriter {
}
}
startRecoveryProcess() {
if (this.worker) {
this.worker.postMessage({ command: 'startRecoveryProcess', data: {} });
}
}
isRecoveryProcessRunning(): boolean {
return existsSync(this.getRecoveryInProgressFileName());
}
endRecoveryProcess() {
if (this.worker) {
this.worker.postMessage({ command: 'endRecoveryProcess', data: {} });
}
}
private async startThread() {
if (this.worker) {
await this.close();
@@ -240,6 +256,10 @@
}
}
getRecoveryInProgressFileName(): string {
return `${MessageEventBusLogWriter.options.logFullBasePath}.recoveryInProgress`;
}
cleanAllLogs() {
for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) {
if (existsSync(this.getLogFileName(i))) {


@@ -25,6 +25,25 @@ function setKeepFiles(keepNumberOfFiles: number) {
keepFiles = keepNumberOfFiles;
}
function buildRecoveryInProgressFileName(): string {
return `${logFileBasePath}.recoveryInProgress`;
}
function startRecoveryProcess() {
if (existsSync(buildRecoveryInProgressFileName())) {
return false;
}
const fileHandle = openSync(buildRecoveryInProgressFileName(), 'a');
closeSync(fileHandle);
return true;
}
function endRecoveryProcess() {
if (existsSync(buildRecoveryInProgressFileName())) {
rmSync(buildRecoveryInProgressFileName());
}
}
function buildLogFileNameWithCounter(counter?: number): string {
if (counter) {
return `${logFileBasePath}-${counter}.log`;
@@ -112,6 +131,14 @@ if (!isMainThread) {
cleanAllLogs();
parentPort?.postMessage('cleanedAllLogs');
break;
case 'startRecoveryProcess':
const recoveryStarted = startRecoveryProcess();
parentPort?.postMessage({ command, data: recoveryStarted });
break;
case 'endRecoveryProcess':
endRecoveryProcess();
parentPort?.postMessage({ command, data: true });
break;
default:
break;
}
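
For reference, a short standalone sketch of the flag-file lifecycle implemented by the worker functions above, assuming Node's fs/os/path modules; the base path and the local start/end copies are illustrative, not part of the PR:

import { closeSync, existsSync, openSync, rmSync } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

// Illustrative base path; the worker derives the flag name from logFileBasePath.
const flag = `${join(tmpdir(), 'n8n-eventlog')}.recoveryInProgress`;

// Mirrors startRecoveryProcess(): returns false if a previous run left the flag behind.
function start(): boolean {
	if (existsSync(flag)) return false;
	closeSync(openSync(flag, 'a'));
	return true;
}

// Mirrors endRecoveryProcess(): removing the flag is a no-op if it is already gone.
function end(): void {
	if (existsSync(flag)) rmSync(flag);
}

console.log(start()); // true  - first startup creates the flag
console.log(start()); // false - flag still present, i.e. a prior recovery never finished
end();
console.log(existsSync(flag)); // false - a clean finish removes the flag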