fix(core): Remove run data of nodes unrelated to the current partial execution (#12099)

This commit is contained in:
Danny Martini 2024-12-09 13:15:17 +01:00 committed by GitHub
parent 516f3b7b4b
commit c4e4d37a87
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 127 additions and 18 deletions

View file

@ -42,6 +42,10 @@ export class DirectedGraph {
private connections: Map<DirectedGraphKey, GraphConnection> = new Map();
hasNode(nodeName: string) {
return this.nodes.has(nodeName);
}
getNodes() {
return new Map(this.nodes.entries());
}

View file

@ -43,8 +43,8 @@ describe('DirectedGraph', () => {
});
// ┌─────┐ ┌─────┐──► null
// │node1├───►│node2| ┌─────┐
// └─────┘ └─────┘──►│node3|
// │node1├───►│node2 ┌─────┐
// └─────┘ └─────┘──►│node3
// └─────┘
//
test('linear workflow with null connections', () => {
@ -472,4 +472,24 @@ describe('DirectedGraph', () => {
expect(graph).toEqual(expectedGraph);
});
});
describe('hasNode', () => {
test("returns node if it's part of the graph", () => {
// ARRANGE
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph().addNodes(node);
// ACT & ASSERT
expect(graph.hasNode(node.name)).toBe(true);
});
test('returns undefined if there is no node with that name in the graph', () => {
// ARRANGE
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph().addNodes(node);
// ACT & ASSERT
expect(graph.hasNode(node.name + 'foo')).toBe(false);
});
});
});

View file

@ -84,4 +84,31 @@ describe('cleanRunData', () => {
// TODO: Find out if this is a desirable result in milestone 2
expect(newRunData).toEqual({});
});
// ┌─────┐ ┌─────┐
// │node1├───►│node2│
// └─────┘ └─────┘
test('removes run data of nodes that are not in the subgraph', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const graph = new DirectedGraph()
.addNodes(node1, node2)
.addConnections({ from: node1, to: node2 });
// not part of the graph
const node3 = createNodeData({ name: 'Node3' });
const runData: IRunData = {
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 2 } }])],
[node3.name]: [toITaskData([{ data: { value: 3 } }])],
};
// ACT
const newRunData = cleanRunData(runData, graph, new Set([node2]));
// ASSERT
expect(newRunData).toEqual({
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
});
});
});

View file

@ -23,5 +23,13 @@ export function cleanRunData(
}
}
// Remove run data for all nodes that are not part of the subgraph
for (const nodeName of Object.keys(newRunData)) {
if (!graph.hasNode(nodeName)) {
// remove run data for node that is not part of the graph
delete newRunData[nodeName];
}
}
return newRunData;
}

View file

@ -354,13 +354,13 @@ export class WorkflowExecute {
}
// 2. Find the Subgraph
const graph = DirectedGraph.fromWorkflow(workflow);
const subgraph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger });
const filteredNodes = subgraph.getNodes();
let graph = DirectedGraph.fromWorkflow(workflow);
graph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger });
const filteredNodes = graph.getNodes();
// 3. Find the Start Nodes
runData = omit(runData, dirtyNodeNames);
let startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData, pinData });
let startNodes = findStartNodes({ graph, trigger, destination, runData, pinData });
// 4. Detect Cycles
// 5. Handle Cycles
@ -371,7 +371,7 @@ export class WorkflowExecute {
// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(subgraph, new Set(startNodes), runData, pinData ?? {});
recreateNodeExecutionStack(graph, new Set(startNodes), runData, pinData ?? {});
// 8. Execute
this.status = 'running';
@ -393,7 +393,7 @@ export class WorkflowExecute {
},
};
return this.processRunExecutionData(subgraph.toWorkflow({ ...workflow }));
return this.processRunExecutionData(graph.toWorkflow({ ...workflow }));
}
/**

View file

@ -327,16 +327,16 @@ describe('WorkflowExecute', () => {
expect(nodes).not.toContain(node1.name);
});
// ►►
// ┌────┐0 ┌─────────┐
//┌───────┐1 │ ├──────►afterLoop│
//│trigger├───┬──►loop│1 └─────────┘
//└───────┘ │ │ ├─┐
// │ └────┘ │
// │ │ ┌──────┐1
// │ └─►inLoop├─┐
// │ └──────┘ │
// └────────────────────┘
// ►►
// ┌────┐0 ┌─────────┐
// ┌───────┐1 │ ├──────►afterLoop│
// │trigger├───┬──►loop│1 └─────────┘
// └───────┘ │ │ ├─┐
// │ └────┘ │
// │ │ ┌──────┐1
// │ └─►inLoop├─┐
// │ └──────┘ │
// └────────────────────┘
test('passes filtered run data to `recreateNodeExecutionStack`', async () => {
// ARRANGE
const waitPromise = createDeferredPromise<IRun>();
@ -393,5 +393,55 @@ describe('WorkflowExecute', () => {
expect.any(Object),
);
});
// ┌───────┐ ┌─────┐
// │trigger├┬──►│node1│
// └───────┘│ └─────┘
// │ ┌─────┐
// └──►│node2│
// └─────┘
test('passes subgraph to `cleanRunData`', async () => {
// ARRANGE
const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const workflow = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections({ from: trigger, to: node1 }, { from: trigger, to: node2 })
.toWorkflow({ name: '', active: false, nodeTypes });
const pinData: IPinData = {};
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { nodeName: node1.name } }])],
[node2.name]: [toITaskData([{ data: { nodeName: node2.name } }])],
};
const dirtyNodeNames: string[] = [];
jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn());
const cleanRunDataSpy = jest.spyOn(partialExecutionUtils, 'cleanRunData');
// ACT
await workflowExecute.runPartialWorkflow2(
workflow,
runData,
pinData,
dirtyNodeNames,
node1.name,
);
// ASSERT
expect(cleanRunDataSpy).toHaveBeenNthCalledWith(
1,
runData,
new DirectedGraph().addNodes(trigger, node1).addConnections({ from: trigger, to: node1 }),
new Set([node1]),
);
});
});
});