fix(core): Clean run data correctly for the new partial execution flow (no-changelog) (#10951)

This commit is contained in:
Danny Martini 2024-09-25 10:34:00 +02:00 committed by GitHub
parent a6cfb3b0c5
commit 36c472ee0a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 191 additions and 3 deletions

View file

@ -125,6 +125,32 @@ export class DirectedGraph {
return directChildren;
}
private getChildrenRecursive(node: INode, children: Set<INode>) {
const directChildren = this.getDirectChildren(node);
for (const directChild of directChildren) {
// Break out if we found a cycle.
if (children.has(directChild.to)) {
continue;
}
children.add(directChild.to);
this.getChildrenRecursive(directChild.to, children);
}
return children;
}
/**
* Returns all nodes that are children of the node that is passed as an
* argument.
*
* If the node being passed in is a child of itself (e.g. is part of a
* cylce), the return set will contain it as well.
*/
getChildren(node: INode) {
return this.getChildrenRecursive(node, new Set());
}
getDirectParents(node: INode) {
const nodeExists = this.nodes.get(node.name) === node;
a.ok(nodeExists);

View file

@ -38,4 +38,52 @@ describe('DirectedGraph', () => {
graph,
);
});
describe('getChildren', () => {
// ┌─────┐ ┌─────┐ ┌─────┐
// │node1├───►│node2├──►│node3│
// └─────┘ └─────┘ └─────┘
test('returns all children', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const graph = new DirectedGraph()
.addNodes(node1, node2, node3)
.addConnections({ from: node1, to: node2 }, { from: node2, to: node3 });
// ACT
const children = graph.getChildren(node1);
// ASSERT
expect(children.size).toBe(2);
expect(children).toEqual(new Set([node2, node3]));
});
// ┌─────┐ ┌─────┐ ┌─────┐
// ┌─►│node1├───►│node2├──►│node3├─┐
// │ └─────┘ └─────┘ └─────┘ │
// │ │
// └───────────────────────────────┘
test('terminates when finding a cycle', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const graph = new DirectedGraph()
.addNodes(node1, node2, node3)
.addConnections(
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
);
// ACT
const children = graph.getChildren(node1);
// ASSERT
expect(children.size).toBe(3);
expect(children).toEqual(new Set([node1, node2, node3]));
});
});
});

View file

@ -0,0 +1,86 @@
import type { IRunData } from 'n8n-workflow';
import { cleanRunData } from '../cleanRunData';
import { DirectedGraph } from '../DirectedGraph';
import { createNodeData, toITaskData } from './helpers';
describe('cleanRunData', () => {
// ┌─────┐ ┌─────┐ ┌─────┐
// │node1├───►│node2├──►│node3│
// └─────┘ └─────┘ └─────┘
test('deletes all run data of all children and the node being passed in', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const graph = new DirectedGraph()
.addNodes(node1, node2, node3)
.addConnections({ from: node1, to: node2 }, { from: node2, to: node3 });
const runData: IRunData = {
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 2 } }])],
[node3.name]: [toITaskData([{ data: { value: 3 } }])],
};
// ACT
const newRunData = cleanRunData(runData, graph, [node1]);
// ASSERT
expect(newRunData).toEqual({});
});
// ┌─────┐ ┌─────┐ ┌─────┐
// │node1├───►│node2├──►│node3│
// └─────┘ └─────┘ └─────┘
test('retains the run data of parent nodes of the node being passed in', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const graph = new DirectedGraph()
.addNodes(node1, node2, node3)
.addConnections({ from: node1, to: node2 }, { from: node2, to: node3 });
const runData: IRunData = {
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 2 } }])],
[node3.name]: [toITaskData([{ data: { value: 3 } }])],
};
// ACT
const newRunData = cleanRunData(runData, graph, [node2]);
// ASSERT
expect(newRunData).toEqual({ [node1.name]: runData[node1.name] });
});
// ┌─────┐ ┌─────┐ ┌─────┐
// ┌─►│node1├───►│node2├──►│node3├─┐
// │ └─────┘ └─────┘ └─────┘ │
// │ │
// └───────────────────────────────┘
test('terminates when finding a cycle', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
const graph = new DirectedGraph()
.addNodes(node1, node2, node3)
.addConnections(
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
);
const runData: IRunData = {
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 2 } }])],
[node3.name]: [toITaskData([{ data: { value: 3 } }])],
};
// ACT
const newRunData = cleanRunData(runData, graph, [node2]);
// ASSERT
// TODO: Find out if this is a desirable result in milestone 2
expect(newRunData).toEqual({});
});
});

View file

@ -0,0 +1,26 @@
import type { INode, IRunData } from 'n8n-workflow';
import type { DirectedGraph } from './DirectedGraph';
/**
* Returns new run data that does not contain data for any node that is a child
* of any start node.
* This does not mutate the `runData` being passed in.
*/
export function cleanRunData(
runData: IRunData,
graph: DirectedGraph,
startNodes: INode[],
): IRunData {
const newRunData: IRunData = { ...runData };
for (const startNode of startNodes) {
delete newRunData[startNode.name];
const children = graph.getChildren(startNode);
for (const child of children) {
delete newRunData[child.name];
}
}
return newRunData;
}

View file

@ -58,6 +58,7 @@ import {
findSubgraph,
findTriggerForPartialExecution,
} from './PartialExecutionUtils';
import { cleanRunData } from './PartialExecutionUtils/cleanRunData';
export class WorkflowExecute {
private status: ExecutionStatus = 'new';
@ -347,7 +348,8 @@ export class WorkflowExecute {
}
// 2. Find the Subgraph
const subgraph = findSubgraph(DirectedGraph.fromWorkflow(workflow), destinationNode, trigger);
const graph = DirectedGraph.fromWorkflow(workflow);
const subgraph = findSubgraph(graph, destinationNode, trigger);
const filteredNodes = subgraph.getNodes();
// 3. Find the Start Nodes
@ -362,7 +364,7 @@ export class WorkflowExecute {
}
// 6. Clean Run Data
// TODO:
const newRunData: IRunData = cleanRunData(runData, graph, startNodes);
// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
@ -376,7 +378,7 @@ export class WorkflowExecute {
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
},
resultData: {
runData,
runData: newRunData,
pinData,
},
executionData: {