fix(core): Support execution recovery when saving execution progress (#10104)

This commit is contained in:
Iván Ovejero 2024-07-22 13:59:40 +02:00 committed by GitHub
parent 03a833db51
commit d887c82d80
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 175 additions and 158 deletions

View file

@ -0,0 +1,116 @@
import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
import { NodeConnectionType } from 'n8n-workflow';
/**
* Workflow producing an execution whose data will be truncated by an instance crash.
*/
export const OOM_WORKFLOW: Partial<WorkflowEntity> = {
nodes: [
{
parameters: {},
id: '48ce17fe-9651-42ae-910c-48602a00f0bb',
name: 'When clicking "Test workflow"',
type: 'n8n-nodes-base.manualTrigger',
typeVersion: 1,
position: [640, 260],
},
{
parameters: {
category: 'oom',
memorySizeValue: 1000,
},
id: '07a48151-96d3-45eb-961c-1daf85fbe052',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [840, 260],
},
],
connections: {
'When clicking "Test workflow"': {
main: [
[
{
node: 'DebugHelper',
type: NodeConnectionType.Main,
index: 0,
},
],
],
},
},
pinData: {},
};
/**
* Snapshot of an execution that will be truncated by an instance crash.
*/
export const IN_PROGRESS_EXECUTION_DATA = {
startData: {},
resultData: {
runData: {
'When clicking "Test workflow"': [
{
hints: [],
startTime: 1716138610153,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
},
],
},
lastNodeExecuted: 'When clicking "Test workflow"',
},
executionData: {
contextData: {},
nodeExecutionStack: [
{
node: {
parameters: {
category: 'oom',
memorySizeValue: 1000,
},
id: '07a48151-96d3-45eb-961c-1daf85fbe052',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [840, 260],
},
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
source: {
main: [
{
previousNode: 'When clicking "Test workflow"',
},
],
},
},
],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
};

View file

@ -1,6 +1,6 @@
import Container from 'typedi';
import { stringify } from 'flatted';
import { NodeConnectionType, randomInt } from 'n8n-workflow';
import { randomInt } from 'n8n-workflow';
import { mockInstance } from '@test/mocking';
import { createWorkflow } from '@test-integration/db/workflows';
@ -12,169 +12,19 @@ import { OrchestrationService } from '@/services/orchestration.service';
import config from '@/config';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { ExecutionRepository } from '@/databases/repositories/execution.repository';
import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity';
import { InternalHooks } from '@/InternalHooks';
import { Push } from '@/push';
import { ARTIFICIAL_TASK_DATA } from '@/constants';
import { NodeCrashedError } from '@/errors/node-crashed.error';
import { WorkflowCrashedError } from '@/errors/workflow-crashed.error';
import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode';
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
import { IN_PROGRESS_EXECUTION_DATA, OOM_WORKFLOW } from './constants';
import { setupMessages } from './utils';
import type { EventService } from '@/eventbus/event.service';
import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
import type { Logger } from '@/Logger';
/**
* Workflow producing an execution whose data will be truncated by an instance crash.
*/
export const OOM_WORKFLOW: Partial<WorkflowEntity> = {
nodes: [
{
parameters: {},
id: '48ce17fe-9651-42ae-910c-48602a00f0bb',
name: 'When clicking "Test workflow"',
type: 'n8n-nodes-base.manualTrigger',
typeVersion: 1,
position: [640, 260],
},
{
parameters: {
category: 'oom',
memorySizeValue: 1000,
},
id: '07a48151-96d3-45eb-961c-1daf85fbe052',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [840, 260],
},
],
connections: {
'When clicking "Test workflow"': {
main: [
[
{
node: 'DebugHelper',
type: NodeConnectionType.Main,
index: 0,
},
],
],
},
},
pinData: {},
};
/**
* Snapshot of an execution that will be truncated by an instance crash.
*/
export const IN_PROGRESS_EXECUTION_DATA = {
startData: {},
resultData: {
runData: {
'When clicking "Test workflow"': [
{
hints: [],
startTime: 1716138610153,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
},
],
},
lastNodeExecuted: 'When clicking "Test workflow"',
},
executionData: {
contextData: {},
nodeExecutionStack: [
{
node: {
parameters: {
category: 'oom',
memorySizeValue: 1000,
},
id: '07a48151-96d3-45eb-961c-1daf85fbe052',
name: 'DebugHelper',
type: 'n8n-nodes-base.debugHelper',
typeVersion: 1,
position: [840, 260],
},
data: {
main: [
[
{
json: {},
pairedItem: {
item: 0,
},
},
],
],
},
source: {
main: [
{
previousNode: 'When clicking "Test workflow"',
},
],
},
},
],
metadata: {},
waitingExecution: {},
waitingExecutionSource: {},
},
};
export const setupMessages = (executionId: string, workflowName: string): EventMessage[] => {
return [
new EventMessageWorkflow({
eventName: 'n8n.workflow.started',
payload: { executionId },
}),
new EventMessageNode({
eventName: 'n8n.node.started',
payload: {
executionId,
workflowName,
nodeName: 'When clicking "Test workflow"',
nodeType: 'n8n-nodes-base.manualTrigger',
},
}),
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId,
workflowName,
nodeName: 'When clicking "Test workflow"',
nodeType: 'n8n-nodes-base.manualTrigger',
},
}),
new EventMessageNode({
eventName: 'n8n.node.started',
payload: {
executionId,
workflowName,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
},
}),
];
};
describe('ExecutionRecoveryService', () => {
let push: Push;
let executionRecoveryService: ExecutionRecoveryService;
@ -446,12 +296,20 @@ describe('ExecutionRecoveryService', () => {
const manualTriggerTaskData = runData['When clicking "Test workflow"'].at(0);
const debugHelperTaskData = runData.DebugHelper.at(0);
expect(manualTriggerTaskData?.executionStatus).toBe('success');
expect(manualTriggerTaskData?.error).toBeUndefined();
expect(manualTriggerTaskData?.startTime).not.toBe(ARTIFICIAL_TASK_DATA);
if (!manualTriggerTaskData) fail("Expected manual trigger's `taskData` to be defined");
if (!debugHelperTaskData) fail("Expected debug helper's `taskData` to be defined");
expect(debugHelperTaskData?.executionStatus).toBe('crashed');
expect(debugHelperTaskData?.error).toBeInstanceOf(NodeCrashedError);
const originalManualTriggerTaskData =
IN_PROGRESS_EXECUTION_DATA.resultData.runData['When clicking "Test workflow"'].at(
0,
)?.data;
expect(manualTriggerTaskData.executionStatus).toBe('success');
expect(manualTriggerTaskData.error).toBeUndefined();
expect(manualTriggerTaskData.data).toStrictEqual(originalManualTriggerTaskData); // unchanged
expect(debugHelperTaskData.executionStatus).toBe('crashed');
expect(debugHelperTaskData.error).toBeInstanceOf(NodeCrashedError);
});
test('should update `status`, `stoppedAt` and `data` if last node finished', async () => {

View file

@ -0,0 +1,39 @@
import type { EventMessageTypes as EventMessage } from '@/eventbus/EventMessageClasses';
import { EventMessageNode } from '@/eventbus/EventMessageClasses/EventMessageNode';
import { EventMessageWorkflow } from '@/eventbus/EventMessageClasses/EventMessageWorkflow';
export const setupMessages = (executionId: string, workflowName: string): EventMessage[] => {
return [
new EventMessageWorkflow({
eventName: 'n8n.workflow.started',
payload: { executionId },
}),
new EventMessageNode({
eventName: 'n8n.node.started',
payload: {
executionId,
workflowName,
nodeName: 'When clicking "Test workflow"',
nodeType: 'n8n-nodes-base.manualTrigger',
},
}),
new EventMessageNode({
eventName: 'n8n.node.finished',
payload: {
executionId,
workflowName,
nodeName: 'When clicking "Test workflow"',
nodeType: 'n8n-nodes-base.manualTrigger',
},
}),
new EventMessageNode({
eventName: 'n8n.node.started',
payload: {
executionId,
workflowName,
nodeName: 'DebugHelper',
nodeType: 'n8n-nodes-base.debugHelper',
},
}),
];
};

View file

@ -190,6 +190,10 @@ export class ExecutionRecoveryService {
if (!nodeStartedMessage) continue;
const nodeHasRunData = runExecutionData.resultData.runData[node.name] !== undefined;
if (nodeHasRunData) continue; // when saving execution progress
const nodeFinishedMessage = nodeMessages.find(
(m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.finished',
);