refactor(core): Include logless case in crash recovery (no-changelog) (#9725)

This commit is contained in:
Iván Ovejero 2024-06-13 12:54:51 +02:00 committed by GitHub
parent 28d1a5d00d
commit cfef49e60a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 227 additions and 239 deletions

View file

@ -260,7 +260,9 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return String(executionId); return String(executionId);
} }
async markAsCrashed(executionIds: string[]) { async markAsCrashed(executionIds: string | string[]) {
if (!Array.isArray(executionIds)) executionIds = [executionIds];
await this.update( await this.update(
{ id: In(executionIds) }, { id: In(executionIds) },
{ {
@ -268,6 +270,10 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
stoppedAt: new Date(), stoppedAt: new Date(),
}, },
); );
this.logger.info('[Execution Recovery] Marked executions as `crashed`', {
executionIds,
});
} }
/** /**

View file

@ -175,18 +175,8 @@ export class MessageEventBus extends EventEmitter {
// start actual recovery process and write recovery process flag file // start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess(); this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) { for (const executionId of unfinishedExecutionIds) {
this.logger.warn(`Attempting to recover execution ${executionId}`); const logMesssages = unsentAndUnfinished.unfinishedExecutions[executionId];
if (!unsentAndUnfinished.unfinishedExecutions[executionId]?.length) { await this.recoveryService.recoverFromLogs(executionId, logMesssages ?? []);
this.logger.debug(
`No event messages found, marking execution ${executionId} as 'crashed'`,
);
await this.executionRepository.markAsCrashed([executionId]);
} else {
await this.recoveryService.recover(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
);
}
} }
} }
// remove the recovery process flag file // remove the recovery process flag file
@ -367,7 +357,7 @@ export class MessageEventBus extends EventEmitter {
async getUnsentAndUnfinishedExecutions(): Promise<{ async getUnsentAndUnfinishedExecutions(): Promise<{
unsentMessages: EventMessageTypes[]; unsentMessages: EventMessageTypes[];
unfinishedExecutions: Record<string, EventMessageTypes[]>; unfinishedExecutions: Record<string, EventMessageTypes[] | undefined>;
}> { }> {
const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions(); const queryResult = await this.logWriter?.getUnsentAndUnfinishedExecutions();
return queryResult; return queryResult;

View file

@ -174,17 +174,15 @@ export const setupMessages = (executionId: string, workflowName: string): EventM
describe('ExecutionRecoveryService', () => { describe('ExecutionRecoveryService', () => {
let executionRecoveryService: ExecutionRecoveryService; let executionRecoveryService: ExecutionRecoveryService;
let push: Push; let push: Push;
let executionRepository: ExecutionRepository;
beforeAll(async () => { beforeAll(async () => {
await testDb.init(); await testDb.init();
mockInstance(InternalHooks); mockInstance(InternalHooks);
push = mockInstance(Push); push = mockInstance(Push);
executionRepository = Container.get(ExecutionRepository);
executionRecoveryService = new ExecutionRecoveryService( executionRecoveryService = new ExecutionRecoveryService(push, executionRepository);
push,
Container.get(ExecutionRepository),
);
}); });
afterEach(async () => { afterEach(async () => {
@ -195,226 +193,212 @@ describe('ExecutionRecoveryService', () => {
await testDb.terminate(); await testDb.terminate();
}); });
describe('recover', () => { describe('recoverFromLogs', () => {
it('should amend, persist, run hooks, broadcast', async () => { describe('if no messages', () => {
/** test('should return `null` if no execution found', async () => {
* Arrange /**
*/ * Arrange
// @ts-expect-error Private method */
const amendSpy = jest.spyOn(executionRecoveryService, 'amend'); const inexistentExecutionId = randomInteger(100).toString();
const executionRepository = Container.get(ExecutionRepository); const noMessages: EventMessage[] = [];
const dbUpdateSpy = jest.spyOn(executionRepository, 'update');
// @ts-expect-error Private method
const runHooksSpy = jest.spyOn(executionRecoveryService, 'runHooks');
const workflow = await createWorkflow(OOM_WORKFLOW); /**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
inexistentExecutionId,
noMessages,
);
const execution = await createExecution( /**
{ * Assert
status: 'running', */
data: stringify(IN_PROGRESS_EXECUTION_DATA), expect(amendedExecution).toBeNull();
}, });
workflow,
);
const messages = setupMessages(execution.id, workflow.name); test('should update `status` and `stoppedAt`', async () => {
/**
/** * Arrange
* Act */
*/ const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
await executionRecoveryService.recover(execution.id, messages); {
status: 'running',
/** data: stringify(IN_PROGRESS_EXECUTION_DATA),
* Assert
*/
expect(amendSpy).toHaveBeenCalledTimes(1);
expect(amendSpy).toHaveBeenCalledWith(execution.id, messages);
expect(dbUpdateSpy).toHaveBeenCalledTimes(1);
expect(runHooksSpy).toHaveBeenCalledTimes(1);
expect(push.once).toHaveBeenCalledTimes(1);
});
test('should amend a truncated execution where last node did not finish', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recover(execution.id, messages);
/**
* Assert
*/
const startOfLastNodeRun = messages
.find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: startOfLastNodeRun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeInstanceOf(WorkflowCrashedError);
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA);
expect(debugHelperTaskData?.executionStatus).toBe('crashed');
expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError);
});
test('should amend a truncated execution where last node finished', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
messages.push(
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId: execution.id,
workflowName: workflow.name,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
}, },
}), workflow,
); );
/** /**
* Act * Act
*/ */
const amendedExecution = await executionRecoveryService.recoverFromLogs(execution.id, []);
const amendedExecution = await executionRecoveryService.recover(execution.id, messages); /**
* Assert
*/
if (!amendedExecution) fail('Expected `amendedExecution` to exist');
/** expect(amendedExecution.status).toBe('crashed');
* Assert expect(amendedExecution.stoppedAt).not.toBe(execution.stoppedAt);
*/ });
const endOfLastNoderun = messages
.find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: endOfLastNoderun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeUndefined();
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.executionStatus).toBe('success');
expect(debugHelperTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA);
}); });
test('should return `null` if no messages', async () => { describe('if messages', () => {
/** test('should return `null` if no execution found', async () => {
* Arrange /**
*/ * Arrange
const workflow = await createWorkflow(OOM_WORKFLOW); */
const execution = await createExecution( const inexistentExecutionId = randomInteger(100).toString();
{ const messages = setupMessages(inexistentExecutionId, 'Some workflow');
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const noMessages: EventMessage[] = [];
/** /**
* Act * Act
*/ */
const amendedExecution = await executionRecoveryService.recoverFromLogs(
inexistentExecutionId,
messages,
);
const amendedExecution = await executionRecoveryService.recover(execution.id, noMessages); /**
* Assert
*/
expect(amendedExecution).toBeNull();
});
/** test('should update `status`, `stoppedAt` and `data` if last node did not finish', async () => {
* Assert /**
*/ * Arrange
*/
expect(amendedExecution).toBeNull(); const workflow = await createWorkflow(OOM_WORKFLOW);
});
test('should return `null` if no execution', async () => { const execution = await createExecution(
/** {
* Arrange status: 'running',
*/ data: stringify(IN_PROGRESS_EXECUTION_DATA),
const inexistentExecutionId = randomInteger(100).toString(); },
const messages = setupMessages(inexistentExecutionId, 'Some workflow'); workflow,
);
/** const messages = setupMessages(execution.id, workflow.name);
* Act
*/
const amendedExecution = await executionRecoveryService.recover( /**
inexistentExecutionId, * Act
messages, */
);
/** const amendedExecution = await executionRecoveryService.recoverFromLogs(
* Assert execution.id,
*/ messages,
);
expect(amendedExecution).toBeNull(); /**
* Assert
*/
const startOfLastNodeRun = messages
.find((m) => m.eventName === 'n8n.node.started' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: startOfLastNodeRun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeInstanceOf(WorkflowCrashedError);
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA);
expect(debugHelperTaskData?.executionStatus).toBe('crashed');
expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError);
});
test('should update `status`, `stoppedAt` and `data` if last node finished', async () => {
/**
* Arrange
*/
const workflow = await createWorkflow(OOM_WORKFLOW);
const execution = await createExecution(
{
status: 'running',
data: stringify(IN_PROGRESS_EXECUTION_DATA),
},
workflow,
);
const messages = setupMessages(execution.id, workflow.name);
messages.push(
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId: execution.id,
workflowName: workflow.name,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
},
}),
);
/**
* Act
*/
const amendedExecution = await executionRecoveryService.recoverFromLogs(
execution.id,
messages,
);
/**
* Assert
*/
const endOfLastNoderun = messages
.find((m) => m.eventName === 'n8n.node.finished' && m.payload.nodeName === 'DebugHelper')
?.ts.toJSDate();
expect(amendedExecution).toEqual(
expect.objectContaining({
status: 'crashed',
stoppedAt: endOfLastNoderun,
}),
);
const resultData = amendedExecution?.data.resultData;
if (!resultData) fail('Expected `resultData` to be defined');
expect(resultData.error).toBeUndefined();
expect(resultData.lastNodeExecuted).toBe('DebugHelper');
const runData = resultData.runData;
if (!runData) fail('Expected `runData` to be defined');
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.executionStatus).toBe('success');
expect(debugHelperTaskData?.error).toBeUndefined();
expect(debugHelperTaskData?.data).toEqual(ARTIFICIAL_TASK_DATA);
});
}); });
}); });
}); });

View file

@ -13,7 +13,7 @@ import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { ARTIFICIAL_TASK_DATA } from '@/constants'; import { ARTIFICIAL_TASK_DATA } from '@/constants';
/** /**
* Service for recovering executions truncated by an instance crash. * Service for recovering key properties in executions.
*/ */
@Service() @Service()
export class ExecutionRecoveryService { export class ExecutionRecoveryService {
@ -23,20 +23,9 @@ export class ExecutionRecoveryService {
) {} ) {}
/** /**
* "Recovery" means (1) amending key properties of a truncated execution, * Recover key properties of a truncated execution using event logs.
* (2) running post-execution hooks, and (3) returning the amended execution
* so the UI can reflect the error. "Recovery" does **not** mean injecting
* execution data from the logs (they hold none), or resuming the execution
* from the point of truncation, or re-running the whole execution.
*
* Recovery is only possible if event logs are available in the container.
* In regular mode, logs should but might not be available, e.g. due to container
* being recycled, max log size causing rotation, etc. In queue mode, as workers
* log to their own filesystems, only manual exections can be recovered.
*/ */
async recover(executionId: string, messages: EventMessageTypes[]) { async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) {
if (messages.length === 0) return null;
const amendedExecution = await this.amend(executionId, messages); const amendedExecution = await this.amend(executionId, messages);
if (!amendedExecution) return null; if (!amendedExecution) return null;
@ -53,10 +42,16 @@ export class ExecutionRecoveryService {
return amendedExecution; return amendedExecution;
} }
// ----------------------------------
// private
// ----------------------------------
/** /**
* Amend `status`, `stoppedAt`, and `data` of an execution using event log messages. * Amend `status`, `stoppedAt`, and (if possible) `data` properties of an execution.
*/ */
private async amend(executionId: string, messages: EventMessageTypes[]) { private async amend(executionId: string, messages: EventMessageTypes[]) {
if (messages.length === 0) return await this.amendWithoutLogs(executionId);
const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages); const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages);
if (nodeMessages.length === 0) return null; if (nodeMessages.length === 0) return null;
@ -114,9 +109,20 @@ export class ExecutionRecoveryService {
} as IExecutionResponse; } as IExecutionResponse;
} }
// ---------------------------------- private async amendWithoutLogs(executionId: string) {
// private const exists = await this.executionRepository.exists({ where: { id: executionId } });
// ----------------------------------
if (!exists) return null;
await this.executionRepository.markAsCrashed(executionId);
const execution = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
return execution ?? null;
}
private toRelevantMessages(messages: EventMessageTypes[]) { private toRelevantMessages(messages: EventMessageTypes[]) {
return messages.reduce<{ return messages.reduce<{
@ -152,6 +158,8 @@ export class ExecutionRecoveryService {
} }
private async runHooks(execution: IExecutionResponse) { private async runHooks(execution: IExecutionResponse) {
execution.data ??= { resultData: { runData: {} } };
await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, { await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, {
data: execution.data, data: execution.data,
finished: false, finished: false,