From cb7c4d29a6f042b590822e5b9c67fff0a8f0863d Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Mon, 28 Oct 2024 10:16:19 +0100 Subject: [PATCH] feat(core): Handle nodes with multiple inputs and connections during partial executions (#11376) --- .../__tests__/getSourceDataGroups.test.ts | 352 ++++++++++- .../recreateNodeExecutionStack.test.ts | 582 ++++++++++++++++-- .../PartialExecutionUtils/getIncomingData.ts | 23 + .../getSourceDataGroups.ts | 87 ++- .../recreateNodeExecutionStack.ts | 191 +++--- packages/core/src/WorkflowExecute.ts | 2 +- 6 files changed, 1075 insertions(+), 162 deletions(-) diff --git a/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts index dffbe310d1..d8c3485d65 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts @@ -52,15 +52,15 @@ describe('getSourceDataGroups', () => { expect(groups).toHaveLength(2); const group1 = groups[0]; - expect(group1).toHaveLength(2); - expect(group1[0]).toEqual({ + expect(group1.connections).toHaveLength(2); + expect(group1.connections[0]).toEqual({ from: source1, outputIndex: 0, type: NodeConnectionType.Main, inputIndex: 0, to: node, }); - expect(group1[1]).toEqual({ + expect(group1.connections[1]).toEqual({ from: source3, outputIndex: 0, type: NodeConnectionType.Main, @@ -69,8 +69,8 @@ describe('getSourceDataGroups', () => { }); const group2 = groups[1]; - expect(group2).toHaveLength(1); - expect(group2[0]).toEqual({ + expect(group2.connections).toHaveLength(1); + expect(group2.connections[0]).toEqual({ from: source2, outputIndex: 0, type: NodeConnectionType.Main, @@ -116,15 +116,15 @@ describe('getSourceDataGroups', () => { expect(groups).toHaveLength(2); const group1 = groups[0]; - expect(group1).toHaveLength(2); - expect(group1[0]).toEqual({ + expect(group1.connections).toHaveLength(2); + expect(group1.connections[0]).toEqual({ from: source1, outputIndex: 0, type: NodeConnectionType.Main, inputIndex: 0, to: node, }); - expect(group1[1]).toEqual({ + expect(group1.connections[1]).toEqual({ from: source3, outputIndex: 0, type: NodeConnectionType.Main, @@ -133,8 +133,8 @@ describe('getSourceDataGroups', () => { }); const group2 = groups[1]; - expect(group2).toHaveLength(1); - expect(group2[0]).toEqual({ + expect(group2.connections).toHaveLength(1); + expect(group2.connections[0]).toEqual({ from: source2, outputIndex: 0, type: NodeConnectionType.Main, @@ -152,7 +152,7 @@ describe('getSourceDataGroups', () => { //┌───────┐1 │ └────┘ //│source3├────┘ //└───────┘ - it('groups sources into possibly complete sets if all of them have data', () => { + it('groups sources into one complete set with 2 connections and one incomplete set with 1 connection', () => { // ARRANGE const source1 = createNodeData({ name: 'source1' }); const source2 = createNodeData({ name: 'source2' }); @@ -176,23 +176,341 @@ describe('getSourceDataGroups', () => { const groups = getSourceDataGroups(graph, node, runData, pinnedData); // ASSERT - expect(groups).toHaveLength(1); + const completeGroups = groups.filter((g) => g.complete); + { + expect(completeGroups).toHaveLength(1); + const group1 = completeGroups[0]; + expect(group1.connections).toHaveLength(2); + expect(group1.connections[0]).toEqual({ + from: source2, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + expect(group1.connections[1]).toEqual({ + from: source3, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + } - const group1 = groups[0]; - expect(group1).toHaveLength(2); - expect(group1[0]).toEqual({ - from: source2, + const incompleteGroups = groups.filter((g) => !g.complete); + { + expect(incompleteGroups).toHaveLength(1); + const group1 = incompleteGroups[0]; + expect(group1.connections).toHaveLength(1); + expect(group1.connections[0]).toEqual({ + from: source1, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + } + }); + + //┌───────┐0 + //│source1├───────┐ + //└───────┘ │ + // │ + //┌───────┐1 │ + //│source2├───────┤ ┌────┐ + //└───────┘ └────► │ + // │node│ + //┌───────┐1 ┌────► │ + //│source3├───────┤ └────┘ + //└───────┘ │ + // │ + //┌───────┐0 │ + //│source4├───────┘ + //└───────┘ + it('groups sources into one complete set with 2 connections and one incomplete set with 2 connection', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const source2 = createNodeData({ name: 'source2' }); + const source3 = createNodeData({ name: 'source3' }); + const source4 = createNodeData({ name: 'source4' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, source2, source3, source4, node) + .addConnections( + { from: source1, to: node, inputIndex: 0 }, + { from: source2, to: node, inputIndex: 0 }, + { from: source3, to: node, inputIndex: 1 }, + { from: source4, to: node, inputIndex: 1 }, + ); + const runData: IRunData = { + [source2.name]: [toITaskData([{ data: { value: 1 } }])], + [source3.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + const completeGroups = groups.filter((g) => g.complete); + { + expect(completeGroups).toHaveLength(1); + const group1 = completeGroups[0]; + expect(group1.connections).toHaveLength(2); + expect(group1.connections[0]).toEqual({ + from: source2, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + expect(group1.connections[1]).toEqual({ + from: source3, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + } + + const incompleteGroups = groups.filter((g) => !g.complete); + { + expect(incompleteGroups).toHaveLength(1); + const group1 = incompleteGroups[0]; + expect(group1.connections).toHaveLength(2); + expect(group1.connections[0]).toEqual({ + from: source1, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + expect(group1.connections[1]).toEqual({ + from: source4, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + } + }); + + // ┌───────┐1 + // │source1├───────┐ + // └───────┘ │ + // │ + // ┌───────┐0 │ + // │source2├───────┤ ┌────┐ + // └───────┘ └────► │ + // │node│ + // ┌───────┐0 ┌────► │ + // │source3├───────┘ └────┘ + // └───────┘ + it('groups sources into two incomplete sets, one with 1 connection without and one with 2 connections one with data and one without', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const source2 = createNodeData({ name: 'source2' }); + const source3 = createNodeData({ name: 'source3' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, source2, source3, node) + .addConnections( + { from: source1, to: node, inputIndex: 0 }, + { from: source2, to: node, inputIndex: 0 }, + { from: source3, to: node, inputIndex: 1 }, + ); + const runData: IRunData = { + [source1.name]: [toITaskData([{ data: { node: 'source1' } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + const completeGroups = groups.filter((g) => g.complete); + expect(completeGroups).toHaveLength(0); + + const incompleteGroups = groups.filter((g) => !g.complete); + expect(incompleteGroups).toHaveLength(2); + + const group1 = incompleteGroups[0]; + expect(group1.connections).toHaveLength(2); + expect(group1.connections[0]).toEqual({ + from: source1, outputIndex: 0, type: NodeConnectionType.Main, inputIndex: 0, to: node, }); - expect(group1[1]).toEqual({ + expect(group1.connections[1]).toEqual({ from: source3, outputIndex: 0, type: NodeConnectionType.Main, inputIndex: 1, to: node, }); + + const group2 = incompleteGroups[1]; + expect(group2.connections).toHaveLength(1); + expect(group2.connections[0]).toEqual({ + from: source2, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + }); + + // ┌─────┐1 ►► + // ┌─►│Node1┼──┐ ┌─────┐ + // ┌───────┐1│ └─────┘ └──►│ │ + // │Trigger├─┤ │Node3│ + // └───────┘ │ ┌─────┐0 ┌──►│ │ + // └─►│Node2├──┘ └─────┘ + // └─────┘ + test('return an incomplete group when there is no data on input 2', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const node3 = createNodeData({ name: 'node3' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, node3) + .addConnections( + { from: trigger, to: node1 }, + { from: trigger, to: node2 }, + { from: node1, to: node3, inputIndex: 0 }, + { from: node2, to: node3, inputIndex: 1 }, + ); + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])], + [node1.name]: [toITaskData([{ data: { nodeName: 'node1' } }])], + }; + const pinData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node3, runData, pinData); + + // ASSERT + expect(groups).toHaveLength(1); + const group1 = groups[0]; + expect(group1.connections).toHaveLength(2); + expect(group1.complete).toEqual(false); + }); + + // ┌─────┐0 ►► + // ┌─►│Node1┼──┐ ┌─────┐ + // ┌───────┐1│ └─────┘ └──►│ │ + // │Trigger├─┤ │Node3│ + // └───────┘ │ ┌─────┐1 ┌──►│ │ + // └─►│Node2├──┘ └─────┘ + // └─────┘ + test('return an incomplete group when there is no data on input 1', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const node3 = createNodeData({ name: 'node3' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, node3) + .addConnections( + { from: trigger, to: node1 }, + { from: trigger, to: node2 }, + { from: node1, to: node3, inputIndex: 0 }, + { from: node2, to: node3, inputIndex: 1 }, + ); + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])], + [node2.name]: [toITaskData([{ data: { nodeName: 'node2' } }])], + }; + const pinData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node3, runData, pinData); + + // ASSERT + expect(groups).toHaveLength(1); + const group1 = groups[0]; + expect(group1.connections).toHaveLength(2); + expect(group1.complete).toEqual(false); + }); + + it('terminates with negative input indexes', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, node) + .addConnections({ from: source1, to: node, inputIndex: -1 }); + const runData: IRunData = { + [source1.name]: [toITaskData([{ data: { node: source1.name } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + expect(groups).toHaveLength(1); + const group1 = groups[0]; + expect(group1.connections).toHaveLength(1); + expect(group1.connections[0]).toEqual({ + from: source1, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: -1, + to: node, + }); + }); + + it('terminates inputs with missing connections', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, node) + .addConnections({ from: source1, to: node, inputIndex: 1 }); + const runData: IRunData = { + [source1.name]: [toITaskData([{ data: { node: source1.name } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + expect(groups).toHaveLength(1); + const group1 = groups[0]; + expect(group1.connections).toHaveLength(1); + expect(group1.connections[0]).toEqual({ + from: source1, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + }); + + it('terminates if the graph has no connections', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph().addNodes(source1, node); + const runData: IRunData = { + [source1.name]: [toITaskData([{ data: { node: source1.name } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + expect(groups).toHaveLength(0); }); }); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts index 8bae766912..b78b9df135 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts @@ -10,9 +10,19 @@ // PD denotes that the node has pinned data import { AssertionError } from 'assert'; -import { type IPinData, type IRunData } from 'n8n-workflow'; +import type { + INodeExecutionData, + ISourceData, + IWaitingForExecution, + IWaitingForExecutionSource, +} from 'n8n-workflow'; +import { NodeConnectionType, type IPinData, type IRunData } from 'n8n-workflow'; -import { recreateNodeExecutionStack } from '@/PartialExecutionUtils/recreateNodeExecutionStack'; +import { + addWaitingExecution, + addWaitingExecutionSource, + recreateNodeExecutionStack, +} from '@/PartialExecutionUtils/recreateNodeExecutionStack'; import { createNodeData, toITaskData } from './helpers'; import { DirectedGraph } from '../DirectedGraph'; @@ -41,7 +51,7 @@ describe('recreateNodeExecutionStack', () => { // ACT const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData); + recreateNodeExecutionStack(workflow, startNodes, runData, pinData); // ASSERT expect(nodeExecutionStack).toHaveLength(1); @@ -62,17 +72,8 @@ describe('recreateNodeExecutionStack', () => { }, }, ]); - - expect(waitingExecution).toEqual({ node: { '0': { main: [[{ json: { value: 1 } }]] } } }); - expect(waitingExecutionSource).toEqual({ - node: { - '0': { - main: [ - { previousNode: 'trigger', previousNodeOutput: undefined, previousNodeRun: undefined }, - ], - }, - }, - }); + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); }); // ►► @@ -93,7 +94,7 @@ describe('recreateNodeExecutionStack', () => { // ACT const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData); + recreateNodeExecutionStack(workflow, startNodes, runData, pinData); // ASSERT expect(nodeExecutionStack).toHaveLength(1); @@ -105,8 +106,8 @@ describe('recreateNodeExecutionStack', () => { }, ]); - expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } }); - expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } }); + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); }); // PinData ►► @@ -129,7 +130,7 @@ describe('recreateNodeExecutionStack', () => { // ACT const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData); + recreateNodeExecutionStack(workflow, startNodes, runData, pinData); // ASSERT expect(nodeExecutionStack).toHaveLength(1); @@ -151,8 +152,8 @@ describe('recreateNodeExecutionStack', () => { }, ]); - expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } }); - expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } }); + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); }); // XX ►► @@ -176,9 +177,9 @@ describe('recreateNodeExecutionStack', () => { const pinData = {}; // ACT & ASSERT - expect(() => - recreateNodeExecutionStack(graph, startNodes, node2, runData, pinData), - ).toThrowError(AssertionError); + expect(() => recreateNodeExecutionStack(graph, startNodes, runData, pinData)).toThrowError( + AssertionError, + ); }); // ►► @@ -214,10 +215,9 @@ describe('recreateNodeExecutionStack', () => { // ACT const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData); + recreateNodeExecutionStack(graph, startNodes, runData, pinData); // ASSERT - expect(nodeExecutionStack).toEqual([ { data: { main: [[{ json: { value: 1 } }]] }, @@ -251,19 +251,8 @@ describe('recreateNodeExecutionStack', () => { }, ]); - expect(waitingExecution).toEqual({ - node3: { '0': { main: [[{ json: { value: 1 } }], [{ json: { value: 1 } }]] } }, - }); - expect(waitingExecutionSource).toEqual({ - node3: { - '0': { - main: [ - { previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined }, - { previousNode: 'node2', previousNodeOutput: undefined, previousNodeRun: undefined }, - ], - }, - }, - }); + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); }); // ┌─────┐1 ►► @@ -299,7 +288,7 @@ describe('recreateNodeExecutionStack', () => { // ACT const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData); + recreateNodeExecutionStack(graph, startNodes, runData, pinData); // ASSERT expect(nodeExecutionStack).toHaveLength(1); @@ -314,22 +303,515 @@ describe('recreateNodeExecutionStack', () => { }, }); - expect(waitingExecution).toEqual({ - node3: { - '0': { - main: [[{ json: { value: 1 } }]], + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); + }); + + // ┌─────┐ ┌─────┐ + // ┌──►node1┼────┬──────► │ + // │ └─────┘ │ │merge│ + // │ │ ┌───► │ + // ├─────────────┘ │ └─────┘ + // │ │ + //┌───────┐ │ ┌─────┐ │ + //│trigger├───┴────►node2├─────┘ + //└───────┘ └─────┘ + describe('multiple inputs', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const merge = createNodeData({ name: 'merge' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, merge) + .addConnections( + { from: trigger, to: node1 }, + { from: trigger, to: node2 }, + { from: trigger, to: merge, inputIndex: 0 }, + { from: node1, to: merge, inputIndex: 0 }, + { from: node2, to: merge, inputIndex: 1 }, + ); + + test('only the trigger has run data', () => { + // ARRANGE + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])], + }; + const pinData: IPinData = {}; + const startNodes = new Set([node1, node2, merge]); + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, startNodes, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(2); + expect(nodeExecutionStack[0]).toEqual({ + node: node1, + data: { main: [[{ json: { node: 'trigger' } }]] }, + source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] }, + }); + expect(nodeExecutionStack[1]).toEqual({ + node: node2, + data: { main: [[{ json: { node: 'trigger' } }]] }, + source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] }, + }); + + expect(waitingExecution).toEqual({ + [merge.name]: { + '0': { + main: [[{ json: { node: 'trigger' } }]], + }, }, - }, + }); + expect(waitingExecutionSource).toEqual({ + [merge.name]: { + '0': { + main: [ + { + previousNode: 'trigger', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + ], + }, + }, + }); }); - expect(waitingExecutionSource).toEqual({ - node3: { - '0': { + + test('the trigger and node1 have run data', () => { + // ARRANGE + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])], + [node1.name]: [toITaskData([{ data: { node: 'node1' } }])], + }; + const pinData: IPinData = {}; + const startNodes = new Set([node2, merge]); + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, startNodes, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(2); + expect(nodeExecutionStack[0]).toEqual({ + node: node2, + data: { main: [[{ json: { node: 'trigger' } }]] }, + source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] }, + }); + expect(nodeExecutionStack[1]).toEqual({ + node: merge, + data: { main: [[{ json: { node: 'trigger' } }]] }, + source: { + main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }], + }, + }); + + expect(waitingExecution).toEqual({ + [merge.name]: { + '0': { + main: [[{ json: { node: 'node1' } }]], + }, + }, + }); + expect(waitingExecutionSource).toEqual({ + [merge.name]: { + '0': { + main: [ + { + previousNode: 'node1', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + ], + }, + }, + }); + }); + + test('the trigger and node2 have run data', () => { + // ARRANGE + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])], + [node2.name]: [toITaskData([{ data: { node: 'node2' } }])], + }; + const pinData: IPinData = {}; + const startNodes = new Set([node1, merge]); + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, startNodes, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(2); + expect(nodeExecutionStack[0]).toEqual({ + node: node1, + data: { main: [[{ json: { node: 'trigger' } }]] }, + source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] }, + }); + expect(nodeExecutionStack[1]).toEqual({ + node: merge, + data: { main: [[{ json: { node: 'trigger' } }], [{ json: { node: 'node2' } }]] }, + source: { main: [ - { previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined }, - { previousNode: 'node2', previousNodeOutput: 1, previousNodeRun: undefined }, + { previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }, + { previousNode: 'node2', previousNodeOutput: 0, previousNodeRun: 0 }, ], }, - }, + }); + + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); + }); + + test('the trigger, node1 and node2 have run data', () => { + // ARRANGE + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])], + [node1.name]: [toITaskData([{ data: { node: 'node1' } }])], + [node2.name]: [toITaskData([{ data: { node: 'node2' } }])], + }; + const pinData: IPinData = {}; + const startNodes = new Set([merge]); + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, startNodes, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(2); + expect(nodeExecutionStack[0]).toEqual({ + node: merge, + data: { main: [[{ json: { node: 'node1' } }], [{ json: { node: 'node2' } }]] }, + source: { + main: [ + { previousNode: 'node1', previousNodeOutput: 0, previousNodeRun: 0 }, + { previousNode: 'node2', previousNodeOutput: 0, previousNodeRun: 0 }, + ], + }, + }); + expect(nodeExecutionStack[1]).toEqual({ + node: merge, + data: { main: [[{ json: { node: 'trigger' } }]] }, + source: { + main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }], + }, + }); + + expect(waitingExecution).toEqual({}); + expect(waitingExecutionSource).toEqual({}); }); }); }); + +describe('addWaitingExecution', () => { + test('allow adding data partially', () => { + const waitingExecution: IWaitingForExecution = {}; + const nodeName1 = 'node 1'; + const nodeName2 = 'node 2'; + const executionData: INodeExecutionData[] = [{ json: { item: 1 } }, { json: { item: 2 } }]; + + // adding the data for the second input index first + { + addWaitingExecution( + waitingExecution, + nodeName1, + 1, // runIndex + NodeConnectionType.Main, + 1, // inputIndex + executionData, + ); + expect(waitingExecution).toEqual({ + [nodeName1]: { + // runIndex + 1: { + [NodeConnectionType.Main]: [undefined, executionData], + }, + }, + }); + } + + // adding the data for the first input + { + addWaitingExecution( + waitingExecution, + nodeName1, + 1, // runIndex + NodeConnectionType.Main, + 0, // inputIndex + executionData, + ); + expect(waitingExecution).toEqual({ + [nodeName1]: { + // runIndex + 1: { + [NodeConnectionType.Main]: [executionData, executionData], + }, + }, + }); + } + + // adding data for another node connection type + { + addWaitingExecution( + waitingExecution, + nodeName1, + 1, // runIndex + NodeConnectionType.AiMemory, + 0, // inputIndex + executionData, + ); + expect(waitingExecution).toEqual({ + [nodeName1]: { + // runIndex + 1: { + [NodeConnectionType.Main]: [executionData, executionData], + [NodeConnectionType.AiMemory]: [executionData], + }, + }, + }); + } + + // adding data for another run + { + addWaitingExecution( + waitingExecution, + nodeName1, + 0, // runIndex + NodeConnectionType.AiChain, + 0, // inputIndex + executionData, + ); + expect(waitingExecution).toEqual({ + [nodeName1]: { + // runIndex + 0: { + [NodeConnectionType.AiChain]: [executionData], + }, + 1: { + [NodeConnectionType.Main]: [executionData, executionData], + [NodeConnectionType.AiMemory]: [executionData], + }, + }, + }); + } + + // adding data for another node + { + addWaitingExecution( + waitingExecution, + nodeName2, + 0, // runIndex + NodeConnectionType.Main, + 2, // inputIndex + executionData, + ); + expect(waitingExecution).toEqual({ + [nodeName1]: { + // runIndex + 0: { + [NodeConnectionType.AiChain]: [executionData], + }, + 1: { + [NodeConnectionType.Main]: [executionData, executionData], + [NodeConnectionType.AiMemory]: [executionData], + }, + }, + [nodeName2]: { + // runIndex + 0: { + [NodeConnectionType.Main]: [undefined, undefined, executionData], + }, + }, + }); + } + + // allow adding null + { + addWaitingExecution( + waitingExecution, + nodeName2, + 0, // runIndex + NodeConnectionType.Main, + 0, // inputIndex + null, + ); + expect(waitingExecution).toEqual({ + [nodeName2]: { + // runIndex + 0: { + [NodeConnectionType.Main]: [null, undefined, executionData], + }, + }, + [nodeName1]: { + // runIndex + 0: { + [NodeConnectionType.AiChain]: [executionData], + }, + 1: { + [NodeConnectionType.Main]: [executionData, executionData], + [NodeConnectionType.AiMemory]: [executionData], + }, + }, + }); + } + }); +}); + +describe('addWaitingExecutionSource', () => { + test('allow adding data partially', () => { + const waitingExecutionSource: IWaitingForExecutionSource = {}; + const nodeName1 = 'node 1'; + const nodeName2 = 'node 2'; + const sourceData: ISourceData = { + previousNode: 'node 0', + previousNodeRun: 0, + previousNodeOutput: 0, + }; + + // adding the data for the second input index first + { + addWaitingExecutionSource( + waitingExecutionSource, + nodeName1, + 1, // runIndex + NodeConnectionType.Main, + 1, // inputIndex + sourceData, + ); + expect(waitingExecutionSource).toEqual({ + [nodeName1]: { + // runIndex + 1: { + [NodeConnectionType.Main]: [undefined, sourceData], + }, + }, + }); + } + + // adding the data for the first input + { + addWaitingExecutionSource( + waitingExecutionSource, + nodeName1, + 1, // runIndex + NodeConnectionType.Main, + 0, // inputIndex + sourceData, + ); + expect(waitingExecutionSource).toEqual({ + [nodeName1]: { + // runIndex + 1: { + [NodeConnectionType.Main]: [sourceData, sourceData], + }, + }, + }); + } + + // adding data for another node connection type + { + addWaitingExecutionSource( + waitingExecutionSource, + nodeName1, + 1, // runIndex + NodeConnectionType.AiMemory, + 0, // inputIndex + sourceData, + ); + expect(waitingExecutionSource).toEqual({ + [nodeName1]: { + // runIndex + 1: { + [NodeConnectionType.Main]: [sourceData, sourceData], + [NodeConnectionType.AiMemory]: [sourceData], + }, + }, + }); + } + + // adding data for another run + { + addWaitingExecutionSource( + waitingExecutionSource, + nodeName1, + 0, // runIndex + NodeConnectionType.AiChain, + 0, // inputIndex + sourceData, + ); + expect(waitingExecutionSource).toEqual({ + [nodeName1]: { + // runIndex + 0: { + [NodeConnectionType.AiChain]: [sourceData], + }, + 1: { + [NodeConnectionType.Main]: [sourceData, sourceData], + [NodeConnectionType.AiMemory]: [sourceData], + }, + }, + }); + } + + // adding data for another node + { + addWaitingExecutionSource( + waitingExecutionSource, + nodeName2, + 0, // runIndex + NodeConnectionType.Main, + 2, // inputIndex + sourceData, + ); + expect(waitingExecutionSource).toEqual({ + [nodeName1]: { + // runIndex + 0: { + [NodeConnectionType.AiChain]: [sourceData], + }, + 1: { + [NodeConnectionType.Main]: [sourceData, sourceData], + [NodeConnectionType.AiMemory]: [sourceData], + }, + }, + [nodeName2]: { + // runIndex + 0: { + [NodeConnectionType.Main]: [undefined, undefined, sourceData], + }, + }, + }); + } + + // allow adding null + { + addWaitingExecutionSource( + waitingExecutionSource, + nodeName2, + 0, // runIndex + NodeConnectionType.Main, + 0, // inputIndex + null, + ); + expect(waitingExecutionSource).toEqual({ + [nodeName1]: { + // runIndex + 0: { + [NodeConnectionType.AiChain]: [sourceData], + }, + 1: { + [NodeConnectionType.Main]: [sourceData, sourceData], + [NodeConnectionType.AiMemory]: [sourceData], + }, + }, + [nodeName2]: { + // runIndex + 0: { + [NodeConnectionType.Main]: [null, undefined, sourceData], + }, + }, + }); + } + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/getIncomingData.ts b/packages/core/src/PartialExecutionUtils/getIncomingData.ts index 2f5f22cd35..acac8ad22d 100644 --- a/packages/core/src/PartialExecutionUtils/getIncomingData.ts +++ b/packages/core/src/PartialExecutionUtils/getIncomingData.ts @@ -20,3 +20,26 @@ export function getIncomingData( return runData[nodeName][runIndex].data[connectionType][outputIndex]; } + +function getRunIndexLength(runData: IRunData, nodeName: string) { + return runData[nodeName]?.length ?? 0; +} + +export function getIncomingDataFromAnyRun( + runData: IRunData, + nodeName: string, + connectionType: NodeConnectionType, + outputIndex: number, +): { data: INodeExecutionData[]; runIndex: number } | undefined { + const maxRunIndexes = getRunIndexLength(runData, nodeName); + + for (let runIndex = 0; runIndex < maxRunIndexes; runIndex++) { + const data = getIncomingData(runData, nodeName, runIndex, connectionType, outputIndex); + + if (data && data.length > 0) { + return { data, runIndex }; + } + } + + return undefined; +} diff --git a/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts b/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts index 58f8f2f745..d9a9940816 100644 --- a/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts +++ b/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts @@ -13,6 +13,25 @@ function sortByInputIndexThenByName( } } +type SourceConnectionGroup = { + /** + * This is true if all connections have data. If any connection does not have + * data it false. + * + * This is interesting to decide if a node should be put on the execution + * stack of the waiting stack in the execution engine. + */ + complete: boolean; + connections: GraphConnection[]; +}; + +function newGroup(): SourceConnectionGroup { + return { + complete: true, + connections: [], + }; +} + /** * Groups incoming connections to the node. The groups contain one connection * per input, if possible, with run data or pinned data. @@ -58,55 +77,87 @@ function sortByInputIndexThenByName( * * Since `source1` has no run data and no pinned data it's skipped in favor of * `source2` for the for input. + * It will become it's own group that is marked as `complete: false` * - * So this will return 1 group: - * 1. source2 and source3 + * So this will return 2 group: + * 1. source2 and source3, `complete: true` + * 2. source1, `complete: false` */ export function getSourceDataGroups( graph: DirectedGraph, node: INode, runData: IRunData, pinnedData: IPinData, -): GraphConnection[][] { +): SourceConnectionGroup[] { const connections = graph.getConnections({ to: node }); const sortedConnectionsWithData = []; + const sortedConnectionsWithoutData = []; for (const connection of connections) { const hasData = runData[connection.from.name] || pinnedData[connection.from.name]; if (hasData) { sortedConnectionsWithData.push(connection); + } else { + sortedConnectionsWithoutData.push(connection); } } + if (sortedConnectionsWithData.length === 0 && sortedConnectionsWithoutData.length === 0) { + return []; + } + sortedConnectionsWithData.sort(sortByInputIndexThenByName); + sortedConnectionsWithoutData.sort(sortByInputIndexThenByName); - const groups: GraphConnection[][] = []; - let currentGroup: GraphConnection[] = []; - let currentInputIndex = -1; + const groups: SourceConnectionGroup[] = []; + let currentGroup = newGroup(); + let currentInputIndex = + Math.min( + ...sortedConnectionsWithData.map((c) => c.inputIndex), + ...sortedConnectionsWithoutData.map((c) => c.inputIndex), + ) - 1; + + while (sortedConnectionsWithData.length > 0 || sortedConnectionsWithoutData.length > 0) { + currentInputIndex++; - while (sortedConnectionsWithData.length > 0) { const connectionWithDataIndex = sortedConnectionsWithData.findIndex( // eslint-disable-next-line @typescript-eslint/no-loop-func - (c) => c.inputIndex > currentInputIndex, + (c) => c.inputIndex === currentInputIndex, ); - const connection: GraphConnection | undefined = - sortedConnectionsWithData[connectionWithDataIndex]; - if (connection === undefined) { - groups.push(currentGroup); - currentGroup = []; - currentInputIndex = -1; + if (connectionWithDataIndex >= 0) { + const connection = sortedConnectionsWithData[connectionWithDataIndex]; + + currentGroup.connections.push(connection); + + sortedConnectionsWithData.splice(connectionWithDataIndex, 1); continue; } - currentInputIndex = connection.inputIndex; - currentGroup.push(connection); + const connectionWithoutDataIndex = sortedConnectionsWithoutData.findIndex( + // eslint-disable-next-line @typescript-eslint/no-loop-func + (c) => c.inputIndex === currentInputIndex, + ); - if (connectionWithDataIndex >= 0) { - sortedConnectionsWithData.splice(connectionWithDataIndex, 1); + if (connectionWithoutDataIndex >= 0) { + const connection = sortedConnectionsWithoutData[connectionWithoutDataIndex]; + + currentGroup.connections.push(connection); + currentGroup.complete = false; + + sortedConnectionsWithoutData.splice(connectionWithoutDataIndex, 1); + continue; } + + groups.push(currentGroup); + currentGroup = newGroup(); + currentInputIndex = + Math.min( + ...sortedConnectionsWithData.map((c) => c.inputIndex), + ...sortedConnectionsWithoutData.map((c) => c.inputIndex), + ) - 1; } groups.push(currentGroup); diff --git a/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts b/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts index 534969f960..542d4b8fbd 100644 --- a/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts +++ b/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts @@ -13,9 +13,47 @@ import { } from 'n8n-workflow'; import type { DirectedGraph } from './DirectedGraph'; -import { getIncomingData } from './getIncomingData'; +import { getIncomingDataFromAnyRun } from './getIncomingData'; import { getSourceDataGroups } from './getSourceDataGroups'; +export function addWaitingExecution( + waitingExecution: IWaitingForExecution, + nodeName: string, + runIndex: number, + inputType: NodeConnectionType, + inputIndex: number, + executionData: INodeExecutionData[] | null, +) { + const waitingExecutionObject = waitingExecution[nodeName] ?? {}; + const taskDataConnections = waitingExecutionObject[runIndex] ?? {}; + const executionDataList = taskDataConnections[inputType] ?? []; + + executionDataList[inputIndex] = executionData; + + taskDataConnections[inputType] = executionDataList; + waitingExecutionObject[runIndex] = taskDataConnections; + waitingExecution[nodeName] = waitingExecutionObject; +} + +export function addWaitingExecutionSource( + waitingExecutionSource: IWaitingForExecutionSource, + nodeName: string, + runIndex: number, + inputType: NodeConnectionType, + inputIndex: number, + sourceData: ISourceData | null, +) { + const waitingExecutionSourceObject = waitingExecutionSource[nodeName] ?? {}; + const taskDataConnectionsSource = waitingExecutionSourceObject[runIndex] ?? {}; + const sourceDataList = taskDataConnectionsSource[inputType] ?? []; + + sourceDataList[inputIndex] = sourceData; + + taskDataConnectionsSource[inputType] = sourceDataList; + waitingExecutionSourceObject[runIndex] = taskDataConnectionsSource; + waitingExecutionSource[nodeName] = waitingExecutionSourceObject; +} + /** * Recreates the node execution stack, waiting executions and waiting * execution sources from a directed graph, start nodes, the destination node, @@ -33,7 +71,6 @@ import { getSourceDataGroups } from './getSourceDataGroups'; export function recreateNodeExecutionStack( graph: DirectedGraph, startNodes: Set, - destinationNode: INode, runData: IRunData, pinData: IPinData, ): { @@ -59,9 +96,6 @@ export function recreateNodeExecutionStack( const waitingExecution: IWaitingForExecution = {}; const waitingExecutionSource: IWaitingForExecutionSource = {}; - // TODO: Don't hard code this! - const runIndex = 0; - for (const startNode of startNodes) { const incomingStartNodeConnections = graph .getDirectParentConnections(startNode) @@ -84,89 +118,94 @@ export function recreateNodeExecutionStack( const sourceDataSets = getSourceDataGroups(graph, startNode, runData, pinData); for (const sourceData of sourceDataSets) { - incomingData = []; + if (sourceData.complete) { + // All incoming connections have data, so let's put the node on the + // stack! + incomingData = []; - incomingSourceData = { main: [] }; + incomingSourceData = { main: [] }; - for (const incomingConnection of sourceData) { - const node = incomingConnection.from; + for (const incomingConnection of sourceData.connections) { + let runIndex = 0; + const sourceNode = incomingConnection.from; - if (pinData[node.name]) { - incomingData.push(pinData[node.name]); - } else { - a.ok( - runData[node.name], - `Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${node.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`, - ); + if (pinData[sourceNode.name]) { + incomingData.push(pinData[sourceNode.name]); + } else { + a.ok( + runData[sourceNode.name], + `Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${sourceNode.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`, + ); - const nodeIncomingData = getIncomingData( + const nodeIncomingData = getIncomingDataFromAnyRun( + runData, + sourceNode.name, + incomingConnection.type, + incomingConnection.outputIndex, + ); + + if (nodeIncomingData) { + runIndex = nodeIncomingData.runIndex; + incomingData.push(nodeIncomingData.data); + } + } + + incomingSourceData.main.push({ + previousNode: incomingConnection.from.name, + previousNodeOutput: incomingConnection.outputIndex, + previousNodeRun: runIndex, + }); + } + + const executeData: IExecuteData = { + node: startNode, + data: { main: incomingData }, + source: incomingSourceData, + }; + + nodeExecutionStack.push(executeData); + } else { + const nodeName = startNode.name; + const nextRunIndex = waitingExecution[nodeName] + ? Object.keys(waitingExecution[nodeName]).length + : 0; + + for (const incomingConnection of sourceData.connections) { + const sourceNode = incomingConnection.from; + const maybeNodeIncomingData = getIncomingDataFromAnyRun( runData, - node.name, - runIndex, + sourceNode.name, incomingConnection.type, incomingConnection.outputIndex, ); + const nodeIncomingData = maybeNodeIncomingData?.data ?? null; if (nodeIncomingData) { - incomingData.push(nodeIncomingData); + addWaitingExecution( + waitingExecution, + nodeName, + nextRunIndex, + incomingConnection.type, + incomingConnection.inputIndex, + nodeIncomingData, + ); + + addWaitingExecutionSource( + waitingExecutionSource, + nodeName, + nextRunIndex, + incomingConnection.type, + incomingConnection.inputIndex, + nodeIncomingData + ? { + previousNode: incomingConnection.from.name, + previousNodeRun: nextRunIndex, + previousNodeOutput: incomingConnection.outputIndex, + } + : null, + ); } } - - incomingSourceData.main.push({ - previousNode: incomingConnection.from.name, - previousNodeOutput: incomingConnection.outputIndex, - previousNodeRun: 0, - }); - } - - const executeData: IExecuteData = { - node: startNode, - data: { main: incomingData }, - source: incomingSourceData, - }; - - nodeExecutionStack.push(executeData); - } - } - - // TODO: Do we need this? - if (destinationNode) { - const destinationNodeName = destinationNode.name; - // Check if the destinationNode has to be added as waiting - // because some input data is already fully available - const incomingDestinationNodeConnections = graph - .getDirectParentConnections(destinationNode) - .filter((c) => c.type === NodeConnectionType.Main); - if (incomingDestinationNodeConnections !== undefined) { - for (const connection of incomingDestinationNodeConnections) { - if (waitingExecution[destinationNodeName] === undefined) { - waitingExecution[destinationNodeName] = {}; - waitingExecutionSource[destinationNodeName] = {}; - } - if (waitingExecution[destinationNodeName][runIndex] === undefined) { - waitingExecution[destinationNodeName][runIndex] = {}; - waitingExecutionSource[destinationNodeName][runIndex] = {}; - } - if (waitingExecution[destinationNodeName][runIndex][connection.type] === undefined) { - waitingExecution[destinationNodeName][runIndex][connection.type] = []; - waitingExecutionSource[destinationNodeName][runIndex][connection.type] = []; - } - - if (runData[connection.from.name] !== undefined) { - // Input data exists so add as waiting - // incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]); - waitingExecution[destinationNodeName][runIndex][connection.type].push( - runData[connection.from.name][runIndex].data![connection.type][connection.inputIndex], - ); - waitingExecutionSource[destinationNodeName][runIndex][connection.type].push({ - previousNode: connection.from.name, - previousNodeOutput: connection.inputIndex || undefined, - previousNodeRun: runIndex || undefined, - } as ISourceData); - } else { - waitingExecution[destinationNodeName][runIndex][connection.type].push(null); - waitingExecutionSource[destinationNodeName][runIndex][connection.type].push(null); - } } } } diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 1d9aee76c6..23b88abd5b 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -363,7 +363,7 @@ export class WorkflowExecute { // 7. Recreate Execution Stack const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(subgraph, startNodes, destination, runData, pinData ?? {}); + recreateNodeExecutionStack(subgraph, new Set(startNodes), runData, pinData ?? {}); // 8. Execute this.status = 'running';