From 7b96820218449958180d4c34bebdc4c4de9172e1 Mon Sep 17 00:00:00 2001
From: Michael Auerswald
Date: Fri, 18 Aug 2023 17:12:24 +0200
Subject: [PATCH] fix(core): Add recoveryInProgress flag file (#6962)

Issue: during startup, unfinished executions trigger a recovery process
that, under certain circumstances, can itself crash the instance (e.g. by
running out of memory), resulting in an infinite recovery loop.

This PR changes that behaviour by writing a flag file when the recovery
process starts and removing it when it finishes. In the case of a crash,
the flag persists, and on the next startup the recovery instead does the
absolute minimum (marking the executions as 'crashed') without attempting
any 'crashable' actions.
---
 .../MessageEventBus/MessageEventBus.ts   | 53 ++++++++++++++++---
 .../MessageEventBusLogWriter.ts          | 20 +++++++
 .../MessageEventBusLogWriterWorker.ts    | 27 ++++++++++
 3 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
index 09be1d79cf..60da7fb29b 100644
--- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
+++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
@@ -27,6 +27,8 @@ import {
 } from '../EventMessageClasses/EventMessageGeneric';
 import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
 import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
+import Container from 'typedi';
+import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';

 export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

@@ -93,6 +95,10 @@ export class MessageEventBus extends EventEmitter {
     LoggerProxy.debug('Initializing event writer');
     this.logWriter = await MessageEventBusLogWriter.getInstance();

+    if (!this.logWriter) {
+      LoggerProxy.warn('Could not initialize event writer');
+    }
+
     // unsent event check:
     // - find unsent messages in current event log(s)
     // - cycle event logs and start the logging to a fresh file
@@ -105,14 +111,47 @@ export class MessageEventBus extends EventEmitter {
     this.logWriter?.startLogging();
     await this.send(unsentAndUnfinished.unsentMessages);

-    if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) {
-      for (const executionId of Object.keys(unsentAndUnfinished.unfinishedExecutions)) {
-        await recoverExecutionDataFromEventLogMessages(
-          executionId,
-          unsentAndUnfinished.unfinishedExecutions[executionId],
-          true,
-        );
+    const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);
+
+    if (unfinishedExecutionIds.length > 0) {
+      LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
+      LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
+      const activeWorkflows = await Container.get(WorkflowRepository).find({
+        where: { active: true },
+        select: ['id', 'name'],
+      });
+      if (activeWorkflows.length > 0) {
+        LoggerProxy.info('Currently active workflows:');
+        for (const workflowData of activeWorkflows) {
+          LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
+        }
       }
+      if (this.logWriter?.isRecoveryProcessRunning()) {
+        // if we end up here, it means that the previous recovery process did not finish
+        // a possible reason would be that recreating the workflow data itself caused e.g an OOM error
+        // in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
+        LoggerProxy.warn('Skipping recover process since it previously failed.');
+        for (const executionId of unfinishedExecutionIds) {
+          LoggerProxy.info(`Setting status of execution ${executionId} to crashed`);
+          await Container.get(ExecutionRepository).updateExistingExecution(executionId, {
+            status: 'crashed',
+            stoppedAt: new Date(),
+          });
+        }
+      } else {
+        // start actual recovery process and write recovery process flag file
+        this.logWriter?.startRecoveryProcess();
+        for (const executionId of unfinishedExecutionIds) {
+          LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
+          await recoverExecutionDataFromEventLogMessages(
+            executionId,
+            unsentAndUnfinished.unfinishedExecutions[executionId],
+            true,
+          );
+        }
+      }
+      // remove the recovery process flag file
+      this.logWriter?.endRecoveryProcess();
     }

     // if configured, run this test every n ms
diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts
index 80837609a9..470e832c61 100644
--- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts
+++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriter.ts
@@ -98,6 +98,22 @@ export class MessageEventBusLogWriter {
     }
   }

+  startRecoveryProcess() {
+    if (this.worker) {
+      this.worker.postMessage({ command: 'startRecoveryProcess', data: {} });
+    }
+  }
+
+  isRecoveryProcessRunning(): boolean {
+    return existsSync(this.getRecoveryInProgressFileName());
+  }
+
+  endRecoveryProcess() {
+    if (this.worker) {
+      this.worker.postMessage({ command: 'endRecoveryProcess', data: {} });
+    }
+  }
+
   private async startThread() {
     if (this.worker) {
       await this.close();
@@ -240,6 +256,10 @@ export class MessageEventBusLogWriter {
     }
   }

+  getRecoveryInProgressFileName(): string {
+    return `${MessageEventBusLogWriter.options.logFullBasePath}.recoveryInProgress`;
+  }
+
   cleanAllLogs() {
     for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) {
       if (existsSync(this.getLogFileName(i))) {
diff --git a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts
index 06d95a3bb2..5e2d771186 100644
--- a/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts
+++ b/packages/cli/src/eventbus/MessageEventBusWriter/MessageEventBusLogWriterWorker.ts
@@ -25,6 +25,25 @@ function setKeepFiles(keepNumberOfFiles: number) {
   keepFiles = keepNumberOfFiles;
 }

+function buildRecoveryInProgressFileName(): string {
+  return `${logFileBasePath}.recoveryInProgress`;
+}
+
+function startRecoveryProcess() {
+  if (existsSync(buildRecoveryInProgressFileName())) {
+    return false;
+  }
+  const fileHandle = openSync(buildRecoveryInProgressFileName(), 'a');
+  closeSync(fileHandle);
+  return true;
+}
+
+function endRecoveryProcess() {
+  if (existsSync(buildRecoveryInProgressFileName())) {
+    rmSync(buildRecoveryInProgressFileName());
+  }
+}
+
 function buildLogFileNameWithCounter(counter?: number): string {
   if (counter) {
     return `${logFileBasePath}-${counter}.log`;
@@ -112,6 +131,14 @@ if (!isMainThread) {
         cleanAllLogs();
         parentPort?.postMessage('cleanedAllLogs');
         break;
+      case 'startRecoveryProcess':
+        const recoveryStarted = startRecoveryProcess();
+        parentPort?.postMessage({ command, data: recoveryStarted });
+        break;
+      case 'endRecoveryProcess':
+        endRecoveryProcess();
+        parentPort?.postMessage({ command, data: true });
+        break;
       default:
         break;
     }
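
The sketch below restates the guard pattern this patch introduces, folded into a single function so the startup decision is visible in one place. It is a minimal illustration under assumed names, not n8n's API: logBasePath, runStartupRecovery, recover, and markCrashed are invented for the example; only the existsSync/openSync/closeSync/rmSync calls mirror the worker code above.

import { closeSync, existsSync, openSync, rmSync } from 'node:fs';

// Assumed base path for the example; in n8n the real value comes from the log writer options.
const logBasePath = '/tmp/n8nEventLog';
const recoveryFlagFile = `${logBasePath}.recoveryInProgress`;

// "Touch" the flag file; refuse to start if a previous run never cleared it.
function startRecoveryProcess(): boolean {
  if (existsSync(recoveryFlagFile)) return false;
  closeSync(openSync(recoveryFlagFile, 'a'));
  return true;
}

// Remove the flag file once recovery has finished.
function endRecoveryProcess(): void {
  if (existsSync(recoveryFlagFile)) rmSync(recoveryFlagFile);
}

async function runStartupRecovery(
  unfinishedExecutionIds: string[],
  recover: (executionId: string) => Promise<void>, // full, potentially crash-prone recovery
  markCrashed: (executionId: string) => Promise<void>, // minimal, safe fallback
): Promise<void> {
  if (unfinishedExecutionIds.length === 0) return;

  if (existsSync(recoveryFlagFile)) {
    // The flag survived: the previous attempt died mid-recovery,
    // so only mark the executions as crashed and do nothing crash-prone.
    for (const executionId of unfinishedExecutionIds) await markCrashed(executionId);
  } else {
    // Write the flag before doing anything that could take the process down.
    startRecoveryProcess();
    for (const executionId of unfinishedExecutionIds) await recover(executionId);
    // A crash inside the loop above leaves the flag behind,
    // steering the next startup into the branch above.
  }

  // Reached only if this startup did not crash.
  endRecoveryProcess();
}

The flag is an empty file created with openSync(..., 'a') plus closeSync, so its mere existence is the signal and creation is idempotent; there is nothing to parse. As in the patch, an exception thrown during recovery (not only a hard crash) also leaves the flag in place, because endRecoveryProcess() is only reached once the recovery loop completes.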