diff --git a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts index 75b4270d3d..432cc6e621 100644 --- a/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts +++ b/packages/core/src/execution-engine/__tests__/workflow-execute.test.ts @@ -1353,4 +1353,113 @@ describe('WorkflowExecute', () => { expect(result?.[1][0].pairedItem).toEqual({ item: 0 }); }); }); + + describe('ensureInputData', () => { + const node: INode = { + id: '1', + name: 'TestNode', + type: 'test.set', + typeVersion: 1, + position: [0, 0], + parameters: {}, + }; + const parentNode: INode = { + id: '2', + name: 'ParentNode', + type: 'test.set', + typeVersion: 1, + position: [0, 0], + parameters: {}, + }; + const runExecutionData = mock({ + executionData: { + nodeExecutionStack: [], + }, + }); + const workflow = new Workflow({ + id: 'test', + nodes: [], + connections: {}, + active: false, + nodeTypes: mock(), + }); + + let executionData: IExecuteData; + let workflowExecute: WorkflowExecute; + beforeEach(() => { + executionData = { + node, + data: {}, + source: null, + }; + workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + jest.resetAllMocks(); + }); + + test('should return true when node has no input connections', () => { + workflow.nodes = {}; + workflow.connectionsByDestinationNode = {}; + + const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData); + + expect(hasInputData).toBe(true); + }); + + test('should return false when execution data does not have main connection', () => { + workflow.nodes = { + [node.name]: node, + [parentNode.name]: parentNode, + }; + + workflow.connectionsByDestinationNode = { + [node.name]: { + main: [[{ node: parentNode.name, type: NodeConnectionType.Main, index: 0 }]], + }, + }; + + const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData); + + expect(hasInputData).toBe(false); + expect(runExecutionData.executionData?.nodeExecutionStack).toContain(executionData); + }); + + test('should return true when input data is available for force input node execution', () => { + workflow.nodes = { + [node.name]: node, + [parentNode.name]: parentNode, + }; + + workflow.connectionsByDestinationNode = { + [node.name]: { + main: [[{ node: parentNode.name, type: NodeConnectionType.Main, index: 0 }]], + }, + }; + + executionData.data = { main: [[{ json: { test: 'data' } }]] }; + + const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData); + + expect(hasInputData).toBe(true); + }); + + test('should return false when input data is not available for force input node execution', () => { + workflow.nodes = { + [node.name]: node, + [parentNode.name]: parentNode, + }; + + workflow.connectionsByDestinationNode = { + [node.name]: { + main: [[{ node: parentNode.name, type: NodeConnectionType.Main, index: 0 }]], + }, + }; + + executionData.data = { main: [null] }; + + const hasInputData = workflowExecute.ensureInputData(workflow, node, executionData); + + expect(hasInputData).toBe(false); + expect(runExecutionData.executionData?.nodeExecutionStack).toContain(executionData); + }); + }); }); diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index c51d302af0..eb42cc9473 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -37,7 +37,6 @@ import type { CloseFunction, StartNodeData, NodeExecutionHint, - NodeInputConnections, IRunNodeResponse, IWorkflowIssues, INodeIssues, @@ -1375,59 +1374,10 @@ export class WorkflowExecute { continue; } - // Check if all the data which is needed to run the node is available - if (workflow.connectionsByDestinationNode.hasOwnProperty(executionNode.name)) { - // Check if the node has incoming connections - if (workflow.connectionsByDestinationNode[executionNode.name].hasOwnProperty('main')) { - let inputConnections: NodeInputConnections; - let connectionIndex: number; - - // eslint-disable-next-line prefer-const - inputConnections = workflow.connectionsByDestinationNode[executionNode.name].main; - - for ( - connectionIndex = 0; - connectionIndex < inputConnections.length; - connectionIndex++ - ) { - if ( - workflow.getHighestNode( - executionNode.name, - NodeConnectionType.Main, - connectionIndex, - ).length === 0 - ) { - // If there is no valid incoming node (if all are disabled) - // then ignore that it has inputs and simply execute it as it is without - // any data - continue; - } - - if (!executionData.data.hasOwnProperty('main')) { - // ExecutionData does not even have the connection set up so can - // not have that data, so add it again to be executed later - this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); - lastExecutionTry = currentExecutionTry; - continue executionLoop; - } - - if (forceInputNodeExecution) { - // Check if it has the data for all the inputs - // The most nodes just have one but merge node for example has two and data - // of both inputs has to be available to be able to process the node. - if ( - executionData.data.main.length < connectionIndex || - executionData.data.main[connectionIndex] === null - ) { - // Does not have the data of the connections so add back to stack - this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); - lastExecutionTry = currentExecutionTry; - - continue executionLoop; - } - } - } - } + const hasInputData = this.ensureInputData(workflow, executionNode, executionData); + if (!hasInputData) { + lastExecutionTry = currentExecutionTry; + continue executionLoop; } startTime = new Date().getTime(); @@ -2039,6 +1989,54 @@ export class WorkflowExecute { }); } + /** + * This method determines if a specific node has incoming connections and verifies if execution data is available for all required inputs. + * If any required input data is missing, the node execution is deferred by pushing it back onto the execution stack. + * + * @param workflow - The workflow containing the node and connections. + * @param executionNode - The node being checked. + * @param executionData - The data available for executing the node. + * @returns `true` if the node has the required input data and can execute immediately, otherwise `false`. + */ + ensureInputData(workflow: Workflow, executionNode: INode, executionData: IExecuteData): boolean { + const inputConnections = workflow.connectionsByDestinationNode[executionNode.name]?.main ?? []; + for (let connectionIndex = 0; connectionIndex < inputConnections.length; connectionIndex++) { + const highestNodes = workflow.getHighestNode( + executionNode.name, + NodeConnectionType.Main, + connectionIndex, + ); + if (highestNodes.length === 0) { + // If there is no valid incoming node (if all are disabled) + // then ignore that it has inputs and simply execute it as it is without + // any data + return true; + } + + if (!executionData.data.hasOwnProperty('main')) { + // ExecutionData does not even have the connection set up so can + // not have that data, so add it again to be executed later + this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); + return false; + } + + if (this.forceInputNodeExecution(workflow)) { + // Check if it has the data for all the inputs + // The most nodes just have one but merge node for example has two and data + // of both inputs has to be available to be able to process the node. + if ( + executionData.data.main.length < connectionIndex || + executionData.data.main[connectionIndex] === null + ) { + // Does not have the data of the connections so add back to stack + this.runExecutionData.executionData!.nodeExecutionStack.push(executionData); + return false; + } + } + } + return true; + } + /** * Processes the final state of a workflow execution and prepares the execution result. * This method handles different completion scenarios: success, waiting, error, and canceled states.