diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 44e9986f38..ab6a191c6c 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -260,7 +260,9 @@ export class ExecutionRepository extends Repository { return String(executionId); } - async markAsCrashed(executionIds: string[]) { + async markAsCrashed(executionIds: string | string[]) { + if (!Array.isArray(executionIds)) executionIds = [executionIds]; + await this.update( { id: In(executionIds) }, { @@ -268,6 +270,10 @@ export class ExecutionRepository extends Repository { stoppedAt: new Date(), }, ); + + this.logger.info('[Execution Recovery] Marked executions as `crashed`', { + executionIds, + }); } /** diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 6b44fe9ed1..81f94206f3 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -175,18 +175,8 @@ export class MessageEventBus extends EventEmitter { // start actual recovery process and write recovery process flag file this.logWriter?.startRecoveryProcess(); for (const executionId of unfinishedExecutionIds) { - this.logger.warn(`Attempting to recover execution ${executionId}`); - if (!unsentAndUnfinished.unfinishedExecutions[executionId]?.length) { - this.logger.debug( - `No event messages found, marking execution ${executionId} as 'crashed'`, - ); - await this.executionRepository.markAsCrashed([executionId]); - } else { - await this.recoveryService.recover( - executionId, - unsentAndUnfinished.unfinishedExecutions[executionId], - ); - } + const logMesssages = unsentAndUnfinished.unfinishedExecutions[executionId]; + await this.recoveryService.recoverFromLogs(executionId, logMesssages ?? []); } } // remove the recovery process flag file @@ -367,7 +357,7 @@ export class MessageEventBus extends EventEmitter { async getUnsentAndUnfinishedExecutions(): Promise<{ unsentMessages: EventMessageTypes[]; - unfinishedExecutions: Record; + unfinishedExecutions: Record; }> { const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions(); return queryResult; diff --git a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts index c9d75ee6a3..e40f4e78ac 100644 --- a/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts +++ b/packages/cli/src/executions/__tests__/execution-recovery.service.test.ts @@ -174,17 +174,15 @@ export const setupMessages = (executionId: string, workflowName: string): EventM describe('ExecutionRecoveryService', () => { let executionRecoveryService: ExecutionRecoveryService; let push: Push; + let executionRepository: ExecutionRepository; beforeAll(async () => { await testDb.init(); mockInstance(InternalHooks); push = mockInstance(Push); - - executionRecoveryService = new ExecutionRecoveryService( - push, - Container.get(ExecutionRepository), - ); + executionRepository = Container.get(ExecutionRepository); + executionRecoveryService = new ExecutionRecoveryService(push, executionRepository); }); afterEach(async () => { @@ -195,226 +193,212 @@ describe('ExecutionRecoveryService', () => { await testDb.terminate(); }); - describe('recover', () => { - it('should amend, persist, run hooks, broadcast', async () => { - /** - * Arrange - */ - // @ts-expect-error Private method - const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); - const executionRepository = Container.get(ExecutionRepository); - const dbUpdateSpy = jest.spyOn(executionRepository, 'update'); - // @ts-expect-error Private method - const runHooksSpy = jest.spyOn(executionRecoveryService, 'runHooks'); + describe('recoverFromLogs', () => { + describe('if no messages', () => { + test('should return `null` if no execution found', async () => { + /** + * Arrange + */ + const inexistentExecutionId = randomInteger(100).toString(); + const noMessages: EventMessage[] = []; - const workflow = await createWorkflow(OOM_WORKFLOW); + /** + * Act + */ + const amendedExecution = await executionRecoveryService.recoverFromLogs( + inexistentExecutionId, + noMessages, + ); - const execution = await createExecution( - { - status: 'running', - data: stringify(IN_PROGRESS_EXECUTION_DATA), - }, - workflow, - ); + /** + * Assert + */ + expect(amendedExecution).toBeNull(); + }); - const messages = setupMessages(execution.id, workflow.name); - - /** - * Act - */ - - await executionRecoveryService.recover(execution.id, messages); - - /** - * Assert - */ - - expect(amendSpy).toHaveBeenCalledTimes(1); - expect(amendSpy).toHaveBeenCalledWith(execution.id, messages); - expect(dbUpdateSpy).toHaveBeenCalledTimes(1); - expect(runHooksSpy).toHaveBeenCalledTimes(1); - expect(push.once).toHaveBeenCalledTimes(1); - }); - - test('should amend a truncated execution where last node did not finish', async () => { - /** - * Arrange - */ - - const workflow = await createWorkflow(OOM_WORKFLOW); - - const execution = await createExecution( - { - status: 'running', - data: stringify(IN_PROGRESS_EXECUTION_DATA), - }, - workflow, - ); - - const messages = setupMessages(execution.id, workflow.name); - - /** - * Act - */ - - const amendedExecution = await executionRecoveryService.recover(execution.id, messages); - - /** - * Assert - */ - - const startOfLastNodeRun = messages - .find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper') - ?.ts.toJSDate(); - - expect(amendedExecution).toEqual( - expect.objectContaining({ - status: 'crashed', - stoppedAt: startOfLastNodeRun, - }), - ); - - const resultData = amendedExecution?.data.resultData; - - if (!resultData) fail('Expected `resultData` to be defined'); - - expect(resultData.error).toBeInstanceOf(WorkflowCrashedError); - expect(resultData.lastNodeExecuted).toBe('DebugHelper'); - - const runData = resultData.runData; - - if (!runData) fail('Expected `runData` to be defined'); - - const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); - const debugHelperTaskData = runData.DebugHelper.at(0); - - expect(manualTriggerTaskData?.executionStatus).toBe('success'); - expect(manualTriggerTaskData?.error).toBeUndefined(); - expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA); - - expect(debugHelperTaskData?.executionStatus).toBe('crashed'); - expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError); - }); - - test('should amend a truncated execution where last node finished', async () => { - /** - * Arrange - */ - - const workflow = await createWorkflow(OOM_WORKFLOW); - - const execution = await createExecution( - { - status: 'running', - data: stringify(IN_PROGRESS_EXECUTION_DATA), - }, - workflow, - ); - - const messages = setupMessages(execution.id, workflow.name); - messages.push( - new EventMessageNode({ - eventName: 'n8n.node.finished', - payload: { - executionId: execution.id, - workflowName: workflow.name, - nodeName: 'DebugHelper', - nodeType: 'n8n-nodes-base.debugHelper', + test('should update `status` and `stoppedAt`', async () => { + /** + * Arrange + */ + const workflow = await createWorkflow(OOM_WORKFLOW); + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), }, - }), - ); + workflow, + ); - /** - * Act - */ + /** + * Act + */ + const amendedExecution = await executionRecoveryService.recoverFromLogs(execution.id, []); - const amendedExecution = await executionRecoveryService.recover(execution.id, messages); + /** + * Assert + */ + if (!amendedExecution) fail('Expected `amendedExecution` to exist'); - /** - * Assert - */ - - const endOfLastNoderun = messages - .find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper') - ?.ts.toJSDate(); - - expect(amendedExecution).toEqual( - expect.objectContaining({ - status: 'crashed', - stoppedAt: endOfLastNoderun, - }), - ); - - const resultData = amendedExecution?.data.resultData; - - if (!resultData) fail('Expected `resultData` to be defined'); - - expect(resultData.error).toBeUndefined(); - expect(resultData.lastNodeExecuted).toBe('DebugHelper'); - - const runData = resultData.runData; - - if (!runData) fail('Expected `runData` to be defined'); - - const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); - const debugHelperTaskData = runData.DebugHelper.at(0); - - expect(manualTriggerTaskData?.executionStatus).toBe('success'); - expect(manualTriggerTaskData?.error).toBeUndefined(); - - expect(debugHelperTaskData?.executionStatus).toBe('success'); - expect(debugHelperTaskData?.error).toBeUndefined(); - expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA); + expect(amendedExecution.status).toBe('crashed'); + expect(amendedExecution.stoppedAt).not.toBe(execution.stoppedAt); + }); }); - test('should return `null` if no messages', async () => { - /** - * Arrange - */ - const workflow = await createWorkflow(OOM_WORKFLOW); - const execution = await createExecution( - { - status: 'running', - data: stringify(IN_PROGRESS_EXECUTION_DATA), - }, - workflow, - ); - const noMessages: EventMessage[] = []; + describe('if messages', () => { + test('should return `null` if no execution found', async () => { + /** + * Arrange + */ + const inexistentExecutionId = randomInteger(100).toString(); + const messages = setupMessages(inexistentExecutionId, 'Some workflow'); - /** - * Act - */ + /** + * Act + */ + const amendedExecution = await executionRecoveryService.recoverFromLogs( + inexistentExecutionId, + messages, + ); - const amendedExecution = await executionRecoveryService.recover(execution.id, noMessages); + /** + * Assert + */ + expect(amendedExecution).toBeNull(); + }); - /** - * Assert - */ + test('should update `status`, `stoppedAt` and `data` if last node did not finish', async () => { + /** + * Arrange + */ - expect(amendedExecution).toBeNull(); - }); + const workflow = await createWorkflow(OOM_WORKFLOW); - test('should return `null` if no execution', async () => { - /** - * Arrange - */ - const inexistentExecutionId = randomInteger(100).toString(); - const messages = setupMessages(inexistentExecutionId, 'Some workflow'); + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); - /** - * Act - */ + const messages = setupMessages(execution.id, workflow.name); - const amendedExecution = await executionRecoveryService.recover( - inexistentExecutionId, - messages, - ); + /** + * Act + */ - /** - * Assert - */ + const amendedExecution = await executionRecoveryService.recoverFromLogs( + execution.id, + messages, + ); - expect(amendedExecution).toBeNull(); + /** + * Assert + */ + + const startOfLastNodeRun = messages + .find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper') + ?.ts.toJSDate(); + + expect(amendedExecution).toEqual( + expect.objectContaining({ + status: 'crashed', + stoppedAt: startOfLastNodeRun, + }), + ); + + const resultData = amendedExecution?.data.resultData; + + if (!resultData) fail('Expected `resultData` to be defined'); + + expect(resultData.error).toBeInstanceOf(WorkflowCrashedError); + expect(resultData.lastNodeExecuted).toBe('DebugHelper'); + + const runData = resultData.runData; + + if (!runData) fail('Expected `runData` to be defined'); + + const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); + const debugHelperTaskData = runData.DebugHelper.at(0); + + expect(manualTriggerTaskData?.executionStatus).toBe('success'); + expect(manualTriggerTaskData?.error).toBeUndefined(); + expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA); + + expect(debugHelperTaskData?.executionStatus).toBe('crashed'); + expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError); + }); + + test('should update `status`, `stoppedAt` and `data` if last node finished', async () => { + /** + * Arrange + */ + const workflow = await createWorkflow(OOM_WORKFLOW); + + const execution = await createExecution( + { + status: 'running', + data: stringify(IN_PROGRESS_EXECUTION_DATA), + }, + workflow, + ); + + const messages = setupMessages(execution.id, workflow.name); + messages.push( + new EventMessageNode({ + eventName: 'n8n.node.finished', + payload: { + executionId: execution.id, + workflowName: workflow.name, + nodeName: 'DebugHelper', + nodeType: 'n8n-nodes-base.debugHelper', + }, + }), + ); + + /** + * Act + */ + const amendedExecution = await executionRecoveryService.recoverFromLogs( + execution.id, + messages, + ); + + /** + * Assert + */ + const endOfLastNoderun = messages + .find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper') + ?.ts.toJSDate(); + + expect(amendedExecution).toEqual( + expect.objectContaining({ + status: 'crashed', + stoppedAt: endOfLastNoderun, + }), + ); + + const resultData = amendedExecution?.data.resultData; + + if (!resultData) fail('Expected `resultData` to be defined'); + + expect(resultData.error).toBeUndefined(); + expect(resultData.lastNodeExecuted).toBe('DebugHelper'); + + const runData = resultData.runData; + + if (!runData) fail('Expected `runData` to be defined'); + + const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0); + const debugHelperTaskData = runData.DebugHelper.at(0); + + expect(manualTriggerTaskData?.executionStatus).toBe('success'); + expect(manualTriggerTaskData?.error).toBeUndefined(); + + expect(debugHelperTaskData?.executionStatus).toBe('success'); + expect(debugHelperTaskData?.error).toBeUndefined(); + expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA); + }); }); }); }); diff --git a/packages/cli/src/executions/execution-recovery.service.ts b/packages/cli/src/executions/execution-recovery.service.ts index 1a57030561..4f6123403a 100644 --- a/packages/cli/src/executions/execution-recovery.service.ts +++ b/packages/cli/src/executions/execution-recovery.service.ts @@ -13,7 +13,7 @@ import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; import { ARTIFICIAL_TASK_DATA } from '@/constants'; /** - * Service for recovering executions truncated by an instance crash. + * Service for recovering key properties in executions. */ @Service() export class ExecutionRecoveryService { @@ -23,20 +23,9 @@ export class ExecutionRecoveryService { ) {} /** - * "Recovery" means (1) amending key properties of a truncated execution, - * (2) running post-execution hooks, and (3) returning the amended execution - * so the UI can reflect the error. "Recovery" does **not** mean injecting - * execution data from the logs (they hold none), or resuming the execution - * from the point of truncation, or re-running the whole execution. - * - * Recovery is only possible if event logs are available in the container. - * In regular mode, logs should but might not be available, e.g. due to container - * being recycled, max log size causing rotation, etc. In queue mode, as workers - * log to their own filesystems, only manual exections can be recovered. + * Recover key properties of a truncated execution using event logs. */ - async recover(executionId: string, messages: EventMessageTypes[]) { - if (messages.length === 0) return null; - + async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) { const amendedExecution = await this.amend(executionId, messages); if (!amendedExecution) return null; @@ -53,10 +42,16 @@ export class ExecutionRecoveryService { return amendedExecution; } + // ---------------------------------- + // private + // ---------------------------------- + /** - * Amend `status`, `stoppedAt`, and `data` of an execution using event log messages. + * Amend `status`, `stoppedAt`, and (if possible) `data` properties of an execution. */ private async amend(executionId: string, messages: EventMessageTypes[]) { + if (messages.length === 0) return await this.amendWithoutLogs(executionId); + const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages); if (nodeMessages.length === 0) return null; @@ -114,9 +109,20 @@ export class ExecutionRecoveryService { } as IExecutionResponse; } - // ---------------------------------- - // private - // ---------------------------------- + private async amendWithoutLogs(executionId: string) { + const exists = await this.executionRepository.exists({ where: { id: executionId } }); + + if (!exists) return null; + + await this.executionRepository.markAsCrashed(executionId); + + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + return execution ?? null; + } private toRelevantMessages(messages: EventMessageTypes[]) { return messages.reduce<{ @@ -152,6 +158,8 @@ export class ExecutionRecoveryService { } private async runHooks(execution: IExecutionResponse) { + execution.data ??= { resultData: { runData: {} } }; + await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, { data: execution.data, finished: false,