refactor(core): Introduce ManualExecutionService (#12156)

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Iván Ovejero 2024-12-11 16:29:57 +01:00 committed by GitHub
parent 9fc0ecb89b
commit 77e2c75ca6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 142 additions and 119 deletions

View file

@ -1,8 +1,11 @@
import { mock } from 'jest-mock-extended';
import type { Workflow, IWorkflowExecutionDataProcess } from 'n8n-workflow'; import type { Workflow, IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { getExecutionStartNode } from '@/workflow-helpers'; import { ManualExecutionService } from '@/manual-execution.service';
describe('ManualExecutionService', () => {
const manualExecutionService = new ManualExecutionService(mock());
describe('WorkflowHelpers', () => {
describe('getExecutionStartNode', () => { describe('getExecutionStartNode', () => {
it('Should return undefined', () => { it('Should return undefined', () => {
const data = { const data = {
@ -16,9 +19,10 @@ describe('WorkflowHelpers', () => {
}; };
}, },
} as unknown as Workflow; } as unknown as Workflow;
const executionStartNode = getExecutionStartNode(data, workflow); const executionStartNode = manualExecutionService.getExecutionStartNode(data, workflow);
expect(executionStartNode).toBeUndefined(); expect(executionStartNode).toBeUndefined();
}); });
it('Should return startNode', () => { it('Should return startNode', () => {
const data = { const data = {
pinData: { pinData: {
@ -37,7 +41,7 @@ describe('WorkflowHelpers', () => {
return undefined; return undefined;
}, },
} as unknown as Workflow; } as unknown as Workflow;
const executionStartNode = getExecutionStartNode(data, workflow); const executionStartNode = manualExecutionService.getExecutionStartNode(data, workflow);
expect(executionStartNode).toEqual({ expect(executionStartNode).toEqual({
name: 'node2', name: 'node2',
}); });

View file

@ -0,0 +1,124 @@
import * as a from 'assert/strict';
import {
DirectedGraph,
filterDisabledNodes,
recreateNodeExecutionStack,
WorkflowExecute,
} from 'n8n-core';
import type {
IPinData,
IRun,
IRunExecutionData,
IWorkflowExecuteAdditionalData,
IWorkflowExecutionDataProcess,
Workflow,
} from 'n8n-workflow';
import type PCancelable from 'p-cancelable';
import { Service } from 'typedi';
import { Logger } from '@/logging/logger.service';
@Service()
export class ManualExecutionService {
constructor(private readonly logger: Logger) {}
getExecutionStartNode(data: IWorkflowExecutionDataProcess, workflow: Workflow) {
let startNode;
if (
data.startNodes?.length === 1 &&
Object.keys(data.pinData ?? {}).includes(data.startNodes[0].name)
) {
startNode = workflow.getNode(data.startNodes[0].name) ?? undefined;
}
return startNode;
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
runManually(
data: IWorkflowExecutionDataProcess,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
executionId: string,
pinData?: IPinData,
): PCancelable<IRun> {
if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) {
this.logger.debug(
`Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`,
{ executionId },
);
const startNodes = data.startNodes.map((startNode) => {
const node = workflow.getNode(startNode.name);
a.ok(node, `Could not find a node named "${startNode.name}" in the workflow.`);
return node;
});
const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] };
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(
filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)),
new Set(startNodes),
runData,
data.pinData ?? {},
);
const executionData: IRunExecutionData = {
resultData: { runData, pinData },
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
},
};
const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData);
return workflowExecute.processRunExecutionData(workflow);
} else if (
data.runData === undefined ||
data.startNodes === undefined ||
data.startNodes.length === 0
) {
// Full Execution
// TODO: When the old partial execution logic is removed this block can
// be removed and the previous one can be merged into
// `workflowExecute.runPartialWorkflow2`.
// Partial executions then require either a destination node from which
// everything else can be derived, or a triggerToStartFrom with
// triggerData.
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});
// Execute all nodes
const startNode = this.getExecutionStartNode(data, workflow);
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData);
} else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
if (data.partialExecutionVersion === '1') {
return workflowExecute.runPartialWorkflow2(
workflow,
data.runData,
data.pinData,
data.dirtyNodeNames,
data.destinationNode,
);
} else {
return workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}
}
}
}

View file

@ -7,9 +7,7 @@ import type {
NodeApiError, NodeApiError,
WorkflowExecuteMode, WorkflowExecuteMode,
WorkflowOperationError, WorkflowOperationError,
Workflow,
NodeOperationError, NodeOperationError,
IWorkflowExecutionDataProcess,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { Container } from 'typedi'; import { Container } from 'typedi';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
@ -223,18 +221,6 @@ export async function replaceInvalidCredentials(workflow: WorkflowEntity): Promi
return workflow; return workflow;
} }
export function getExecutionStartNode(data: IWorkflowExecutionDataProcess, workflow: Workflow) {
let startNode;
if (
data.startNodes?.length === 1 &&
Object.keys(data.pinData ?? {}).includes(data.startNodes[0].name)
) {
startNode = workflow.getNode(data.startNodes[0].name) ?? undefined;
}
return startNode;
}
export async function getVariables(): Promise<IDataObject> { export async function getVariables(): Promise<IDataObject> {
const variables = await Container.get(VariablesService).getAllCached(); const variables = await Container.get(VariablesService).getAllCached();
return Object.freeze( return Object.freeze(

View file

@ -2,15 +2,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-shadow */ /* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */
import * as a from 'assert/strict'; import { ErrorReporter, InstanceSettings, WorkflowExecute } from 'n8n-core';
import {
DirectedGraph,
ErrorReporter,
InstanceSettings,
WorkflowExecute,
filterDisabledNodes,
recreateNodeExecutionStack,
} from 'n8n-core';
import type { import type {
ExecutionError, ExecutionError,
IDeferredPromise, IDeferredPromise,
@ -20,8 +12,6 @@ import type {
WorkflowExecuteMode, WorkflowExecuteMode,
WorkflowHooks, WorkflowHooks,
IWorkflowExecutionDataProcess, IWorkflowExecutionDataProcess,
IRunExecutionData,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { ExecutionCancelledError, Workflow } from 'n8n-workflow'; import { ExecutionCancelledError, Workflow } from 'n8n-workflow';
import PCancelable from 'p-cancelable'; import PCancelable from 'p-cancelable';
@ -37,12 +27,12 @@ import type { ScalingService } from '@/scaling/scaling.service';
import type { Job, JobData } from '@/scaling/scaling.types'; import type { Job, JobData } from '@/scaling/scaling.types';
import { PermissionChecker } from '@/user-management/permission-checker'; import { PermissionChecker } from '@/user-management/permission-checker';
import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data';
import * as WorkflowHelpers from '@/workflow-helpers';
import { generateFailedExecutionFromError } from '@/workflow-helpers'; import { generateFailedExecutionFromError } from '@/workflow-helpers';
import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service'; import { WorkflowStaticDataService } from '@/workflows/workflow-static-data.service';
import { ExecutionNotFoundError } from './errors/execution-not-found-error'; import { ExecutionNotFoundError } from './errors/execution-not-found-error';
import { EventService } from './events/event.service'; import { EventService } from './events/event.service';
import { ManualExecutionService } from './manual-execution.service';
@Service() @Service()
export class WorkflowRunner { export class WorkflowRunner {
@ -61,6 +51,7 @@ export class WorkflowRunner {
private readonly permissionChecker: PermissionChecker, private readonly permissionChecker: PermissionChecker,
private readonly eventService: EventService, private readonly eventService: EventService,
private readonly instanceSettings: InstanceSettings, private readonly instanceSettings: InstanceSettings,
private readonly manualExecutionService: ManualExecutionService,
) {} ) {}
/** The process did error */ /** The process did error */
@ -295,7 +286,13 @@ export class WorkflowRunner {
); );
workflowExecution = workflowExecute.processRunExecutionData(workflow); workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else { } else {
workflowExecution = this.runManually(data, workflow, additionalData, executionId, pinData); workflowExecution = this.manualExecutionService.runManually(
data,
workflow,
additionalData,
executionId,
pinData,
);
} }
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
@ -458,92 +455,4 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
} }
// eslint-disable-next-line @typescript-eslint/promise-function-async
runManually(
data: IWorkflowExecutionDataProcess,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
executionId: string,
pinData?: IPinData,
) {
if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) {
this.logger.debug(
`Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`,
{ executionId },
);
const startNodes = data.startNodes.map((data) => {
const node = workflow.getNode(data.name);
a.ok(node, `Could not find a node named "${data.name}" in the workflow.`);
return node;
});
const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] };
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(
filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)),
new Set(startNodes),
runData,
data.pinData ?? {},
);
const executionData: IRunExecutionData = {
resultData: { runData, pinData },
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
},
};
const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData);
return workflowExecute.processRunExecutionData(workflow);
} else if (
data.runData === undefined ||
data.startNodes === undefined ||
data.startNodes.length === 0
) {
// Full Execution
// TODO: When the old partial execution logic is removed this block can
// be removed and the previous one can be merged into
// `workflowExecute.runPartialWorkflow2`.
// Partial executions then require either a destination node from which
// everything else can be derived, or a triggerToStartFrom with
// triggerData.
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});
// Execute all nodes
const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow);
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData);
} else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
if (data.partialExecutionVersion === '1') {
return workflowExecute.runPartialWorkflow2(
workflow,
data.runData,
data.pinData,
data.dirtyNodeNames,
data.destinationNode,
);
} else {
return workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}
}
}
} }