This commit is contained in:
Danny Martini 2024-09-10 15:04:08 +02:00
parent 2a084f96f8
commit e26f2b8494
No known key found for this signature in database
13 changed files with 893 additions and 135 deletions

View file

@ -0,0 +1,19 @@
# execution stack notation.md
ES = Execution Stack
WN = Waiting Nodes
WNS = Waiting Node Sources
(ES[], WN[], WNS[])
([t], [ ], [ ])
([n1, n2], [ ], [ ])
([n2], [n3], [n1])
([n3], [ ], [ ]) <--
([n4], [ ], [ ])
# start node notation
SN = Start Node
SNS = Start Node Source

View file

@ -33,7 +33,7 @@ const config = {
}, {}),
setupFilesAfterEnv: ['jest-expect-message'],
collectCoverage: process.env.COVERAGE_ENABLED === 'true',
coverageReporters: ['text-summary'],
coverageReporters: ['html'],
collectCoverageFrom: ['src/**/*.ts'],
};

View file

@ -9,7 +9,7 @@
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { type IPinData, type IRunData } from 'n8n-workflow';
import { IRun, type IPinData, type IRunData } from 'n8n-workflow';
import { createNodeData, toITaskData } from './helpers';
import { findStartNodes, isDirty } from '../findStartNodes';
import { DirectedGraph } from '../DirectedGraph';
@ -369,4 +369,38 @@ describe('findStartNodes', () => {
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node2);
});
// ┌─────┐1 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐0 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
// Trigger and node1 already have run data, node2 does not. findStartNodes
// must therefore return node2 — the only branch still missing data — and
// must not descend past it into node3.
test('returns the parent that is missing run data when the destination has two parents', () => {
  // ARRANGE
  const trigger = createNodeData({ name: 'trigger' });
  const node1 = createNodeData({ name: 'node1' });
  const node2 = createNodeData({ name: 'node2' });
  const node3 = createNodeData({ name: 'node3' });
  const graph = new DirectedGraph()
    .addNodes(trigger, node1, node2, node3)
    .addConnections(
      { from: trigger, to: node1 },
      { from: trigger, to: node2 },
      { from: node1, to: node3, inputIndex: 0 },
      { from: node2, to: node3, inputIndex: 1 },
    );
  // Only the trigger and node1 have produced data so far.
  const runData: IRunData = {
    [trigger.name]: [toITaskData([{ data: {} }])],
    [node1.name]: [toITaskData([{ data: {} }])],
  };

  // ACT
  const startNodes = findStartNodes(graph, trigger, node3, runData);

  // ASSERT
  expect(startNodes).toHaveLength(1);
  expect(startNodes[0]).toEqual(node2);
});
});

View file

@ -12,6 +12,7 @@ import { NodeConnectionType, type IRunData } from 'n8n-workflow';
import { DirectedGraph } from '../DirectedGraph';
import { createNodeData, toITaskData } from './helpers';
import { getSourceDataGroups } from '../getSourceDataGroups';
import { inspect } from 'util';
describe('getSourceDataGroups', () => {
//┌───────┐1
@ -51,15 +52,15 @@ describe('getSourceDataGroups', () => {
expect(groups).toHaveLength(2);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -68,8 +69,8 @@ describe('getSourceDataGroups', () => {
});
const group2 = groups[1];
expect(group2).toHaveLength(1);
expect(group2[0]).toEqual({
expect(group2.connections).toHaveLength(1);
expect(group2.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -115,15 +116,15 @@ describe('getSourceDataGroups', () => {
expect(groups).toHaveLength(2);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -132,8 +133,8 @@ describe('getSourceDataGroups', () => {
});
const group2 = groups[1];
expect(group2).toHaveLength(1);
expect(group2[0]).toEqual({
expect(group2.connections).toHaveLength(1);
expect(group2.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -178,15 +179,15 @@ describe('getSourceDataGroups', () => {
expect(groups).toHaveLength(1);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
expect(group1.connections).toHaveLength(2);
expect(group1.connections[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
expect(group1.connections[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
@ -194,4 +195,79 @@ describe('getSourceDataGroups', () => {
to: node,
});
});
// ┌─────┐1 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐0 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
// Trigger and node1 have run data, node2 does not. The single source group
// for node3 must still contain both connections, but must be flagged as
// incomplete because input 1 has no data to contribute.
test('marks the group as incomplete when one incoming connection has no data', () => {
  // ARRANGE
  const trigger = createNodeData({ name: 'trigger' });
  const node1 = createNodeData({ name: 'node1' });
  const node2 = createNodeData({ name: 'node2' });
  const node3 = createNodeData({ name: 'node3' });
  const graph = new DirectedGraph()
    .addNodes(trigger, node1, node2, node3)
    .addConnections(
      { from: trigger, to: node1 },
      { from: trigger, to: node2 },
      { from: node1, to: node3, inputIndex: 0 },
      { from: node2, to: node3, inputIndex: 1 },
    );
  const runData: IRunData = {
    [trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])],
    [node1.name]: [toITaskData([{ data: { nodeName: 'node1' } }])],
  };
  const pinData: IPinData = {};

  // ACT
  const groups = getSourceDataGroups(graph, node3, runData, pinData);

  // ASSERT
  expect(groups).toHaveLength(1);
  const group1 = groups[0];
  expect(group1.connections).toHaveLength(2);
  expect(group1.complete).toEqual(false);
});
// ┌─────┐0 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐1 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
// Mirror of the previous case with the data-less branch swapped: node2 has
// run data, node1 does not. The group is still a single incomplete group.
// The leftover `console.log(inspect(...))` debug output was removed — tests
// must not write to stdout on every run.
test('marks the group as incomplete when the other incoming connection has no data', () => {
  // ARRANGE
  const trigger = createNodeData({ name: 'trigger' });
  const node1 = createNodeData({ name: 'node1' });
  const node2 = createNodeData({ name: 'node2' });
  const node3 = createNodeData({ name: 'node3' });
  const graph = new DirectedGraph()
    .addNodes(trigger, node1, node2, node3)
    .addConnections(
      { from: trigger, to: node1 },
      { from: trigger, to: node2 },
      { from: node1, to: node3, inputIndex: 0 },
      { from: node2, to: node3, inputIndex: 1 },
    );
  const runData: IRunData = {
    [trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])],
    [node2.name]: [toITaskData([{ data: { nodeName: 'node2' } }])],
  };
  const pinData: IPinData = {};

  // ACT
  const groups = getSourceDataGroups(graph, node3, runData, pinData);

  // ASSERT
  expect(groups).toHaveLength(1);
  const group1 = groups[0];
  expect(group1.connections).toHaveLength(2);
  expect(group1.complete).toEqual(false);
});
});

View file

@ -9,8 +9,20 @@
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { recreateNodeExecutionStack } from '@/PartialExecutionUtils/recreateNodeExecutionStack';
import { type IPinData, type IRunData } from 'n8n-workflow';
import {
addWaitingExecution,
addWaitingExecutionSource,
recreateNodeExecutionStack,
} from '@/PartialExecutionUtils/recreateNodeExecutionStack';
import {
INodeExecutionData,
ISourceData,
IWaitingForExecution,
IWaitingForExecutionSource,
NodeConnectionType,
type IPinData,
type IRunData,
} from 'n8n-workflow';
import { AssertionError } from 'assert';
import { DirectedGraph } from '../DirectedGraph';
import { findSubgraph } from '../findSubgraph';
@ -39,7 +51,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
recreateNodeExecutionStack(workflow, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -91,7 +103,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
recreateNodeExecutionStack(workflow, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -127,7 +139,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
recreateNodeExecutionStack(workflow, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -174,9 +186,9 @@ describe('recreateNodeExecutionStack', () => {
const pinData = {};
// ACT & ASSERT
expect(() =>
recreateNodeExecutionStack(graph, startNodes, node2, runData, pinData),
).toThrowError(AssertionError);
expect(() => recreateNodeExecutionStack(graph, startNodes, runData, pinData)).toThrowError(
AssertionError,
);
});
// ►►
@ -212,10 +224,9 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData);
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toEqual([
{
data: { main: [[{ json: { value: 1 } }]] },
@ -297,7 +308,7 @@ describe('recreateNodeExecutionStack', () => {
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData);
recreateNodeExecutionStack(graph, startNodes, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
@ -330,4 +341,466 @@ describe('recreateNodeExecutionStack', () => {
},
});
});
// ┌─────┐1 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐0 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
// Trigger and node1 have run data, node2 does not. node2 must be placed on
// the execution stack (fed by the trigger's output), while node3 is parked
// in waitingExecution with node1's data at input 0 and a null placeholder
// at input 1. Stale debug-dump comments from an earlier workflow were removed.
test('puts the data-less parent on the stack and parks the destination as waiting', () => {
  // ARRANGE
  const trigger = createNodeData({ name: 'trigger' });
  const node1 = createNodeData({ name: 'node1' });
  const node2 = createNodeData({ name: 'node2' });
  const node3 = createNodeData({ name: 'node3' });
  const graph = new DirectedGraph()
    .addNodes(trigger, node1, node2, node3)
    .addConnections(
      { from: trigger, to: node1 },
      { from: trigger, to: node2 },
      { from: node1, to: node3, inputIndex: 0 },
      { from: node2, to: node3, inputIndex: 1 },
    );
  const startNodes = [node2, node3];
  const runData: IRunData = {
    [trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])],
    [node1.name]: [toITaskData([{ data: { nodeName: 'node1' } }])],
  };
  const pinData: IPinData = {};

  // ACT
  const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
    recreateNodeExecutionStack(graph, startNodes, runData, pinData);

  // ASSERT
  expect(nodeExecutionStack).toHaveLength(1);
  expect(nodeExecutionStack[0]).toEqual({
    node: node2,
    data: { main: [[{ json: { nodeName: 'trigger' } }]] },
    source: {
      main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }],
    },
  });
  expect(waitingExecution).toEqual({
    node3: {
      '0': {
        main: [[{ json: { nodeName: 'node1' } }], null],
      },
    },
  });
  expect(waitingExecutionSource).toEqual({
    node3: {
      '0': {
        main: [
          {
            previousNode: 'node1',
            previousNodeOutput: 0,
            previousNodeRun: 0,
          },
          null,
        ],
      },
    },
  });
});
// ┌─────┐0 ►►
// ┌─►│Node1┼──┐ ┌─────┐
// ┌───────┐1│ └─────┘ └──►│ │
// │Trigger├─┤ │Node3│
// └───────┘ │ ┌─────┐1 ┌──►│ │
// └─►│Node2├──┘ └─────┘
// └─────┘
// Mirror of the previous case with the branches swapped: node2 has run data,
// node1 does not. The waiting arrays for node3 must therefore hold the null
// placeholder at input 0 and node2's data/source at input 1. Stale
// debug-dump comments from an earlier workflow were removed.
test('parks the destination as waiting with the placeholder on the first input', () => {
  // ARRANGE
  const trigger = createNodeData({ name: 'trigger' });
  const node1 = createNodeData({ name: 'node1' });
  const node2 = createNodeData({ name: 'node2' });
  const node3 = createNodeData({ name: 'node3' });
  const graph = new DirectedGraph()
    .addNodes(trigger, node1, node2, node3)
    .addConnections(
      { from: trigger, to: node1 },
      { from: trigger, to: node2 },
      { from: node1, to: node3, inputIndex: 0 },
      { from: node2, to: node3, inputIndex: 1 },
    );
  const startNodes = [node2, node3];
  const runData: IRunData = {
    [trigger.name]: [toITaskData([{ data: { nodeName: 'trigger' } }])],
    [node2.name]: [toITaskData([{ data: { nodeName: 'node2' } }])],
  };
  const pinData: IPinData = {};

  // ACT
  const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
    recreateNodeExecutionStack(graph, startNodes, runData, pinData);

  // ASSERT
  expect(nodeExecutionStack).toHaveLength(1);
  expect(nodeExecutionStack[0]).toEqual({
    node: node2,
    data: { main: [[{ json: { nodeName: 'trigger' } }]] },
    source: {
      main: [{ previousNode: 'trigger', previousNodeOutput: 0, previousNodeRun: 0 }],
    },
  });
  expect(waitingExecution).toEqual({
    node3: {
      '0': {
        main: [null, [{ json: { nodeName: 'node2' } }]],
      },
    },
  });
  expect(waitingExecutionSource).toEqual({
    node3: {
      '0': {
        main: [
          null,
          {
            previousNode: 'node2',
            previousNodeOutput: 0,
            previousNodeRun: 0,
          },
        ],
      },
    },
  });
});
});
// Verifies that addWaitingExecution can accumulate waiting-execution data
// incrementally — per input index, connection type, run index and node —
// into one shared object without clobbering previously added entries.
// Each braced scope below builds on the mutations of the previous ones.
describe('addWaitingExecution', () => {
test('allow adding data partially', () => {
const waitingExecution: IWaitingForExecution = {};
const nodeName1 = 'node 1';
const nodeName2 = 'node 2';
const executionData: INodeExecutionData[] = [{ json: { item: 1 } }, { json: { item: 2 } }];
// adding the data for the second input index first
{
addWaitingExecution(
waitingExecution,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
1, // inputIndex
executionData,
);
// input index 0 stays a hole (undefined) until its data arrives
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [undefined, executionData],
},
},
});
}
// adding the data for the first input
{
addWaitingExecution(
waitingExecution,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
0, // inputIndex
executionData,
);
// the hole at index 0 is filled; index 1 is untouched
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [executionData, executionData],
},
},
});
}
// adding data for another node connection type
{
addWaitingExecution(
waitingExecution,
nodeName1,
1, // runIndex
NodeConnectionType.AiMemory,
0, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
});
}
// adding data for another run
{
addWaitingExecution(
waitingExecution,
nodeName1,
0, // runIndex
NodeConnectionType.AiChain,
0, // inputIndex
executionData,
);
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [executionData],
},
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
});
}
// adding data for another node
{
addWaitingExecution(
waitingExecution,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
2, // inputIndex
executionData,
);
// writing at input index 2 leaves holes at indexes 0 and 1
expect(waitingExecution).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [executionData],
},
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [undefined, undefined, executionData],
},
},
});
}
// allow adding null
{
addWaitingExecution(
waitingExecution,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
0, // inputIndex
null,
);
// null is a deliberate "no data" marker, distinct from an undefined hole
expect(waitingExecution).toEqual({
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [null, undefined, executionData],
},
},
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [executionData],
},
1: {
[NodeConnectionType.Main]: [executionData, executionData],
[NodeConnectionType.AiMemory]: [executionData],
},
},
});
}
});
});
// Mirrors the addWaitingExecution suite for addWaitingExecutionSource:
// source metadata (which node/run/output produced the data) is accumulated
// incrementally into one shared object, scope by scope.
describe('addWaitingExecutionSource', () => {
test('allow adding data partially', () => {
const waitingExecutionSource: IWaitingForExecutionSource = {};
const nodeName1 = 'node 1';
const nodeName2 = 'node 2';
const sourceData: ISourceData = {
previousNode: 'node 0',
previousNodeRun: 0,
previousNodeOutput: 0,
};
// adding the data for the second input index first
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
1, // inputIndex
sourceData,
);
// input index 0 stays a hole (undefined) until its source arrives
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [undefined, sourceData],
},
},
});
}
// adding the data for the first input
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
1, // runIndex
NodeConnectionType.Main,
0, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
},
},
});
}
// adding data for another node connection type
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
1, // runIndex
NodeConnectionType.AiMemory,
0, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
});
}
// adding data for another run
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName1,
0, // runIndex
NodeConnectionType.AiChain,
0, // inputIndex
sourceData,
);
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [sourceData],
},
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
});
}
// adding data for another node
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
2, // inputIndex
sourceData,
);
// writing at input index 2 leaves holes at indexes 0 and 1
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [sourceData],
},
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [undefined, undefined, sourceData],
},
},
});
}
// allow adding null
{
addWaitingExecutionSource(
waitingExecutionSource,
nodeName2,
0, // runIndex
NodeConnectionType.Main,
0, // inputIndex
null,
);
// null is a deliberate "no source" marker, distinct from an undefined hole
expect(waitingExecutionSource).toEqual({
[nodeName1]: {
// runIndex
0: {
[NodeConnectionType.AiChain]: [sourceData],
},
1: {
[NodeConnectionType.Main]: [sourceData, sourceData],
[NodeConnectionType.AiMemory]: [sourceData],
},
},
[nodeName2]: {
// runIndex
0: {
[NodeConnectionType.Main]: [null, undefined, sourceData],
},
},
});
}
});
});

View file

@ -55,6 +55,20 @@ function findStartNodesRecursive(
startNodes: Set<INode>,
seen: Set<INode>,
): Set<INode> {
//// TODO: find a consistent way to identify triggers
//const isTrigger = false;
//
//// if the current node is not a trigger
//if (!isTrigger) {
// //and has no input data (on all connections)
// const parents = graph.getDirectParents(current);
// const allParentsHaveData = parents.every((c) => runData[c.from.name] || pinData[c.from.name]);
//
// if (!allParentsHaveData) {
// return startNodes;
// }
//}
const nodeIsDirty = isDirty(current, runData, pinData);
// If the current node is dirty stop following this branch, we found a start

View file

@ -1,4 +1,3 @@
import * as a from 'assert';
import type { INodeExecutionData, IRunData, NodeConnectionType } from 'n8n-workflow';
export function getIncomingData(
@ -7,16 +6,6 @@ export function getIncomingData(
runIndex: number,
connectionType: NodeConnectionType,
outputIndex: number,
): INodeExecutionData[] | null | undefined {
a.ok(runData[nodeName], `Can't find node with name '${nodeName}' in runData.`);
a.ok(
runData[nodeName][runIndex],
`Can't find a run for index '${runIndex}' for node name '${nodeName}'`,
);
a.ok(
runData[nodeName][runIndex].data,
`Can't find data for index '${runIndex}' for node name '${nodeName}'`,
);
return runData[nodeName][runIndex].data[connectionType][outputIndex];
): INodeExecutionData[] | null {
return runData[nodeName]?.[runIndex]?.data?.[connectionType][outputIndex] ?? null;
}

View file

@ -13,6 +13,20 @@ function sortByInputIndexThenByName(
}
}
/**
 * A set of incoming connections that could feed one execution of a node.
 * `complete` is true only while every connection in the group has run data
 * or pinned data available for it.
 */
type Group = {
  complete: boolean;
  connections: GraphConnection[];
};

/**
 * Creates an empty group. A fresh group starts out complete; it is flagged
 * incomplete as soon as a connection without data is added to it.
 *
 * The explicit return type pins `connections` to `GraphConnection[]` instead
 * of letting the empty array literal be inferred.
 */
function newGroup(): Group {
  return {
    complete: true,
    connections: [],
  };
}
// TODO: There must be a nicer way to write this function
/**
* Groups incoming connections to the node. The groups contain one connection
* per input, if possible, with run data or pinned data.
@ -67,46 +81,69 @@ export function getSourceDataGroups(
node: INode,
runData: IRunData,
pinnedData: IPinData,
): GraphConnection[][] {
): Group[] {
const connections = graph.getConnections({ to: node });
const sortedConnectionsWithData = [];
const sortedConnectionsWithoutData = [];
for (const connection of connections) {
const hasData = runData[connection.from.name] || pinnedData[connection.from.name];
if (hasData) {
sortedConnectionsWithData.push(connection);
} else {
sortedConnectionsWithoutData.push(connection);
}
}
sortedConnectionsWithData.sort(sortByInputIndexThenByName);
sortedConnectionsWithoutData.sort(sortByInputIndexThenByName);
const groups: GraphConnection[][] = [];
let currentGroup: GraphConnection[] = [];
const groups: Group[] = [];
let currentGroup: Group = newGroup();
let currentInputIndex = -1;
while (sortedConnectionsWithData.length > 0) {
const connectionWithDataIndex = sortedConnectionsWithData.findIndex(
// eslint-disable-next-line @typescript-eslint/no-loop-func
(c) => c.inputIndex > currentInputIndex,
);
const connection: GraphConnection | undefined =
sortedConnectionsWithData[connectionWithDataIndex];
while (sortedConnectionsWithData.length > 0 || sortedConnectionsWithoutData.length > 0) {
debugger;
currentInputIndex++;
if (connection === undefined) {
groups.push(currentGroup);
currentGroup = [];
currentInputIndex = -1;
continue;
{
const connectionWithDataIndex = sortedConnectionsWithData.findIndex(
// eslint-disable-next-line @typescript-eslint/no-loop-func
(c) => c.inputIndex === currentInputIndex,
);
if (connectionWithDataIndex >= 0) {
const connection = sortedConnectionsWithData[connectionWithDataIndex];
currentGroup.connections.push(connection);
sortedConnectionsWithData.splice(connectionWithDataIndex, 1);
continue;
}
}
currentInputIndex = connection.inputIndex;
currentGroup.push(connection);
{
const connectionWithoutDataIndex = sortedConnectionsWithoutData.findIndex(
// eslint-disable-next-line @typescript-eslint/no-loop-func
(c) => c.inputIndex === currentInputIndex,
);
if (connectionWithDataIndex >= 0) {
sortedConnectionsWithData.splice(connectionWithDataIndex, 1);
if (connectionWithoutDataIndex >= 0) {
const connection = sortedConnectionsWithoutData[connectionWithoutDataIndex];
currentGroup.connections.push(connection);
currentGroup.complete = false;
sortedConnectionsWithoutData.splice(connectionWithoutDataIndex, 1);
continue;
}
}
groups.push(currentGroup);
currentGroup = newGroup();
currentInputIndex = -1;
}
groups.push(currentGroup);

View file

@ -16,6 +16,46 @@ import type { DirectedGraph } from './DirectedGraph';
import { getIncomingData } from './getIncomingData';
import { getSourceDataGroups } from './getSourceDataGroups';
/**
 * Records execution data that a node is still waiting for, creating the
 * missing levels of the nested structure (node -> run -> connection type)
 * on demand. `executionData` may be null to mark an input as deliberately
 * empty; `waitingExecution` is mutated in place.
 */
export function addWaitingExecution(
  waitingExecution: IWaitingForExecution,
  nodeName: string,
  runIndex: number,
  inputType: NodeConnectionType,
  inputIndex: number,
  executionData: INodeExecutionData[] | null,
) {
  // Each `??=` materializes the next nesting level only when it is absent.
  const runs = (waitingExecution[nodeName] ??= {});
  const connectionsByType = (runs[runIndex] ??= {});
  const dataPerInput = (connectionsByType[inputType] ??= []);

  dataPerInput[inputIndex] = executionData;
}
/**
 * Records the source metadata (which node/run/output will produce the data)
 * for an input a node is still waiting on, creating the missing levels of
 * the nested structure (node -> run -> connection type) on demand.
 * `sourceData` may be null to mark an input as deliberately empty;
 * `waitingExecutionSource` is mutated in place.
 */
export function addWaitingExecutionSource(
  waitingExecutionSource: IWaitingForExecutionSource,
  nodeName: string,
  runIndex: number,
  inputType: NodeConnectionType,
  inputIndex: number,
  sourceData: ISourceData | null,
) {
  // Each `??=` materializes the next nesting level only when it is absent.
  const runs = (waitingExecutionSource[nodeName] ??= {});
  const sourcesByType = (runs[runIndex] ??= {});
  const sourcePerInput = (sourcesByType[inputType] ??= []);

  sourcePerInput[inputIndex] = sourceData;
}
// TODO: What about paired items?
// TODO: clean up this function. It's too complex.
/**
* Recreates the node execution stack, waiting executions and waiting
* execution sources from a directed graph, start nodes, the destination node,
@ -30,10 +70,10 @@ import { getSourceDataGroups } from './getSourceDataGroups';
* necessary to be able to execute the destination node accurately, e.g. as
* close as possible to what would happen in a production execution.
*/
// eslint-disable-next-line complexity
export function recreateNodeExecutionStack(
graph: DirectedGraph,
startNodes: INode[],
destinationNode: INode,
runData: IRunData,
pinData: IPinData,
): {
@ -83,89 +123,95 @@ export function recreateNodeExecutionStack(
} else {
const sourceDataSets = getSourceDataGroups(graph, startNode, runData, pinData);
console.log(
'sourceDataSets',
sourceDataSets
.map((group) => group.connections.map((c) => `${c.from.name} - ${c.to.name}`))
.join('; '),
);
for (const sourceData of sourceDataSets) {
incomingData = [];
if (sourceData.complete) {
// All incoming connections have data, so let's put the node on the
// stack!
incomingData = [];
incomingSourceData = { main: [] };
incomingSourceData = { main: [] };
for (const incomingConnection of sourceData) {
const node = incomingConnection.from;
for (const incomingConnection of sourceData.connections) {
const sourceNode = incomingConnection.from;
if (pinData[node.name]) {
incomingData.push(pinData[node.name]);
} else {
a.ok(
runData[node.name],
`Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${node.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`,
);
if (pinData[sourceNode.name]) {
incomingData.push(pinData[sourceNode.name]);
} else {
a.ok(
runData[sourceNode.name],
`Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${sourceNode.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`,
);
const nodeIncomingData = getIncomingData(
runData,
sourceNode.name,
runIndex,
incomingConnection.type,
incomingConnection.outputIndex,
);
if (nodeIncomingData) {
incomingData.push(nodeIncomingData);
}
}
incomingSourceData.main.push({
previousNode: incomingConnection.from.name,
previousNodeOutput: incomingConnection.outputIndex,
previousNodeRun: 0,
});
}
const executeData: IExecuteData = {
node: startNode,
data: { main: incomingData },
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
} else {
const nodeName = startNode.name;
for (const incomingConnection of sourceData.connections) {
const sourceNode = incomingConnection.from;
const nodeIncomingData = getIncomingData(
runData,
node.name,
sourceNode.name,
runIndex,
incomingConnection.type,
incomingConnection.outputIndex,
);
if (nodeIncomingData) {
incomingData.push(nodeIncomingData);
}
}
incomingSourceData.main.push({
previousNode: incomingConnection.from.name,
previousNodeOutput: incomingConnection.outputIndex,
previousNodeRun: 0,
});
}
const executeData: IExecuteData = {
node: startNode,
data: { main: incomingData },
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
}
}
// TODO: Do we need this?
if (destinationNode) {
const destinationNodeName = destinationNode.name;
// Check if the destinationNode has to be added as waiting
// because some input data is already fully available
const incomingDestinationNodeConnections = graph
.getDirectParents(destinationNode)
.filter((c) => c.type === NodeConnectionType.Main);
if (incomingDestinationNodeConnections !== undefined) {
for (const connection of incomingDestinationNodeConnections) {
if (waitingExecution[destinationNodeName] === undefined) {
waitingExecution[destinationNodeName] = {};
waitingExecutionSource[destinationNodeName] = {};
}
if (waitingExecution[destinationNodeName][runIndex] === undefined) {
waitingExecution[destinationNodeName][runIndex] = {};
waitingExecutionSource[destinationNodeName][runIndex] = {};
}
if (waitingExecution[destinationNodeName][runIndex][connection.type] === undefined) {
waitingExecution[destinationNodeName][runIndex][connection.type] = [];
waitingExecutionSource[destinationNodeName][runIndex][connection.type] = [];
}
if (runData[connection.from.name] !== undefined) {
// Input data exists so add as waiting
// incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
waitingExecution[destinationNodeName][runIndex][connection.type].push(
runData[connection.from.name][runIndex].data![connection.type][connection.inputIndex],
addWaitingExecution(
waitingExecution,
nodeName,
runIndex,
incomingConnection.type,
incomingConnection.inputIndex,
nodeIncomingData ?? null,
);
addWaitingExecutionSource(
waitingExecutionSource,
nodeName,
runIndex,
incomingConnection.type,
incomingConnection.inputIndex,
nodeIncomingData
? {
previousNode: incomingConnection.from.name,
previousNodeRun: runIndex,
previousNodeOutput: incomingConnection.outputIndex,
}
: null,
);
waitingExecutionSource[destinationNodeName][runIndex][connection.type].push({
previousNode: connection.from.name,
previousNodeOutput: connection.inputIndex || undefined,
previousNodeRun: runIndex || undefined,
} as ISourceData);
} else {
waitingExecution[destinationNodeName][runIndex][connection.type].push(null);
waitingExecutionSource[destinationNodeName][runIndex][connection.type].push(null);
}
}
}

View file

@ -58,6 +58,7 @@ import {
findSubgraph,
findTriggerForPartialExecution,
} from './PartialExecutionUtils';
import { inspect } from 'util';
export class WorkflowExecute {
private status: ExecutionStatus = 'new';
@ -345,13 +346,19 @@ export class WorkflowExecute {
'The destination node is not connected to any trigger. Partial executions need a trigger.',
);
}
console.log('trigger', trigger.name);
// 2. Find the Subgraph
const subgraph = findSubgraph(DirectedGraph.fromWorkflow(workflow), destinationNode, trigger);
const filteredNodes = subgraph.getNodes();
console.log('subgraph', [...subgraph.getNodes().keys()]);
// 3. Find the Start Nodes
const startNodes = findStartNodes(subgraph, trigger, destinationNode, runData);
console.log(
'startNodes',
startNodes.map((n) => n.name),
);
// 4. Detect Cycles
const cycles = findCycles(workflow);
@ -366,7 +373,7 @@ export class WorkflowExecute {
// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(subgraph, startNodes, destinationNode, runData, pinData ?? {});
recreateNodeExecutionStack(subgraph, startNodes, runData, pinData ?? {});
// 8. Execute
this.status = 'running';
@ -988,9 +995,38 @@ export class WorkflowExecute {
throw error;
}
console.log('-------------------START--------------------------------');
executionLoop: while (
this.runExecutionData.executionData!.nodeExecutionStack.length !== 0
) {
console.log('---------------------');
console.log(
'nodeExecutionStack',
inspect(
this.runExecutionData.executionData?.nodeExecutionStack.map((v) => ({
nodeName: v.node.name,
sourceName: v.source?.main.map((vv) => vv?.previousNode),
})),
{ depth: null, colors: true, compact: true },
),
);
console.log(
'waitingExecution',
inspect(this.runExecutionData.executionData?.waitingExecution, {
depth: null,
colors: true,
compact: true,
}),
);
console.log(
'waitingExecutionSource',
inspect(this.runExecutionData.executionData?.waitingExecutionSource, {
depth: null,
colors: true,
compact: true,
}),
);
if (
this.additionalData.executionTimeoutTimestamp !== undefined &&
Date.now() >= this.additionalData.executionTimeoutTimestamp
@ -1823,6 +1859,17 @@ export class WorkflowExecute {
}
}
console.log('---------------------');
console.log(
'nodeExecutionStack',
this.runExecutionData.executionData?.nodeExecutionStack.map((v) => ({
nodeName: v.node.name,
sourceName: v.source?.main.map((v) => v?.previousNode),
})),
);
console.log('waitingExecution', this.runExecutionData.executionData?.waitingExecution);
console.log('-------------------END--------------------------------');
return;
})()
.then(async () => {

View file

@ -2239,14 +2239,14 @@ export interface IWorkflowExecuteAdditionalData {
}
export type WorkflowExecuteMode =
| 'cli'
| 'error'
| 'cli' // unused
| 'error' // unused, but maybe used for error workflows
| 'integrated'
| 'internal'
| 'manual'
| 'retry'
| 'trigger'
| 'webhook';
| 'retry' // unused
| 'trigger' // unused
| 'webhook'; // unused
export type WorkflowActivateMode =
| 'init'

23
pull requests.md Normal file
View file

@ -0,0 +1,23 @@
This PR is rather big.
It does contain TODOs and functions that are missing implementations. I'm aware of that, but I still think that getting this into trunk now and creating smaller PRs for the TODOs and missing functionality will make future reviewing easier.
It's mostly new files that connect with the old code via one feature gate in the `POST /workflows/run` controller/service.
The new functionality should not have an effect on the old code.
The new code follows the spec written here: https://www.notion.so/n8n/Partial-Executions-9f24ffe8c6474eaeab51c9784ad1fd46?p=4298d34eb54f42e1baa098c9ccc50b5a&pm=s
Changes to the old code are kept to a minimum to avoid breaking the old partial execution flow. The goal is to remove all old code after the new partial executions flow saw enough testing.
I also added some comments to parts of the code that I did not immediately understand.
## Important Review Notes
It's best to start reviewing with this file: packages/core/src/WorkflowExecute.ts
The changes in `cypress/*` only ensure that the interceptors keep working now that `POST /workflow/run` takes a query parameter.
All the new code including the tests are in `packages/core/src/PartialExecutionUtils` neatly separated by modules that align with the aforementioned spec.
The editor code only contains small adjustments to allow for switching between the old and new flow using a key in local storage.
Once you're done with all of these, you will have reviewed roughly 90% of the PR. Congratulations!