refactor(core): Extract more code out in WorkflowExecute and add unit tests (#13327)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2025-02-19 13:42:37 +01:00 committed by GitHub
parent 14b6f8b972
commit 60ff82f648
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 161 additions and 54 deletions

View file

@ -1353,4 +1353,113 @@ describe('WorkflowExecute', () => {
expect(result?.[1][0].pairedItem).toEqual({ item: 0 });
});
});
describe('ensureInputData', () => {
const node: INode = {
id: '1',
name: 'TestNode',
type: 'test.set',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const parentNode: INode = {
id: '2',
name: 'ParentNode',
type: 'test.set',
typeVersion: 1,
position: [0, 0],
parameters: {},
};
const runExecutionData = mock<IRunExecutionData>({
executionData: {
nodeExecutionStack: [],
},
});
const workflow = new Workflow({
id: 'test',
nodes: [],
connections: {},
active: false,
nodeTypes: mock(),
});
let executionData: IExecuteData;
let workflowExecute: WorkflowExecute;
beforeEach(() => {
executionData = {
node,
data: {},
source: null,
};
workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData);
jest.resetAllMocks();
});
test('should return true when node has no input connections', () => {
workflow.nodes = {};
workflow.connectionsByDestinationNode = {};
const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData);
expect(hasInputData).toBe(true);
});
test('should return false when execution data does not have main connection', () => {
workflow.nodes = {
[node.name]: node,
[parentNode.name]: parentNode,
};
workflow.connectionsByDestinationNode = {
[node.name]: {
main: [[{ node: parentNode.name, type: NodeConnectionType.Main, index: 0 }]],
},
};
const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData);
expect(hasInputData).toBe(false);
expect(runExecutionData.executionData?.nodeExecutionStack).toContain(executionData);
});
test('should return true when input data is available for force input node execution', () => {
workflow.nodes = {
[node.name]: node,
[parentNode.name]: parentNode,
};
workflow.connectionsByDestinationNode = {
[node.name]: {
main: [[{ node: parentNode.name, type: NodeConnectionType.Main, index: 0 }]],
},
};
executionData.data = { main: [[{ json: { test: 'data' } }]] };
const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData);
expect(hasInputData).toBe(true);
});
test('should return false when input data is not available for force input node execution', () => {
workflow.nodes = {
[node.name]: node,
[parentNode.name]: parentNode,
};
workflow.connectionsByDestinationNode = {
[node.name]: {
main: [[{ node: parentNode.name, type: NodeConnectionType.Main, index: 0 }]],
},
};
executionData.data = { main: [null] };
const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData);
expect(hasInputData).toBe(false);
expect(runExecutionData.executionData?.nodeExecutionStack).toContain(executionData);
});
});
});

View file

@ -37,7 +37,6 @@ import type {
CloseFunction,
StartNodeData,
NodeExecutionHint,
NodeInputConnections,
IRunNodeResponse,
IWorkflowIssues,
INodeIssues,
@ -1375,61 +1374,12 @@ export class WorkflowExecute {
continue;
}
// Check if all the data which is needed to run the node is available
if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) {
// Check if the node has incoming connections
if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) {
let inputConnections: NodeInputConnections;
let connectionIndex: number;
// eslint-disable-next-line prefer-const
inputConnections = workflow.connectionsByDestinationNode[executionNode.name].main;
for (
connectionIndex = 0;
connectionIndex < inputConnections.length;
connectionIndex++
) {
if (
workflow.getHighestNode(
executionNode.name,
NodeConnectionType.Main,
connectionIndex,
).length === 0
) {
// If there is no valid incoming node (if all are disabled)
// then ignore that it has inputs and simply execute it as it is without
// any data
continue;
}
if (!executionData.data.hasOwnProperty('main')) {
// ExecutionData does not even have the connection set up so can
// not have that data, so add it again to be executed later
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
const hasInputData = this.ensureInputData(workflow, executionNode, executionData);
if (!hasInputData) {
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
if (forceInputNodeExecution) {
// Check if it has the data for all the inputs
// The most nodes just have one but merge node for example has two and data
// of both inputs has to be available to be able to process the node.
if (
executionData.data.main.length < connectionIndex ||
executionData.data.main[connectionIndex] === null
) {
// Does not have the data of the connections so add back to stack
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
lastExecutionTry = currentExecutionTry;
continue executionLoop;
}
}
}
}
}
startTime = new Date().getTime();
let maxTries = 1;
@ -2039,6 +1989,54 @@ export class WorkflowExecute {
});
}
/**
* This method determines if a specific node has incoming connections and verifies if execution data is available for all required inputs.
* If any required input data is missing, the node execution is deferred by pushing it back onto the execution stack.
*
* @param workflow - The workflow containing the node and connections.
* @param executionNode - The node being checked.
* @param executionData - The data available for executing the node.
* @returns `true` if the node has the required input data and can execute immediately, otherwise `false`.
*/
ensureInputData(workflow: Workflow, executionNode: INode, executionData: IExecuteData): boolean {
const inputConnections = workflow.connectionsByDestinationNode[executionNode.name]?.main ?? [];
for (let connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) {
const highestNodes = workflow.getHighestNode(
executionNode.name,
NodeConnectionType.Main,
connectionIndex,
);
if (highestNodes.length === 0) {
// If there is no valid incoming node (if all are disabled)
// then ignore that it has inputs and simply execute it as it is without
// any data
return true;
}
if (!executionData.data.hasOwnProperty('main')) {
// ExecutionData does not even have the connection set up so can
// not have that data, so add it again to be executed later
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
return false;
}
if (this.forceInputNodeExecution(workflow)) {
// Check if it has the data for all the inputs
// The most nodes just have one but merge node for example has two and data
// of both inputs has to be available to be able to process the node.
if (
executionData.data.main.length < connectionIndex ||
executionData.data.main[connectionIndex] === null
) {
// Does not have the data of the connections so add back to stack
this.runExecutionData.executionData!.nodeExecutionStack.push(executionData);
return false;
}
}
}
return true;
}
/**
* Processes the final state of a workflow execution and prepares the execution result.
* This method handles different completion scenarios: success, waiting, error, and canceled states.