refactor(core): Encapsulate manual execution flow in WorkflowRunner (#12135)

This commit is contained in:
Iván Ovejero 2024-12-10 16:38:24 +01:00 committed by GitHub
parent eb7d5934ef
commit b37e5142b2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -20,6 +20,7 @@ import type {
WorkflowHooks,
IWorkflowExecutionDataProcess,
IRunExecutionData,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow';
import {
ErrorReporterProxy as ErrorReporter,
@ -295,88 +296,8 @@ export class WorkflowRunner {
data.executionData,
);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) {
this.logger.debug(
`Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`,
{ executionId },
);
const startNodes = data.startNodes.map((data) => {
const node = workflow.getNode(data.name);
a.ok(node, `Could not find a node named "${data.name}" in the workflow.`);
return node;
});
const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] };
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(
filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)),
new Set(startNodes),
runData,
data.pinData ?? {},
);
const executionData: IRunExecutionData = {
resultData: { runData, pinData },
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
},
};
const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData);
workflowExecution = workflowExecute.processRunExecutionData(workflow);
} else if (
data.runData === undefined ||
data.startNodes === undefined ||
data.startNodes.length === 0
) {
// Full Execution
// TODO: When the old partial execution logic is removed this block can
// be removed and the previous one can be merged into
// `workflowExecute.runPartialWorkflow2`.
// Partial executions then require either a destination node from which
// everything else can be derived, or a triggerToStartFrom with
// triggerData.
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});
// Execute all nodes
const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow);
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.run(
workflow,
startNode,
data.destinationNode,
data.pinData,
);
} else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
if (data.partialExecutionVersion === '1') {
workflowExecution = workflowExecute.runPartialWorkflow2(
workflow,
data.runData,
data.pinData,
data.dirtyNodeNames,
data.destinationNode,
);
} else {
workflowExecution = workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}
workflowExecution = this.runManually(data, workflow, additionalData, executionId, pinData);
}
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
@ -539,4 +460,92 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
}
// eslint-disable-next-line @typescript-eslint/promise-function-async
runManually(
data: IWorkflowExecutionDataProcess,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
executionId: string,
pinData?: IPinData,
) {
if (data.triggerToStartFrom?.data && data.startNodes && !data.destinationNode) {
this.logger.debug(
`Execution ID ${executionId} had triggerToStartFrom. Starting from that trigger.`,
{ executionId },
);
const startNodes = data.startNodes.map((data) => {
const node = workflow.getNode(data.name);
a.ok(node, `Could not find a node named "${data.name}" in the workflow.`);
return node;
});
const runData = { [data.triggerToStartFrom.name]: [data.triggerToStartFrom.data] };
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(
filterDisabledNodes(DirectedGraph.fromWorkflow(workflow)),
new Set(startNodes),
runData,
data.pinData ?? {},
);
const executionData: IRunExecutionData = {
resultData: { runData, pinData },
executionData: {
contextData: {},
metadata: {},
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
},
};
const workflowExecute = new WorkflowExecute(additionalData, 'manual', executionData);
return workflowExecute.processRunExecutionData(workflow);
} else if (
data.runData === undefined ||
data.startNodes === undefined ||
data.startNodes.length === 0
) {
// Full Execution
// TODO: When the old partial execution logic is removed this block can
// be removed and the previous one can be merged into
// `workflowExecute.runPartialWorkflow2`.
// Partial executions then require either a destination node from which
// everything else can be derived, or a triggerToStartFrom with
// triggerData.
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId,
});
// Execute all nodes
const startNode = WorkflowHelpers.getExecutionStartNode(data, workflow);
// Can execute without webhook so go on
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
return workflowExecute.run(workflow, startNode, data.destinationNode, data.pinData);
} else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
if (data.partialExecutionVersion === '1') {
return workflowExecute.runPartialWorkflow2(
workflow,
data.runData,
data.pinData,
data.dirtyNodeNames,
data.destinationNode,
);
} else {
return workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}
}
}
}