diff --git a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts index e8695743a1..63d95367ab 100644 --- a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts +++ b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts @@ -42,6 +42,10 @@ export class DirectedGraph { private connections: Map = new Map(); + hasNode(nodeName: string) { + return this.nodes.has(nodeName); + } + getNodes() { return new Map(this.nodes.entries()); } diff --git a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts index c8feb20e11..5d769004a5 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts @@ -43,8 +43,8 @@ describe('DirectedGraph', () => { }); // ┌─────┐ ┌─────┐──► null - // │node1├───►│node2| ┌─────┐ - // └─────┘ └─────┘──►│node3| + // │node1├───►│node2│ ┌─────┐ + // └─────┘ └─────┘──►│node3│ // └─────┘ // test('linear workflow with null connections', () => { @@ -472,4 +472,24 @@ describe('DirectedGraph', () => { expect(graph).toEqual(expectedGraph); }); }); + + describe('hasNode', () => { + test("returns node if it's part of the graph", () => { + // ARRANGE + const node = createNodeData({ name: 'node' }); + const graph = new DirectedGraph().addNodes(node); + + // ACT & ASSERT + expect(graph.hasNode(node.name)).toBe(true); + }); + + test('returns undefined if there is no node with that name in the graph', () => { + // ARRANGE + const node = createNodeData({ name: 'node' }); + const graph = new DirectedGraph().addNodes(node); + + // ACT & ASSERT + expect(graph.hasNode(node.name + 'foo')).toBe(false); + }); + }); }); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts index 5daea46ef6..959ab78845 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts @@ -84,4 +84,31 @@ describe('cleanRunData', () => { // TODO: Find out if this is a desirable result in milestone 2 expect(newRunData).toEqual({}); }); + + // ┌─────┐ ┌─────┐ + // │node1├───►│node2│ + // └─────┘ └─────┘ + test('removes run data of nodes that are not in the subgraph', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const graph = new DirectedGraph() + .addNodes(node1, node2) + .addConnections({ from: node1, to: node2 }); + // not part of the graph + const node3 = createNodeData({ name: 'Node3' }); + const runData: IRunData = { + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 2 } }])], + [node3.name]: [toITaskData([{ data: { value: 3 } }])], + }; + + // ACT + const newRunData = cleanRunData(runData, graph, new Set([node2])); + + // ASSERT + expect(newRunData).toEqual({ + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + }); + }); }); diff --git a/packages/core/src/PartialExecutionUtils/cleanRunData.ts b/packages/core/src/PartialExecutionUtils/cleanRunData.ts index bcd60c423b..6ed5db6100 100644 --- a/packages/core/src/PartialExecutionUtils/cleanRunData.ts +++ b/packages/core/src/PartialExecutionUtils/cleanRunData.ts @@ -23,5 +23,13 @@ export function cleanRunData( } } + // Remove run data for all nodes that are not part of the subgraph + for (const nodeName of Object.keys(newRunData)) { + if (!graph.hasNode(nodeName)) { + // remove run data for node that is not part of the graph + delete newRunData[nodeName]; + } + } + return newRunData; } diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index be4e1d70f3..2990d61cf8 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -354,13 +354,13 @@ export class WorkflowExecute { } // 2. Find the Subgraph - const graph = DirectedGraph.fromWorkflow(workflow); - const subgraph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger }); - const filteredNodes = subgraph.getNodes(); + let graph = DirectedGraph.fromWorkflow(workflow); + graph = findSubgraph({ graph: filterDisabledNodes(graph), destination, trigger }); + const filteredNodes = graph.getNodes(); // 3. Find the Start Nodes runData = omit(runData, dirtyNodeNames); - let startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData, pinData }); + let startNodes = findStartNodes({ graph, trigger, destination, runData, pinData }); // 4. Detect Cycles // 5. Handle Cycles @@ -371,7 +371,7 @@ export class WorkflowExecute { // 7. Recreate Execution Stack const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(subgraph, new Set(startNodes), runData, pinData ?? {}); + recreateNodeExecutionStack(graph, new Set(startNodes), runData, pinData ?? {}); // 8. Execute this.status = 'running'; @@ -393,7 +393,7 @@ export class WorkflowExecute { }, }; - return this.processRunExecutionData(subgraph.toWorkflow({ ...workflow })); + return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); } /** diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index cafc26269d..f6f9bd2cbf 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -327,16 +327,16 @@ describe('WorkflowExecute', () => { expect(nodes).not.toContain(node1.name); }); - // ►► - // ┌────┐0 ┌─────────┐ - //┌───────┐1 │ ├──────►afterLoop│ - //│trigger├───┬──►loop│1 └─────────┘ - //└───────┘ │ │ ├─┐ - // │ └────┘ │ - // │ │ ┌──────┐1 - // │ └─►inLoop├─┐ - // │ └──────┘ │ - // └────────────────────┘ + // ►► + // ┌────┐0 ┌─────────┐ + // ┌───────┐1 │ ├──────►afterLoop│ + // │trigger├───┬──►loop│1 └─────────┘ + // └───────┘ │ │ ├─┐ + // │ └────┘ │ + // │ │ ┌──────┐1 + // │ └─►inLoop├─┐ + // │ └──────┘ │ + // └────────────────────┘ test('passes filtered run data to `recreateNodeExecutionStack`', async () => { // ARRANGE const waitPromise = createDeferredPromise(); @@ -393,5 +393,55 @@ describe('WorkflowExecute', () => { expect.any(Object), ); }); + + // ┌───────┐ ┌─────┐ + // │trigger├┬──►│node1│ + // └───────┘│ └─────┘ + // │ ┌─────┐ + // └──►│node2│ + // └─────┘ + test('passes subgraph to `cleanRunData`', async () => { + // ARRANGE + const waitPromise = createDeferredPromise(); + const nodeExecutionOrder: string[] = []; + const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder); + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + + const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const workflow = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections({ from: trigger, to: node1 }, { from: trigger, to: node2 }) + .toWorkflow({ name: '', active: false, nodeTypes }); + + const pinData: IPinData = {}; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [node1.name]: [toITaskData([{ data: { nodeName: node1.name } }])], + [node2.name]: [toITaskData([{ data: { nodeName: node2.name } }])], + }; + const dirtyNodeNames: string[] = []; + + jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn()); + const cleanRunDataSpy = jest.spyOn(partialExecutionUtils, 'cleanRunData'); + + // ACT + await workflowExecute.runPartialWorkflow2( + workflow, + runData, + pinData, + dirtyNodeNames, + node1.name, + ); + + // ASSERT + expect(cleanRunDataSpy).toHaveBeenNthCalledWith( + 1, + runData, + new DirectedGraph().addNodes(trigger, node1).addConnections({ from: trigger, to: node1 }), + new Set([node1]), + ); + }); }); });