feat(core): Handle nodes with multiple inputs and connections during partial executions (#11376)

This commit is contained in:
Danny Martini 2024-10-28 10:16:19 +01:00 committed by GitHub
parent ae37c520a9
commit cb7c4d29a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1075 additions and 162 deletions

View file

@ -52,15 +52,15 @@ describe('getSourceDataGroups', () => {
expect(groups).toHaveLength(2);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -69,8 +69,8 @@ describe('getSourceDataGroups', () => {
});
const group2 = groups[1];
expect(group2).toHaveLength(1);
expect(group2[0]).toEqual({
expect(group2.connections).toHaveLength(1);
expect(group2.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -116,15 +116,15 @@ describe('getSourceDataGroups', () => {
expect(groups).toHaveLength(2);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -133,8 +133,8 @@ describe('getSourceDataGroups', () => {
});
const group2 = groups[1];
expect(group2).toHaveLength(1);
expect(group2[0]).toEqual({
expect(group2.connections).toHaveLength(1);
expect(group2.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -152,7 +152,7 @@ describe('getSourceDataGroups', () => {
//┌───────┐1 │ └────┘
//│source3├────┘
//└───────┘
it('groups sources into possibly complete sets if all of them have data', () => {
it('groups sources into one complete set with 2 connections and one incomplete set with 1 connection', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const source2 = createNodeData({ name: 'source2' });
@ -176,23 +176,341 @@ describe('getSourceDataGroups', () => {
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(1);
const completeGroups = groups.filter((g) => g.complete);
{
expect(completeGroups).toHaveLength(1);
const group1 = completeGroups[0];
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
}
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
from: source2,
const incompleteGroups = groups.filter((g) => !g.complete);
{
expect(incompleteGroups).toHaveLength(1);
const group1 = incompleteGroups[0];
expect(group1.connections).toHaveLength(1);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
}
});
//┌───────┐0
//│source1├───────┐
//└───────┘ │
// │
//┌───────┐1 │
//│source2├───────┤ ┌────┐
//└───────┘ └────► │
// │node│
//┌───────┐1 ┌────► │
//│source3├───────┤ └────┘
//└───────┘ │
// │
//┌───────┐0 │
//│source4├───────┘
//└───────┘
it('groups sources into one complete set with 2 connections and one incomplete set with 2 connection', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const source2 = createNodeData({ name: 'source2' });
const source3 = createNodeData({ name: 'source3' });
const source4 = createNodeData({ name: 'source4' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, source2, source3, source4, node)
.addConnections(
{ from: source1, to: node, inputIndex: 0 },
{ from: source2, to: node, inputIndex: 0 },
{ from: source3, to: node, inputIndex: 1 },
{ from: source4, to: node, inputIndex: 1 },
);
const runData: IRunData = {
[source2.name]: [toITaskData([{ data: { value: 1 } }])],
[source3.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
const completeGroups = groups.filter((g) => g.complete);
{
expect(completeGroups).toHaveLength(1);
const group1 = completeGroups[0];
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
}
const incompleteGroups = groups.filter((g) => !g.complete);
{
expect(incompleteGroups).toHaveLength(1);
const group1 = incompleteGroups[0];
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1.connections[1]).toEqual({
from: source4,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
}
});
// ┌───────┐1
// │source1├───────┐
// └───────┘ │
// │
// ┌───────┐0 │
// │source2├───────┤ ┌────┐
// └───────┘ └────► │
// │node│
// ┌───────┐0 ┌────► │
// │source3├───────┘ └────┘
// └───────┘
it('groups sources into two incomplete sets, one with 1 connection without and one with 2 connections one with data and one without', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const source2 = createNodeData({ name: 'source2' });
const source3 = createNodeData({ name: 'source3' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, source2, source3, node)
.addConnections(
{ from: source1, to: node, inputIndex: 0 },
{ from: source2, to: node, inputIndex: 0 },
{ from: source3, to: node, inputIndex: 1 },
);
const runData: IRunData = {
[source1.name]: [toITaskData([{ data: { node: 'source1' } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
const completeGroups = groups.filter((g) => g.complete);
expect(completeGroups).toHaveLength(0);
const incompleteGroups = groups.filter((g) => !g.complete);
expect(incompleteGroups).toHaveLength(2);
const group1 = incompleteGroups[0];
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
const group2 = incompleteGroups[1];
expect(group2.connections).toHaveLength(1);
expect(group2.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
});
// ┌─────┐1 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐0 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
test('return an incomplete group when there is no data on input 2', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const node3 = createNodeData({ name: 'node3' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, node3)
.addConnections(
{ from: trigger, to: node1 },
{ from: trigger, to: node2 },
{ from: node1, to: node3, inputIndex: 0 },
{ from: node2, to: node3, inputIndex: 1 },
);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])],
[node1.name]: [toITaskData([{ data: { nodeName: 'node1' } }])],
};
const pinData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node3, runData, pinData);
// ASSERT
expect(groups).toHaveLength(1);
const group1 = groups[0];
expect(group1.connections).toHaveLength(2);
expect(group1.complete).toEqual(false);
});
// ┌─────┐0 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐1 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
test('return an incomplete group when there is no data on input 1', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const node3 = createNodeData({ name: 'node3' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, node3)
.addConnections(
{ from: trigger, to: node1 },
{ from: trigger, to: node2 },
{ from: node1, to: node3, inputIndex: 0 },
{ from: node2, to: node3, inputIndex: 1 },
);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])],
[node2.name]: [toITaskData([{ data: { nodeName: 'node2' } }])],
};
const pinData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node3, runData, pinData);
// ASSERT
expect(groups).toHaveLength(1);
const group1 = groups[0];
expect(group1.connections).toHaveLength(2);
expect(group1.complete).toEqual(false);
});
it('terminates with negative input indexes', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, node)
.addConnections({ from: source1, to: node, inputIndex: -1 });
const runData: IRunData = {
[source1.name]: [toITaskData([{ data: { node: source1.name } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(1);
const group1 = groups[0];
expect(group1.connections).toHaveLength(1);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: -1,
to: node,
});
});
it('terminates inputs with missing connections', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, node)
.addConnections({ from: source1, to: node, inputIndex: 1 });
const runData: IRunData = {
[source1.name]: [toITaskData([{ data: { node: source1.name } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(1);
const group1 = groups[0];
expect(group1.connections).toHaveLength(1);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
});
it('terminates if the graph has no connections', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph().addNodes(source1, node);
const runData: IRunData = {
[source1.name]: [toITaskData([{ data: { node: source1.name } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(0);
});
});

View file

@ -10,9 +10,19 @@
// PD denotes that the node has pinned data
import { AssertionError } from 'assert';
import { type IPinData, type IRunData } from 'n8n-workflow';
import type {
INodeExecutionData,
ISourceData,
IWaitingForExecution,
IWaitingForExecutionSource,
} from 'n8n-workflow';
import { NodeConnectionType, type IPinData, type IRunData } from 'n8n-workflow';
import { recreateNodeExecutionStack } from '@/PartialExecutionUtils/recreateNodeExecutionStack';
import {
addWaitingExecution,
addWaitingExecutionSource,
recreateNodeExecutionStack,
} from '@/PartialExecutionUtils/recreateNodeExecutionStack';
import { createNodeData, toITaskData } from './helpers';
import { DirectedGraph } from '../DirectedGraph';
@ -41,7 +51,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
recreateNodeExecutionStack(workflow, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -62,17 +72,8 @@ describe('recreateNodeExecutionStack', () => {
},
},
]);
expect(waitingExecution).toEqual({ node: { '0': { main: [[{ json: { value: 1 } }]] } } });
expect(waitingExecutionSource).toEqual({
node: {
'0': {
main: [
{ previousNode: 'trigger', previousNodeOutput: undefined, previousNodeRun: undefined },
],
},
},
});
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
// ►►
@ -93,7 +94,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
recreateNodeExecutionStack(workflow, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -105,8 +106,8 @@ describe('recreateNodeExecutionStack', () => {
},
]);
expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } });
expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } });
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
// PinData ►►
@ -129,7 +130,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
recreateNodeExecutionStack(workflow, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -151,8 +152,8 @@ describe('recreateNodeExecutionStack', () => {
},
]);
expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } });
expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } });
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
// XX ►►
@ -176,9 +177,9 @@ describe('recreateNodeExecutionStack', () => {
const pinData = {};
// ACT & ASSERT
expect(() =>
recreateNodeExecutionStack(graph, startNodes, node2, runData, pinData),
).toThrowError(AssertionError);
expect(() => recreateNodeExecutionStack(graph, startNodes, runData, pinData)).toThrowError(
AssertionError,
);
});
// ►►
@ -214,10 +215,9 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData);
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toEqual([
{
data: { main: [[{ json: { value: 1 } }]] },
@ -251,19 +251,8 @@ describe('recreateNodeExecutionStack', () => {
},
]);
expect(waitingExecution).toEqual({
node3: { '0': { main: [[{ json: { value: 1 } }], [{ json: { value: 1 } }]] } },
});
expect(waitingExecutionSource).toEqual({
node3: {
'0': {
main: [
{ previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined },
{ previousNode: 'node2', previousNodeOutput: undefined, previousNodeRun: undefined },
],
},
},
});
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
// ┌─────┐1 ►►
@ -299,7 +288,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData);
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -314,22 +303,515 @@ describe('recreateNodeExecutionStack', () => {
},
});
expect(waitingExecution).toEqual({
node3: {
'0': {
main: [[{ json: { value: 1 } }]],
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
// ┌─────┐ ┌─────┐
// ┌──►node1┼────┬──────► │
// │ └─────┘ │ │merge│
// │ │ ┌───► │
// ├─────────────┘ │ └─────┘
// │ │
//┌───────┐ │ ┌─────┐ │
//│trigger├───┴────►node2├─────┘
//└───────┘ └─────┘
describe('multiple inputs', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const merge = createNodeData({ name: 'merge' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, merge)
.addConnections(
{ from: trigger, to: node1 },
{ from: trigger, to: node2 },
{ from: trigger, to: merge, inputIndex: 0 },
{ from: node1, to: merge, inputIndex: 0 },
{ from: node2, to: merge, inputIndex: 1 },
);
test('only the trigger has run data', () => {
// ARRANGE
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])],
};
const pinData: IPinData = {};
const startNodes = new Set([node1, node2, merge]);
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(2);
expect(nodeExecutionStack[0]).toEqual({
node: node1,
data: { main: [[{ json: { node: 'trigger' } }]] },
source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] },
});
expect(nodeExecutionStack[1]).toEqual({
node: node2,
data: { main: [[{ json: { node: 'trigger' } }]] },
source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] },
});
expect(waitingExecution).toEqual({
[merge.name]: {
'0': {
main: [[{ json: { node: 'trigger' } }]],
},
},
},
});
expect(waitingExecutionSource).toEqual({
[merge.name]: {
'0': {
main: [
{
previousNode: 'trigger',
previousNodeOutput: 0,
previousNodeRun: 0,
},
],
},
},
});
});
expect(waitingExecutionSource).toEqual({
node3: {
'0': {
test('the trigger and node1 have run data', () => {
// ARRANGE
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])],
[node1.name]: [toITaskData([{ data: { node: 'node1' } }])],
};
const pinData: IPinData = {};
const startNodes = new Set([node2, merge]);
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(2);
expect(nodeExecutionStack[0]).toEqual({
node: node2,
data: { main: [[{ json: { node: 'trigger' } }]] },
source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] },
});
expect(nodeExecutionStack[1]).toEqual({
node: merge,
data: { main: [[{ json: { node: 'trigger' } }]] },
source: {
main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }],
},
});
expect(waitingExecution).toEqual({
[merge.name]: {
'0': {
main: [[{ json: { node: 'node1' } }]],
},
},
});
expect(waitingExecutionSource).toEqual({
[merge.name]: {
'0': {
main: [
{
previousNode: 'node1',
previousNodeOutput: 0,
previousNodeRun: 0,
},
],
},
},
});
});
test('the trigger and node2 have run data', () => {
// ARRANGE
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])],
[node2.name]: [toITaskData([{ data: { node: 'node2' } }])],
};
const pinData: IPinData = {};
const startNodes = new Set([node1, merge]);
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(2);
expect(nodeExecutionStack[0]).toEqual({
node: node1,
data: { main: [[{ json: { node: 'trigger' } }]] },
source: { main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }] },
});
expect(nodeExecutionStack[1]).toEqual({
node: merge,
data: { main: [[{ json: { node: 'trigger' } }], [{ json: { node: 'node2' } }]] },
source: {
main: [
{ previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined },
{ previousNode: 'node2', previousNodeOutput: 1, previousNodeRun: undefined },
{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 },
{ previousNode: 'node2', previousNodeOutput: 0, previousNodeRun: 0 },
],
},
},
});
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
test('the trigger, node1 and node2 have run data', () => {
// ARRANGE
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { node: 'trigger' } }])],
[node1.name]: [toITaskData([{ data: { node: 'node1' } }])],
[node2.name]: [toITaskData([{ data: { node: 'node2' } }])],
};
const pinData: IPinData = {};
const startNodes = new Set([merge]);
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(2);
expect(nodeExecutionStack[0]).toEqual({
node: merge,
data: { main: [[{ json: { node: 'node1' } }], [{ json: { node: 'node2' } }]] },
source: {
main: [
{ previousNode: 'node1', previousNodeOutput: 0, previousNodeRun: 0 },
{ previousNode: 'node2', previousNodeOutput: 0, previousNodeRun: 0 },
],
},
});
expect(nodeExecutionStack[1]).toEqual({
node: merge,
data: { main: [[{ json: { node: 'trigger' } }]] },
source: {
main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }],
},
});
expect(waitingExecution).toEqual({});
expect(waitingExecutionSource).toEqual({});
});
});
});
describe('addWaitingExecution', () => {
test('allow adding data partially', () => {
const waitingExecution: IWaitingForExecution = {};
const nodeName1 = 'node 1';
const nodeName2 = 'node 2';
const executionData: INodeExecutionData[] = [{ json: { item: 1 } }, { json: { item: 2 } }];
// adding the data for the second input index first
{
addWaitingExecution(
waitingExecution,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
1, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [undefined, executionData],
},
},
});
}
// adding the data for the first input
{
addWaitingExecution(
waitingExecution,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
0, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [executionData, executionData],
},
},
});
}
// adding data for another node connection type
{
addWaitingExecution(
waitingExecution,
nodeName1,
1, // runIndex
NodeConnectionType.AiMemory,
0, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
});
}
// adding data for another run
{
addWaitingExecution(
waitingExecution,
nodeName1,
0, // runIndex
NodeConnectionType.AiChain,
0, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [executionData],
},
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
});
}
// adding data for another node
{
addWaitingExecution(
waitingExecution,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
2, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [executionData],
},
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [undefined, undefined, executionData],
},
},
});
}
// allow adding null
{
addWaitingExecution(
waitingExecution,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
0, // inputIndex
null,
);
expect(waitingExecution).toEqual({
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [null, undefined, executionData],
},
},
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [executionData],
},
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
});
}
});
});
describe('addWaitingExecutionSource', () => {
test('allow adding data partially', () => {
const waitingExecutionSource: IWaitingForExecutionSource = {};
const nodeName1 = 'node 1';
const nodeName2 = 'node 2';
const sourceData: ISourceData = {
previousNode: 'node 0',
previousNodeRun: 0,
previousNodeOutput: 0,
};
// adding the data for the second input index first
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
1, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [undefined, sourceData],
},
},
});
}
// adding the data for the first input
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
0, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
},
},
});
}
// adding data for another node connection type
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
1, // runIndex
NodeConnectionType.AiMemory,
0, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
});
}
// adding data for another run
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
0, // runIndex
NodeConnectionType.AiChain,
0, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [sourceData],
},
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
});
}
// adding data for another node
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
2, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [sourceData],
},
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [undefined, undefined, sourceData],
},
},
});
}
// allow adding null
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
0, // inputIndex
null,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [sourceData],
},
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [null, undefined, sourceData],
},
},
});
}
});
});

View file

@ -20,3 +20,26 @@ export function getIncomingData(
return runData[nodeName][runIndex].data[connectionType][outputIndex];
}
function getRunIndexLength(runData: IRunData, nodeName: string) {
return runData[nodeName]?.length ?? 0;
}
export function getIncomingDataFromAnyRun(
runData: IRunData,
nodeName: string,
connectionType: NodeConnectionType,
outputIndex: number,
): { data: INodeExecutionData[]; runIndex: number } | undefined {
const maxRunIndexes = getRunIndexLength(runData, nodeName);
for (let runIndex = 0; runIndex < maxRunIndexes; runIndex++) {
const data = getIncomingData(runData, nodeName, runIndex, connectionType, outputIndex);
if (data && data.length > 0) {
return { data, runIndex };
}
}
return undefined;
}

View file

@ -13,6 +13,25 @@ function sortByInputIndexThenByName(
}
}
type SourceConnectionGroup = {
/**
* This is true if all connections have data. If any connection does not have
* data it false.
*
* This is interesting to decide if a node should be put on the execution
* stack of the waiting stack in the execution engine.
*/
complete: boolean;
connections: GraphConnection[];
};
function newGroup(): SourceConnectionGroup {
return {
complete: true,
connections: [],
};
}
/**
* Groups incoming connections to the node. The groups contain one connection
* per input, if possible, with run data or pinned data.
@ -58,55 +77,87 @@ function sortByInputIndexThenByName(
*
* Since `source1` has no run data and no pinned data it's skipped in favor of
* `source2` for the for input.
* It will become it's own group that is marked as `complete: false`
*
* So this will return 1 group:
* 1. source2 and source3
* So this will return 2 group:
* 1. source2 and source3, `complete: true`
* 2. source1, `complete: false`
*/
export function getSourceDataGroups(
graph: DirectedGraph,
node: INode,
runData: IRunData,
pinnedData: IPinData,
): GraphConnection[][] {
): SourceConnectionGroup[] {
const connections = graph.getConnections({ to: node });
const sortedConnectionsWithData = [];
const sortedConnectionsWithoutData = [];
for (const connection of connections) {
const hasData = runData[connection.from.name] || pinnedData[connection.from.name];
if (hasData) {
sortedConnectionsWithData.push(connection);
} else {
sortedConnectionsWithoutData.push(connection);
}
}
if (sortedConnectionsWithData.length === 0 && sortedConnectionsWithoutData.length === 0) {
return [];
}
sortedConnectionsWithData.sort(sortByInputIndexThenByName);
sortedConnectionsWithoutData.sort(sortByInputIndexThenByName);
const groups: GraphConnection[][] = [];
let currentGroup: GraphConnection[] = [];
let currentInputIndex = -1;
const groups: SourceConnectionGroup[] = [];
let currentGroup = newGroup();
let currentInputIndex =
Math.min(
...sortedConnectionsWithData.map((c) => c.inputIndex),
...sortedConnectionsWithoutData.map((c) => c.inputIndex),
) - 1;
while (sortedConnectionsWithData.length > 0 || sortedConnectionsWithoutData.length > 0) {
currentInputIndex++;
while (sortedConnectionsWithData.length > 0) {
const connectionWithDataIndex = sortedConnectionsWithData.findIndex(
// eslint-disable-next-line @typescript-eslint/no-loop-func
(c) => c.inputIndex > currentInputIndex,
(c) => c.inputIndex === currentInputIndex,
);
const connection: GraphConnection | undefined =
sortedConnectionsWithData[connectionWithDataIndex];
if (connection === undefined) {
groups.push(currentGroup);
currentGroup = [];
currentInputIndex = -1;
if (connectionWithDataIndex >= 0) {
const connection = sortedConnectionsWithData[connectionWithDataIndex];
currentGroup.connections.push(connection);
sortedConnectionsWithData.splice(connectionWithDataIndex, 1);
continue;
}
currentInputIndex = connection.inputIndex;
currentGroup.push(connection);
const connectionWithoutDataIndex = sortedConnectionsWithoutData.findIndex(
// eslint-disable-next-line @typescript-eslint/no-loop-func
(c) => c.inputIndex === currentInputIndex,
);
if (connectionWithDataIndex >= 0) {
sortedConnectionsWithData.splice(connectionWithDataIndex, 1);
if (connectionWithoutDataIndex >= 0) {
const connection = sortedConnectionsWithoutData[connectionWithoutDataIndex];
currentGroup.connections.push(connection);
currentGroup.complete = false;
sortedConnectionsWithoutData.splice(connectionWithoutDataIndex, 1);
continue;
}
groups.push(currentGroup);
currentGroup = newGroup();
currentInputIndex =
Math.min(
...sortedConnectionsWithData.map((c) => c.inputIndex),
...sortedConnectionsWithoutData.map((c) => c.inputIndex),
) - 1;
}
groups.push(currentGroup);

View file

@ -13,9 +13,47 @@ import {
} from 'n8n-workflow';
import type { DirectedGraph } from './DirectedGraph';
import { getIncomingData } from './getIncomingData';
import { getIncomingDataFromAnyRun } from './getIncomingData';
import { getSourceDataGroups } from './getSourceDataGroups';
export function addWaitingExecution(
waitingExecution: IWaitingForExecution,
nodeName: string,
runIndex: number,
inputType: NodeConnectionType,
inputIndex: number,
executionData: INodeExecutionData[] | null,
) {
const waitingExecutionObject = waitingExecution[nodeName] ?? {};
const taskDataConnections = waitingExecutionObject[runIndex] ?? {};
const executionDataList = taskDataConnections[inputType] ?? [];
executionDataList[inputIndex] = executionData;
taskDataConnections[inputType] = executionDataList;
waitingExecutionObject[runIndex] = taskDataConnections;
waitingExecution[nodeName] = waitingExecutionObject;
}
export function addWaitingExecutionSource(
waitingExecutionSource: IWaitingForExecutionSource,
nodeName: string,
runIndex: number,
inputType: NodeConnectionType,
inputIndex: number,
sourceData: ISourceData | null,
) {
const waitingExecutionSourceObject = waitingExecutionSource[nodeName] ?? {};
const taskDataConnectionsSource = waitingExecutionSourceObject[runIndex] ?? {};
const sourceDataList = taskDataConnectionsSource[inputType] ?? [];
sourceDataList[inputIndex] = sourceData;
taskDataConnectionsSource[inputType] = sourceDataList;
waitingExecutionSourceObject[runIndex] = taskDataConnectionsSource;
waitingExecutionSource[nodeName] = waitingExecutionSourceObject;
}
/**
* Recreates the node execution stack, waiting executions and waiting
* execution sources from a directed graph, start nodes, the destination node,
@ -33,7 +71,6 @@ import { getSourceDataGroups } from './getSourceDataGroups';
export function recreateNodeExecutionStack(
graph: DirectedGraph,
startNodes: Set<INode>,
destinationNode: INode,
runData: IRunData,
pinData: IPinData,
): {
@ -59,9 +96,6 @@ export function recreateNodeExecutionStack(
const waitingExecution: IWaitingForExecution = {};
const waitingExecutionSource: IWaitingForExecutionSource = {};
// TODO: Don't hard code this!
const runIndex = 0;
for (const startNode of startNodes) {
const incomingStartNodeConnections = graph
.getDirectParentConnections(startNode)
@ -84,89 +118,94 @@ export function recreateNodeExecutionStack(
const sourceDataSets = getSourceDataGroups(graph, startNode, runData, pinData);
for (const sourceData of sourceDataSets) {
incomingData = [];
if (sourceData.complete) {
// All incoming connections have data, so let's put the node on the
// stack!
incomingData = [];
incomingSourceData = { main: [] };
incomingSourceData = { main: [] };
for (const incomingConnection of sourceData) {
const node = incomingConnection.from;
for (const incomingConnection of sourceData.connections) {
let runIndex = 0;
const sourceNode = incomingConnection.from;
if (pinData[node.name]) {
incomingData.push(pinData[node.name]);
} else {
a.ok(
runData[node.name],
`Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${node.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`,
);
if (pinData[sourceNode.name]) {
incomingData.push(pinData[sourceNode.name]);
} else {
a.ok(
runData[sourceNode.name],
`Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${sourceNode.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`,
);
const nodeIncomingData = getIncomingData(
const nodeIncomingData = getIncomingDataFromAnyRun(
runData,
sourceNode.name,
incomingConnection.type,
incomingConnection.outputIndex,
);
if (nodeIncomingData) {
runIndex = nodeIncomingData.runIndex;
incomingData.push(nodeIncomingData.data);
}
}
incomingSourceData.main.push({
previousNode: incomingConnection.from.name,
previousNodeOutput: incomingConnection.outputIndex,
previousNodeRun: runIndex,
});
}
const executeData: IExecuteData = {
node: startNode,
data: { main: incomingData },
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
} else {
const nodeName = startNode.name;
const nextRunIndex = waitingExecution[nodeName]
? Object.keys(waitingExecution[nodeName]).length
: 0;
for (const incomingConnection of sourceData.connections) {
const sourceNode = incomingConnection.from;
const maybeNodeIncomingData = getIncomingDataFromAnyRun(
runData,
node.name,
runIndex,
sourceNode.name,
incomingConnection.type,
incomingConnection.outputIndex,
);
const nodeIncomingData = maybeNodeIncomingData?.data ?? null;
if (nodeIncomingData) {
incomingData.push(nodeIncomingData);
addWaitingExecution(
waitingExecution,
nodeName,
nextRunIndex,
incomingConnection.type,
incomingConnection.inputIndex,
nodeIncomingData,
);
addWaitingExecutionSource(
waitingExecutionSource,
nodeName,
nextRunIndex,
incomingConnection.type,
incomingConnection.inputIndex,
nodeIncomingData
? {
previousNode: incomingConnection.from.name,
previousNodeRun: nextRunIndex,
previousNodeOutput: incomingConnection.outputIndex,
}
: null,
);
}
}
incomingSourceData.main.push({
previousNode: incomingConnection.from.name,
previousNodeOutput: incomingConnection.outputIndex,
previousNodeRun: 0,
});
}
const executeData: IExecuteData = {
node: startNode,
data: { main: incomingData },
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
}
}
// TODO: Do we need this?
if (destinationNode) {
const destinationNodeName = destinationNode.name;
// Check if the destinationNode has to be added as waiting
// because some input data is already fully available
const incomingDestinationNodeConnections = graph
.getDirectParentConnections(destinationNode)
.filter((c) => c.type === NodeConnectionType.Main);
if (incomingDestinationNodeConnections !== undefined) {
for (const connection of incomingDestinationNodeConnections) {
if (waitingExecution[destinationNodeName] === undefined) {
waitingExecution[destinationNodeName] = {};
waitingExecutionSource[destinationNodeName] = {};
}
if (waitingExecution[destinationNodeName][runIndex] === undefined) {
waitingExecution[destinationNodeName][runIndex] = {};
waitingExecutionSource[destinationNodeName][runIndex] = {};
}
if (waitingExecution[destinationNodeName][runIndex][connection.type] === undefined) {
waitingExecution[destinationNodeName][runIndex][connection.type] = [];
waitingExecutionSource[destinationNodeName][runIndex][connection.type] = [];
}
if (runData[connection.from.name] !== undefined) {
// Input data exists so add as waiting
// incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
waitingExecution[destinationNodeName][runIndex][connection.type].push(
runData[connection.from.name][runIndex].data![connection.type][connection.inputIndex],
);
waitingExecutionSource[destinationNodeName][runIndex][connection.type].push({
previousNode: connection.from.name,
previousNodeOutput: connection.inputIndex || undefined,
previousNodeRun: runIndex || undefined,
} as ISourceData);
} else {
waitingExecution[destinationNodeName][runIndex][connection.type].push(null);
waitingExecutionSource[destinationNodeName][runIndex][connection.type].push(null);
}
}
}
}

View file

@ -363,7 +363,7 @@ export class WorkflowExecute {
// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(subgraph, startNodes, destination, runData, pinData ?? {});
recreateNodeExecutionStack(subgraph, new Set(startNodes), runData, pinData ?? {});
// 8. Execute
this.status = 'running';