feat(core): Handle cycles in workflows when partially executing them (#11187)
Some checks failed
Test Master / install-and-build (push) Has been cancelled
Test Master / Unit tests (18.x) (push) Has been cancelled
Test Master / Unit tests (20.x) (push) Has been cancelled
Test Master / Unit tests (22.4) (push) Has been cancelled
Test Master / Lint (push) Has been cancelled
Test Master / Notify Slack on failure (push) Has been cancelled

This commit is contained in:
Danny Martini 2024-10-18 17:30:26 +02:00 committed by GitHub
parent b4b543d41d
commit 321d6deef1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 469 additions and 53 deletions

View file

@ -286,6 +286,149 @@ export class DirectedGraph {
);
}
/**
 * Returns all strongly connected components.
 *
 * Strongly connected components are a set of nodes where it's possible to
 * reach every node from every node.
 *
 * Strongly connected components are mutually exclusive in directed graphs,
 * i.e. they cannot overlap.
 *
 * The smallest strongly connected component is a single node, since it can
 * reach itself from itself by not following any edges.
 *
 * The algorithm implemented here is Tarjan's algorithm.
 *
 * Example:
 *
 * ┌─────┐    ┌─────┐    ┌─────┐    ┌─────┐
 * │node1├───►│node2├───►│node3├───►│node5│
 * └─────┘    └─────┘    └──┬──┘    └─┬───┘
 *               ▲          │         │ ▲
 *               │          │         ▼ │
 *            ┌──┴──┐       │       ┌───┴─┐
 *            │node4│◄──────┘       │node6│
 *            └─────┘               └─────┘
 *
 * The strongly connected components are
 * 1. node1
 * 2. node2, node4, node3
 * 3. node5, node6
 *
 * Further reading:
 * https://en.wikipedia.org/wiki/Strongly_connected_component
 * https://www.youtube.com/watch?v=wUgWX0nc4NY
 *
 * @returns One set per strongly connected component. Every node of the graph
 * is contained in exactly one of the returned sets.
 */
getStronglyConnectedComponents(): Array<Set<INode>> {
	let nextId = 0;
	// A node has an entry in `ids` as soon as it has been visited.
	const ids = new Map<INode, number>();
	const lowLinkValues = new Map<INode, number>();
	const stack: INode[] = [];
	// Mirrors `stack` to allow constant time membership checks.
	const onStack = new Set<INode>();
	const stronglyConnectedComponents: Array<Set<INode>> = [];

	const followNode = (node: INode) => {
		if (ids.has(node)) {
			return;
		}

		ids.set(node, nextId);
		lowLinkValues.set(node, nextId);
		nextId++;
		stack.push(node);
		onStack.add(node);

		const directChildren = this.getDirectChildConnections(node).map((c) => c.to);
		for (const child of directChildren) {
			followNode(child);

			// Only children that are still on the stack can be part of the same
			// component, children that already left the stack belong to a component
			// that has been completed earlier.
			if (onStack.has(child)) {
				const childLowLinkValue = lowLinkValues.get(child);
				const ownLowLinkValue = lowLinkValues.get(node);
				a.ok(childLowLinkValue !== undefined);
				a.ok(ownLowLinkValue !== undefined);
				lowLinkValues.set(node, Math.min(childLowLinkValue, ownLowLinkValue));
			}
		}

		// After all children have been visited, the node is the root of a strongly
		// connected component if its low link value still equals its own id.
		const ownId = ids.get(node);
		const ownLowLinkValue = lowLinkValues.get(node);
		a.ok(ownId !== undefined);
		a.ok(ownLowLinkValue !== undefined);

		if (ownId === ownLowLinkValue) {
			// Everything above the root on the stack, and the root itself, forms the
			// component. Pop until the root is reached instead of comparing low link
			// values: members of one component can end up with different low link
			// values depending on the order in which the edges were followed.
			const scc = new Set<INode>();
			while (stack.length > 0) {
				const member = stack.pop();
				a.ok(member !== undefined);
				onStack.delete(member);
				scc.add(member);
				if (member === node) {
					break;
				}
			}
			stronglyConnectedComponents.push(scc);
		}
	};

	for (const node of this.nodes.values()) {
		followNode(node);
	}

	return stronglyConnectedComponents;
}
/**
 * Recursive worker behind `depthFirstSearch`.
 *
 * Tests `from` with `fn` and then descends into its direct children, one
 * branch after the other. Nodes recorded in `seen` are skipped, which keeps
 * the search from running in circles when the graph contains a cycle.
 *
 * @returns the first node for which `fn` returned true, otherwise undefined.
 */
private depthFirstSearchRecursive(
	from: INode,
	fn: (node: INode) => boolean,
	seen: Set<INode>,
): INode | undefined {
	const alreadyVisited = seen.has(from);
	if (alreadyVisited) {
		return undefined;
	}
	seen.add(from);

	if (fn(from)) {
		return from;
	}

	for (const { to: child } of this.getDirectChildConnections(from)) {
		const match = this.depthFirstSearchRecursive(child, fn, seen);
		if (match !== undefined) {
			return match;
		}
	}

	return undefined;
}
/**
 * The directed graph counterpart of `Array.prototype.find`.
 *
 * The predicate `fn` is called with the `from` node first and then with the
 * nodes reachable from it, until it returns true for one of them. That node
 * is the result of the search.
 *
 * Branches are searched depth first: a branch is followed to its end before
 * the next branch is looked at.
 *
 * If the predicate never returns true before the graph is exhausted, the
 * result is undefined.
 */
depthFirstSearch({ from, fn }: { from: INode; fn: (node: INode) => boolean }): INode | undefined {
	const seen = new Set<INode>();

	return this.depthFirstSearchRecursive(from, fn, seen);
}
toWorkflow(parameters: Omit<WorkflowParameters, 'nodes' | 'connections'>): Workflow {
return new Workflow({
...parameters,

View file

@ -9,6 +9,7 @@
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import type { INode } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow';
import { createNodeData, defaultWorkflowParameter } from './helpers';
@ -89,6 +90,115 @@ describe('DirectedGraph', () => {
});
});
// Tests for `DirectedGraph.getStronglyConnectedComponents`.
describe('getStronglyConnectedComponents', () => {
// ┌─────┐    ┌─────┐    ┌─────┐
// │node1├───►│node2├───►│node4│
// └─────┘    └──┬──┘    └─────┘
//    ▲          │
//    │          │
// ┌──┴──┐       │
// │node3│◄──────┘
// └─────┘
test('find strongly connected components', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const node4 = createNodeData({ name: 'Node4' });
// node1 -> node2 -> node3 -> node1 forms a cycle, node4 is only reachable
// from the cycle and has no outgoing connection.
const graph = new DirectedGraph()
.addNodes(node1, node2, node3, node4)
.addConnections(
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
{ from: node2, to: node4 },
);
// ACT
const stronglyConnectedComponents = graph.getStronglyConnectedComponents();
// ASSERT
// One component for the cycle and one for node4 by itself. The order of the
// components is not asserted.
expect(stronglyConnectedComponents).toHaveLength(2);
expect(stronglyConnectedComponents).toContainEqual(new Set([node4]));
expect(stronglyConnectedComponents).toContainEqual(new Set([node3, node2, node1]));
});
// ┌────┐
// ┌───────┐ │ ├─
// │trigger├──┬──►loop│
// └───────┘ │ │ ├────┐
// │ └────┘ │
// └─────────┐ │
// ┌────┐ │ │
// ┌───►node├─┘ │
// │ └────┘ │
// │ │
// └─────────────┘
// Connections: trigger -> loop, loop (output 1) -> node, node -> loop
test('find strongly connected components even if they use different output indexes', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop' });
const node = createNodeData({ name: 'node' });
// The connection that closes the cycle leaves `loop` via output index 1.
const graph = new DirectedGraph()
.addNodes(trigger, loop, node)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 1, to: node },
{ from: node, to: loop },
);
// ACT
const stronglyConnectedComponents = graph.getStronglyConnectedComponents();
// ASSERT
// The trigger is a component by itself, loop and node form the cycle.
expect(stronglyConnectedComponents).toHaveLength(2);
expect(stronglyConnectedComponents).toContainEqual(new Set([trigger]));
expect(stronglyConnectedComponents).toContainEqual(new Set([node, loop]));
});
});
// Tests for `DirectedGraph.depthFirstSearch`.
describe('depthFirstSearch', () => {
// ┌─────┐    ┌─────┐    ┌─────┐    ┌─────┐    ┌─────┐
// │node0├───►│node1├───►│node2├───►│node4│───►│node5│
// └─────┘    └─────┘    └──┬──┘    └─────┘    └─────┘
//               ▲          │
//               │          │
//            ┌──┴──┐       │
//            │node3│◄──────┘
//            └─────┘
test('calls nodes in the correct order and stops when it found the node', () => {
// ARRANGE
const node0 = createNodeData({ name: 'Node0' });
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const node4 = createNodeData({ name: 'Node4' });
const node5 = createNodeData({ name: 'Node5' });
// node1 -> node2 -> node3 -> node1 is a cycle, so the search has to cope
// with reaching node1 a second time.
const graph = new DirectedGraph()
.addNodes(node0, node1, node2, node3, node4, node5)
.addConnections(
{ from: node0, to: node1 },
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
{ from: node2, to: node4 },
{ from: node4, to: node5 },
);
// The predicate only matches node4; the mock records every call.
const fn = jest.fn().mockImplementation((node: INode) => node === node4);
// ACT
const foundNode = graph.depthFirstSearch({
from: node0,
fn,
});
// ASSERT
expect(foundNode).toBe(node4);
// node1 is not passed to the predicate a second time and node5 is never
// reached because the search stops at node4.
expect(fn).toHaveBeenCalledTimes(5);
// node3 is expected before node4, presumably because the connection
// node2 -> node3 was added before node2 -> node4.
expect(fn.mock.calls).toEqual([[node0], [node1], [node2], [node3], [node4]]);
});
});
describe('getParentConnections', () => {
// ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
// │node1├──►│node2├──►│node3│──►│node4│

View file

@ -23,7 +23,7 @@ describe('cleanRunData', () => {
};
// ACT
const newRunData = cleanRunData(runData, graph, [node1]);
const newRunData = cleanRunData(runData, graph, new Set([node1]));
// ASSERT
expect(newRunData).toEqual({});
@ -47,7 +47,7 @@ describe('cleanRunData', () => {
};
// ACT
const newRunData = cleanRunData(runData, graph, [node2]);
const newRunData = cleanRunData(runData, graph, new Set([node2]));
// ASSERT
expect(newRunData).toEqual({ [node1.name]: runData[node1.name] });
@ -78,7 +78,7 @@ describe('cleanRunData', () => {
};
// ACT
const newRunData = cleanRunData(runData, graph, [node2]);
const newRunData = cleanRunData(runData, graph, new Set([node2]));
// ASSERT
// TODO: Find out if this is a desirable result in milestone 2

View file

@ -48,8 +48,8 @@ describe('findStartNodes', () => {
const startNodes = findStartNodes({ graph, trigger: node, destination: node });
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node);
});
// ►►
@ -67,8 +67,8 @@ describe('findStartNodes', () => {
{
const startNodes = findStartNodes({ graph, trigger, destination });
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(trigger);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(trigger);
}
// if the trigger has run data
@ -79,8 +79,8 @@ describe('findStartNodes', () => {
const startNodes = findStartNodes({ graph, trigger, destination, runData });
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(destination);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(destination);
}
});
@ -115,8 +115,8 @@ describe('findStartNodes', () => {
const startNodes = findStartNodes({ graph, trigger, destination: node, runData });
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node);
});
// ┌─────┐ ┌─────┐ ►►
@ -156,9 +156,9 @@ describe('findStartNodes', () => {
const startNodes = findStartNodes({ graph, trigger, destination: node4 });
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes.size).toBe(1);
// no run data means the trigger is the start node
expect(startNodes[0]).toEqual(trigger);
expect(startNodes).toContainEqual(trigger);
}
{
@ -175,8 +175,8 @@ describe('findStartNodes', () => {
const startNodes = findStartNodes({ graph, trigger, destination: node4, runData });
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node4);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node4);
}
});
@ -211,8 +211,8 @@ describe('findStartNodes', () => {
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node);
});
// ►►
@ -246,8 +246,8 @@ describe('findStartNodes', () => {
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node);
});
// ►►
@ -286,8 +286,8 @@ describe('findStartNodes', () => {
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node);
});
// ►►
@ -324,8 +324,8 @@ describe('findStartNodes', () => {
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node3);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node3);
});
// ►►
@ -360,8 +360,8 @@ describe('findStartNodes', () => {
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node2);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node2);
});
// ►►
@ -392,7 +392,7 @@ describe('findStartNodes', () => {
const startNodes = findStartNodes({ graph, trigger, destination: node2, runData, pinData });
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node2);
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node2);
});
});

View file

@ -0,0 +1,116 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { createNodeData } from './helpers';
import { DirectedGraph } from '../DirectedGraph';
import { handleCycles } from '../handleCycles';
// Tests for `handleCycles`. All tests use the same graph:
// trigger -> loop, loop (output 0) -> afterLoop, loop (output 1) -> inLoop,
// inLoop -> loop. The cycle therefore consists of `loop` and `inLoop`.
describe('handleCycles', () => {
// ┌────┐ ┌─────────┐
//┌───────┐ │ ├──────────►afterLoop│
//│trigger├────┬───►loop│ └─────────┘
//└───────┘ │ │ ├─┐ ►►
// │ └────┘ │ ┌──────┐
// │ └───►inLoop├────┐
// │ └──────┘ │
// │ │
// └──────────────────────────┘
test('if the start node is within a cycle it returns the start of the cycle as the new start node', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 0, to: afterLoop },
{ from: loop, outputIndex: 1, to: inLoop },
{ from: inLoop, to: loop },
);
// The start node sits inside the cycle.
const startNodes = new Set([inLoop]);
// ACT
const newStartNodes = handleCycles(graph, startNodes, trigger);
// ASSERT
// `inLoop` is expected to be replaced by `loop`, the node the trigger
// connects to.
expect(newStartNodes.size).toBe(1);
expect(newStartNodes).toContainEqual(loop);
});
// ┌────┐ ┌─────────┐
//┌───────┐ │ ├──────────►afterLoop│
//│trigger├────┬───►loop│ └─────────┘
//└───────┘ │ │ ├─┐ ►►
// │ └────┘ │ ┌──────┐
// │ └───►inLoop├────┐
// │ └──────┘ │
// │ │
// └──────────────────────────┘
test('does not mutate `startNodes`', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 0, to: afterLoop },
{ from: loop, outputIndex: 1, to: inLoop },
{ from: inLoop, to: loop },
);
const startNodes = new Set([inLoop]);
// ACT
// The return value is ignored on purpose, only the input set is inspected.
handleCycles(graph, startNodes, trigger);
// ASSERT
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(inLoop);
});
// ►►
// ┌────┐ ┌─────────┐
//┌───────┐ │ ├──────────►afterLoop│
//│trigger├────┬───►loop│ └─────────┘
//└───────┘ │ │ ├─┐
// │ └────┘ │ ┌──────┐
// │ └───►inLoop├────┐
// │ └──────┘ │
// │ │
// └──────────────────────────┘
test('if the start node is not within a cycle it returns the same node as the new start node', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 0, to: afterLoop },
{ from: loop, outputIndex: 1, to: inLoop },
{ from: inLoop, to: loop },
);
// `afterLoop` is fed by the cycle but is not part of it.
const startNodes = new Set([afterLoop]);
// ACT
const newStartNodes = handleCycles(graph, startNodes, trigger);
// ASSERT
expect(newStartNodes.size).toBe(1);
expect(newStartNodes).toContainEqual(afterLoop);
});
});

View file

@ -33,7 +33,7 @@ describe('recreateNodeExecutionStack', () => {
.addConnections({ from: trigger, to: node });
const workflow = findSubgraph({ graph, destination: node, trigger });
const startNodes = [node];
const startNodes = new Set([node]);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
};
@ -87,7 +87,7 @@ describe('recreateNodeExecutionStack', () => {
const workflow = new DirectedGraph()
.addNodes(trigger, node)
.addConnections({ from: trigger, to: node });
const startNodes = [trigger];
const startNodes = new Set([trigger]);
const runData: IRunData = {};
const pinData: IPinData = {};
@ -121,7 +121,7 @@ describe('recreateNodeExecutionStack', () => {
const workflow = new DirectedGraph()
.addNodes(trigger, node)
.addConnections({ from: trigger, to: node });
const startNodes = [node];
const startNodes = new Set([node]);
const runData: IRunData = {};
const pinData: IPinData = {
[trigger.name]: [{ json: { value: 1 } }],
@ -169,7 +169,7 @@ describe('recreateNodeExecutionStack', () => {
.addNodes(trigger, node1, node2)
.addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 });
const startNodes = [node2];
const startNodes = new Set([node2]);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
};
@ -204,7 +204,7 @@ describe('recreateNodeExecutionStack', () => {
{ from: node2, to: node3 },
);
const startNodes = [node3];
const startNodes = new Set([node3]);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
@ -287,7 +287,7 @@ describe('recreateNodeExecutionStack', () => {
{ from: node1, to: node3, inputIndex: 0 },
{ from: node2, to: node3, inputIndex: 1 },
);
const startNodes = [node3];
const startNodes = new Set([node3]);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 } }])],

View file

@ -10,7 +10,7 @@ import type { DirectedGraph } from './DirectedGraph';
export function cleanRunData(
runData: IRunData,
graph: DirectedGraph,
startNodes: INode[],
startNodes: Set<INode>,
): IRunData {
const newRunData: IRunData = { ...runData };

View file

@ -1,6 +0,0 @@
import type { Workflow } from 'n8n-workflow';
/**
 * Placeholder for cycle detection.
 *
 * The `_workflow` argument is not inspected; the function always returns an
 * empty array.
 */
export function findCycles(_workflow: Workflow) {
// TODO: implement depth first search or Tarjan's Algorithm
return [];
}

View file

@ -137,7 +137,7 @@ export function findStartNodes(options: {
destination: INode;
runData?: IRunData;
pinData?: IPinData;
}): INode[] {
}): Set<INode> {
const graph = options.graph;
const trigger = options.trigger;
const destination = options.destination;
@ -156,5 +156,5 @@ export function findStartNodes(options: {
new Set(),
);
return [...startNodes];
return startNodes;
}

View file

@ -0,0 +1,56 @@
import type { INode } from 'n8n-workflow';
import * as a from 'node:assert/strict';
import type { DirectedGraph } from './DirectedGraph';
/**
 * Returns a new set of start nodes. `startNodes` itself is not mutated.
 *
 * For every start node this checks if it is part of a cycle and if it is it
 * replaces the start node with the start of the cycle, i.e. the first node of
 * the cycle that a depth first search starting at `trigger` finds. Start nodes
 * that are not part of a cycle are kept as they are. The returned set follows
 * the iteration order of `startNodes`.
 *
 * This is useful because it prevents executing cycles partially, e.g. figuring
 * out which run of the cycle has to be repeated etc.
 *
 * @param graph the graph the start nodes belong to
 * @param startNodes the start nodes to check
 * @param trigger the node the search for the start of a cycle begins at
 * @returns the start nodes with every node inside a cycle replaced by the
 * start of that cycle
 * @throws AssertionError if a start node is part of a cycle that cannot be
 * reached from `trigger`
 */
export function handleCycles(
	graph: DirectedGraph,
	startNodes: Set<INode>,
	trigger: INode,
): Set<INode> {
	// Strongly connected components can also be nodes that are not part of a
	// cycle. They form a strongly connected component of one. E.g the trigger is
	// always a strongly connected component by itself because it does not have
	// any inputs and thus cannot build a cycle.
	//
	// We're not interested in them so we filter them out.
	const cycles = graph.getStronglyConnectedComponents().filter((cycle) => cycle.size > 1);

	if (cycles.length === 0) {
		return new Set(startNodes);
	}

	const newStartNodes: Set<INode> = new Set();
	// The start of a cycle only depends on the cycle and the trigger, so it is
	// looked up once per cycle even if multiple start nodes are part of it.
	const cycleStarts = new Map<Set<INode>, INode>();

	for (const startNode of startNodes) {
		// Strongly connected components do not overlap, so a node is part of at
		// most one cycle.
		const cycle = cycles.find((candidate) => candidate.has(startNode));

		if (cycle === undefined) {
			newStartNodes.add(startNode);
			continue;
		}

		let cycleStart = cycleStarts.get(cycle);
		if (cycleStart === undefined) {
			cycleStart = graph.depthFirstSearch({
				from: trigger,
				fn: (node) => cycle.has(node),
			});
			a.ok(
				cycleStart,
				"the trigger must be connected to the cycle, otherwise the cycle wouldn't be part of the subgraph",
			);
			cycleStarts.set(cycle, cycleStart);
		}

		newStartNodes.add(cycleStart);
	}

	return newStartNodes;
}

View file

@ -2,5 +2,6 @@ export { DirectedGraph } from './DirectedGraph';
export { findTriggerForPartialExecution } from './findTriggerForPartialExecution';
export { findStartNodes } from './findStartNodes';
export { findSubgraph } from './findSubgraph';
export { findCycles } from './findCycles';
export { recreateNodeExecutionStack } from './recreateNodeExecutionStack';
export { cleanRunData } from './cleanRunData';
export { handleCycles } from './handleCycles';

View file

@ -32,7 +32,7 @@ import { getSourceDataGroups } from './getSourceDataGroups';
*/
export function recreateNodeExecutionStack(
graph: DirectedGraph,
startNodes: INode[],
startNodes: Set<INode>,
destinationNode: INode,
runData: IRunData,
pinData: IPinData,

View file

@ -51,13 +51,13 @@ import PCancelable from 'p-cancelable';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import {
DirectedGraph,
findCycles,
findStartNodes,
findSubgraph,
findTriggerForPartialExecution,
cleanRunData,
recreateNodeExecutionStack,
handleCycles,
} from './PartialExecutionUtils';
import { cleanRunData } from './PartialExecutionUtils/cleanRunData';
import { recreateNodeExecutionStack } from './PartialExecutionUtils/recreateNodeExecutionStack';
export class WorkflowExecute {
private status: ExecutionStatus = 'new';
@ -352,15 +352,11 @@ export class WorkflowExecute {
const filteredNodes = subgraph.getNodes();
// 3. Find the Start Nodes
const startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData });
let startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData });
// 4. Detect Cycles
const cycles = findCycles(workflow);
// 5. Handle Cycles
if (cycles.length) {
// TODO: handle
}
startNodes = handleCycles(graph, startNodes, trigger);
// 6. Clean Run Data
const newRunData: IRunData = cleanRunData(runData, graph, startNodes);