refactor(core): Extract disabled node filtering out of findSubgraph (#11941)

This commit is contained in:
Danny Martini 2024-11-29 12:58:53 +01:00 committed by GitHub
parent c5d6a74623
commit 40a41dd192
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 224 additions and 111 deletions

View file

@ -470,6 +470,12 @@ export class DirectedGraph {
return graph;
}
clone() {
return new DirectedGraph()
.addNodes(...this.getNodes().values())
.addConnections(...this.getConnections().values());
}
private toIConnections() {
const result: IConnections = {};

View file

@ -0,0 +1,124 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { NodeConnectionType } from 'n8n-workflow';
import { createNodeData } from './helpers';
import { DirectedGraph } from '../DirectedGraph';
import { filterDisabledNodes } from '../filterDisabledNodes';
describe('filterDisabledNodes', () => {
// XX
// ┌───────┐ ┌────────┐ ►►
// │ ├────────►│ │ ┌───────────┐
// │trigger│ │disabled├─────►│destination│
// │ ├────────►│ │ └───────────┘
// └───────┘ └────────┘
// turns into
// ┌───────┐ ►►
// │ │ ┌───────────┐
// │trigger├─────►│destination│
// │ │ └───────────┘
// └───────┘
test('filter disabled nodes', () => {
const trigger = createNodeData({ name: 'trigger' });
const disabled = createNodeData({ name: 'disabled', disabled: true });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, disabled, destination)
.addConnections({ from: trigger, to: disabled }, { from: disabled, to: destination });
const subgraph = filterDisabledNodes(graph);
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
// XX XX
// ┌───────┐ ┌─────┐ ┌─────┐ ┌───────────┐
// │trigger├────►│node1├────►│node2├────►│destination│
// └───────┘ └─────┘ └─────┘ └───────────┘
// turns into
// ┌───────┐ ┌───────────┐
// │trigger├────►│destination│
// └───────┘ └───────────┘
test('filter multiple disabled nodes in a row', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const disabledNode1 = createNodeData({ name: 'disabledNode1', disabled: true });
const disabledNode2 = createNodeData({ name: 'disabledNode2', disabled: true });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, disabledNode1, disabledNode2, destination)
.addConnections(
{ from: trigger, to: disabledNode1 },
{ from: disabledNode1, to: disabledNode2 },
{ from: disabledNode2, to: destination },
);
// ACT
const subgraph = filterDisabledNodes(graph);
// ASSERT
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
describe('root nodes', () => {
// XX
// ┌───────┐ ┌────┐ ┌───────────┐
// │trigger├───►root├───►destination│
// └───────┘ └──▲─┘ └───────────┘
// │AiLanguageModel
// ┌┴──────┐
// │aiModel│
// └───────┘
// turns into
// ┌───────┐ ┌───────────┐
// │trigger├────────────►destination│
// └───────┘ └───────────┘
test('filter disabled root nodes', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const root = createNodeData({ name: 'root', disabled: true });
const aiModel = createNodeData({ name: 'ai_model' });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, root, aiModel, destination)
.addConnections(
{ from: trigger, to: root },
{ from: aiModel, type: NodeConnectionType.AiLanguageModel, to: root },
{ from: root, to: destination },
);
// ACT
const subgraph = filterDisabledNodes(graph);
// ASSERT
expect(subgraph).toEqual(
new DirectedGraph()
// The model is still in the graph, but orphaned. This is ok for
// partial executions as findSubgraph will remove orphaned nodes.
.addNodes(trigger, destination, aiModel)
.addConnections({ from: trigger, to: destination }),
);
});
});
});

View file

@ -79,70 +79,6 @@ describe('findSubgraph', () => {
);
});
// XX
// ┌───────┐ ┌────────┐ ►►
// │ ├────────►│ │ ┌───────────┐
// │trigger│ │disabled├─────►│destination│
// │ ├────────►│ │ └───────────┘
// └───────┘ └────────┘
// turns into
// ┌───────┐ ►►
// │ │ ┌───────────┐
// │trigger├─────►│destination│
// │ │ └───────────┘
// └───────┘
test('skip disabled nodes', () => {
const trigger = createNodeData({ name: 'trigger' });
const disabled = createNodeData({ name: 'disabled', disabled: true });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, disabled, destination)
.addConnections({ from: trigger, to: disabled }, { from: disabled, to: destination });
const subgraph = findSubgraph({ graph, destination, trigger });
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
// XX XX
// ┌───────┐ ┌─────┐ ┌─────┐ ┌───────────┐
// │trigger├────►│node1├────►│node2├────►│destination│
// └───────┘ └─────┘ └─────┘ └───────────┘
// turns into
// ┌───────┐ ┌───────────┐
// │trigger├────►│destination│
// └───────┘ └───────────┘
test('skip multiple disabled nodes', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const disabledNode1 = createNodeData({ name: 'disabledNode1', disabled: true });
const disabledNode2 = createNodeData({ name: 'disabledNode2', disabled: true });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, disabledNode1, disabledNode2, destination)
.addConnections(
{ from: trigger, to: disabledNode1 },
{ from: disabledNode1, to: disabledNode2 },
{ from: disabledNode2, to: destination },
);
// ACT
const subgraph = findSubgraph({ graph, destination, trigger });
// ASSERT
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
// ►►
// ┌───────┐ ┌─────┐ ┌─────┐
// │Trigger├───┬──►│Node1├───┬─►│Node2│
@ -291,36 +227,29 @@ describe('findSubgraph', () => {
expect(subgraph.getNodes().size).toBe(0);
});
// ┌───────┐ ┌───────────┐
// │trigger├────────────►destination│
// └───────┘ └───────────┘
//
// XX
// ┌───────┐ ┌────┐ ┌───────────┐
// │trigger├───►root├───►destination│
// └───────┘ └──▲─┘ └───────────┘
// │AiLanguageModel
// ┌┴──────┐
// ┌───────┐
// │aiModel│
// └───────┘
// turns into
// ┌───────┐ ┌───────────┐
// │trigger├────────────►destination│
// └───────┘ └───────────┘
test('skip disabled root nodes', () => {
test('remove orphaned nodes', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const root = createNodeData({ name: 'root', disabled: true });
const aiModel = createNodeData({ name: 'ai_model' });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, root, aiModel, destination)
.addConnections(
{ from: trigger, to: root },
{ from: aiModel, type: NodeConnectionType.AiLanguageModel, to: root },
{ from: root, to: destination },
);
.addNodes(trigger, aiModel, destination)
.addConnections({ from: trigger, to: destination });
// ACT
const subgraph = findSubgraph({ graph, destination: root, trigger });
const subgraph = findSubgraph({ graph, destination, trigger });
// ASSERT
expect(subgraph).toEqual(

View file

@ -0,0 +1,18 @@
import { NodeConnectionType } from 'n8n-workflow';
import type { DirectedGraph } from './DirectedGraph';
export function filterDisabledNodes(graph: DirectedGraph): DirectedGraph {
const filteredGraph = graph.clone();
for (const node of filteredGraph.getNodes().values()) {
if (node.disabled) {
filteredGraph.removeNode(node, {
reconnectConnections: true,
skipConnectionFn: (c) => c.type !== NodeConnectionType.Main,
});
}
}
return filteredGraph;
}

View file

@ -21,7 +21,7 @@ function findSubgraphRecursive(
return;
}
let parentConnections = graph.getDirectParentConnections(current);
const parentConnections = graph.getDirectParentConnections(current);
// If the current node has no parents, dont keep this branch.
if (parentConnections.length === 0) {
@ -46,27 +46,6 @@ function findSubgraphRecursive(
return;
}
// If the current node is disabled, dont keep this node, but keep the
// branch.
// Take every incoming connection and connect it to every node that is
// connected to the current nodes first output
if (current.disabled) {
// The last segment on the current branch is still pointing to the removed
// node, so let's remove it.
currentBranch.pop();
// The node is replaced by a set of new connections, connecting the parents
// and children of it directly. In the recursive call below we'll follow
// them further.
parentConnections = graph.removeNode(current, {
reconnectConnections: true,
// If the node has non-Main connections we don't want to rewire those.
// Otherwise we'd end up connecting AI utilities to nodes that don't
// support them.
skipConnectionFn: (c) => c.type !== NodeConnectionType.Main,
});
}
// Recurse on each parent.
for (const parentConnection of parentConnections) {
// Skip parents that are connected via non-Main connection types. They are
@ -84,8 +63,7 @@ function findSubgraphRecursive(
}
/**
* Find all nodes that can lead from the trigger to the destination node,
* ignoring disabled nodes.
* Find all nodes that can lead from the trigger to the destination node.
*
* The algorithm is:
* Start with Destination Node
@ -95,12 +73,8 @@ function findSubgraphRecursive(
* 3. if the current node is the destination node again, dont keep this
* branch
* 4. if the current node was already visited, keep this branch
* 5. if the current node is disabled, dont keep this node, but keep the
* branch
* - take every incoming connection and connect it to every node that is
* connected to the current nodes first output
* 6. Recurse on each parent
* 7. Re-add all connections that don't use the `Main` connections type.
* 5. Recurse on each parent
* 6. Re-add all connections that don't use the `Main` connections type.
* Theses are used by nodes called root nodes and they are not part of the
* dataflow in the graph they are utility nodes, like the AI model used in a
* lang chain node.

View file

@ -5,3 +5,4 @@ export { findSubgraph } from './findSubgraph';
export { recreateNodeExecutionStack } from './recreateNodeExecutionStack';
export { cleanRunData } from './cleanRunData';
export { handleCycles } from './handleCycles';
export { filterDisabledNodes } from './filterDisabledNodes';

View file

@ -59,6 +59,7 @@ import {
cleanRunData,
recreateNodeExecutionStack,
handleCycles,
filterDisabledNodes,
} from './PartialExecutionUtils';
export class WorkflowExecute {
@ -347,7 +348,7 @@ export class WorkflowExecute {
// 2. Find the Subgraph
const graph = DirectedGraph.fromWorkflow(workflow);
const subgraph = findSubgraph({ graph, destination, trigger });
const subgraph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger });
const filteredNodes = subgraph.getNodes();
// 3. Find the Start Nodes

View file

@ -1,3 +1,14 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import type { IPinData, IRun, IRunData, WorkflowTestData } from 'n8n-workflow';
import {
ApplicationError,
@ -247,6 +258,7 @@ describe('WorkflowExecute', () => {
[node2.name]: [toITaskData([{ data: { name: node2.name } }])],
};
const dirtyNodeNames = [node1.name];
const destinationNode = node2.name;
jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn());
@ -256,7 +268,7 @@ describe('WorkflowExecute', () => {
runData,
pinData,
dirtyNodeNames,
'node2',
destinationNode,
);
// ASSERT
@ -264,5 +276,53 @@ describe('WorkflowExecute', () => {
expect(fullRunData.data.resultData.runData).toHaveProperty(trigger.name);
expect(fullRunData.data.resultData.runData).not.toHaveProperty(node1.name);
});
// XX ►►
// ┌───────┐1 ┌─────┐1 ┌─────┐
// │trigger├──────►node1├──────►node2│
// └───────┘ └─────┘ └─────┘
test('removes disabled nodes from the workflow', async () => {
// ARRANGE
const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
const node1 = createNodeData({ name: 'node1', disabled: true });
const node2 = createNodeData({ name: 'node2' });
const workflow = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 })
.toWorkflow({ name: '', active: false, nodeTypes });
const pinData: IPinData = {};
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { name: trigger.name } }])],
[node1.name]: [toITaskData([{ data: { name: node1.name } }])],
[node2.name]: [toITaskData([{ data: { name: node2.name } }])],
};
const dirtyNodeNames: string[] = [];
const destinationNode = node2.name;
const processRunExecutionDataSpy = jest
.spyOn(workflowExecute, 'processRunExecutionData')
.mockImplementationOnce(jest.fn());
// ACT
await workflowExecute.runPartialWorkflow2(
workflow,
runData,
pinData,
dirtyNodeNames,
destinationNode,
);
// ASSERT
expect(processRunExecutionDataSpy).toHaveBeenCalledTimes(1);
const nodes = Object.keys(processRunExecutionDataSpy.mock.calls[0][0].nodes);
expect(nodes).toContain(trigger.name);
expect(nodes).toContain(node2.name);
expect(nodes).not.toContain(node1.name);
});
});
});