From 2a084f96f8cb69142cbfd935bb4b3ff1153ffda9 Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Wed, 18 Sep 2024 15:06:36 +0200 Subject: [PATCH] feat: Implement new partial execution logic for acyclic workflows (no-changelog) (#10256) Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> --- cypress/e2e/19-execution.cy.ts | 10 +- cypress/e2e/28-debug.cy.ts | 2 +- .../e2e/30-editor-after-route-changes.cy.ts | 2 +- cypress/pages/workflow-executions-tab.ts | 2 +- cypress/utils/executions.ts | 2 +- packages/cli/src/config/schema.ts | 9 + packages/cli/src/workflow-runner.ts | 30 +- .../workflows/workflow-execution.service.ts | 2 + .../cli/src/workflows/workflow.request.ts | 7 +- .../cli/src/workflows/workflows.controller.ts | 3 + .../PartialExecutionUtils/DirectedGraph.ts | 230 +++++++++++ .../__tests__/DirectedGraph.test.ts | 41 ++ .../__tests__/findStartNodes.test.ts | 372 ++++++++++++++++++ .../__tests__/findSubgraph.test.ts | 185 +++++++++ .../__tests__/getSourceDataGroups.test.ts | 197 ++++++++++ .../__tests__/helpers.ts | 99 +++++ .../recreateNodeExecutionStack.test.ts | 333 ++++++++++++++++ .../__tests__/toIConnections.test.ts | 26 ++ .../__tests__/toITaskData.test.ts | 64 +++ .../src/PartialExecutionUtils/findCycles.ts | 6 + .../PartialExecutionUtils/findStartNodes.ts | 153 +++++++ .../src/PartialExecutionUtils/findSubgraph.ts | 113 ++++++ .../findTriggerForPartialExecution.ts | 61 +++ .../PartialExecutionUtils/getIncomingData.ts | 22 ++ .../getSourceDataGroups.ts | 115 ++++++ .../core/src/PartialExecutionUtils/index.ts | 6 + .../recreateNodeExecutionStack.ts | 180 +++++++++ packages/core/src/WorkflowExecute.ts | 86 ++++ .../src/composables/useRunWorkflow.ts | 9 +- .../editor-ui/src/stores/workflows.store.ts | 7 +- packages/workflow/src/Interfaces.ts | 13 + 31 files changed, 2367 insertions(+), 20 deletions(-) create mode 100644 packages/core/src/PartialExecutionUtils/DirectedGraph.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/findSubgraph.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/helpers.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/toIConnections.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/toITaskData.test.ts create mode 100644 packages/core/src/PartialExecutionUtils/findCycles.ts create mode 100644 packages/core/src/PartialExecutionUtils/findStartNodes.ts create mode 100644 packages/core/src/PartialExecutionUtils/findSubgraph.ts create mode 100644 packages/core/src/PartialExecutionUtils/findTriggerForPartialExecution.ts create mode 100644 packages/core/src/PartialExecutionUtils/getIncomingData.ts create mode 100644 packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts create mode 100644 packages/core/src/PartialExecutionUtils/index.ts create mode 100644 packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts diff --git a/cypress/e2e/19-execution.cy.ts b/cypress/e2e/19-execution.cy.ts index d6b8d08fd5..81e11b1b63 100644 --- a/cypress/e2e/19-execution.cy.ts +++ b/cypress/e2e/19-execution.cy.ts @@ -503,7 +503,7 @@ describe('Execution', () => { workflowPage.getters.clearExecutionDataButton().should('be.visible'); - cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun'); workflowPage.getters .canvasNodeByName('do something with them') @@ -525,7 +525,7 @@ describe('Execution', () => { workflowPage.getters.zoomToFitButton().click(); - cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun'); workflowPage.getters .canvasNodeByName('If') @@ -547,7 +547,7 @@ describe('Execution', () => { workflowPage.getters.clearExecutionDataButton().should('be.visible'); - cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun'); workflowPage.getters .canvasNodeByName('NoOp2') @@ -576,7 +576,7 @@ describe('Execution', () => { it('should successfully execute partial executions with nodes attached to the second output', () => { cy.createFixtureWorkflow('Test_Workflow_pairedItem_incomplete_manual_bug.json'); - cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun'); workflowPage.getters.zoomToFitButton().click(); workflowPage.getters.executeWorkflowButton().click(); @@ -596,7 +596,7 @@ describe('Execution', () => { it('should execute workflow partially up to the node that has issues', () => { cy.createFixtureWorkflow('Test_workflow_partial_execution_with_missing_credentials.json'); - cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun'); workflowPage.getters.zoomToFitButton().click(); workflowPage.getters.executeWorkflowButton().click(); diff --git a/cypress/e2e/28-debug.cy.ts b/cypress/e2e/28-debug.cy.ts index 5d2bd76cac..bc1f03c162 100644 --- a/cypress/e2e/28-debug.cy.ts +++ b/cypress/e2e/28-debug.cy.ts @@ -18,7 +18,7 @@ describe('Debug', () => { it('should be able to debug executions', () => { cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions'); cy.intercept('GET', '/rest/executions/*').as('getExecution'); - cy.intercept('POST', '/rest/workflows/**/run').as('postWorkflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('postWorkflowRun'); cy.signinAsOwner(); diff --git a/cypress/e2e/30-editor-after-route-changes.cy.ts b/cypress/e2e/30-editor-after-route-changes.cy.ts index 6598780676..f0381a32a2 100644 --- a/cypress/e2e/30-editor-after-route-changes.cy.ts +++ b/cypress/e2e/30-editor-after-route-changes.cy.ts @@ -142,7 +142,7 @@ describe('Editor actions should work', () => { it('after switching between Editor and Debug', () => { cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions'); cy.intercept('GET', '/rest/executions/*').as('getExecution'); - cy.intercept('POST', '/rest/workflows/**/run').as('postWorkflowRun'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('postWorkflowRun'); editWorkflowAndDeactivate(); workflowPage.actions.executeWorkflow(); diff --git a/cypress/pages/workflow-executions-tab.ts b/cypress/pages/workflow-executions-tab.ts index 93c9af86ff..5e8c36c055 100644 --- a/cypress/pages/workflow-executions-tab.ts +++ b/cypress/pages/workflow-executions-tab.ts @@ -35,7 +35,7 @@ export class WorkflowExecutionsTab extends BasePage { }, createManualExecutions: (count: number) => { for (let i = 0; i < count; i++) { - cy.intercept('POST', '/rest/workflows/**/run').as('workflowExecution'); + cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowExecution'); workflowPage.actions.executeWorkflow(); cy.wait('@workflowExecution'); } diff --git a/cypress/utils/executions.ts b/cypress/utils/executions.ts index eb0dbfc251..0b4814fdc9 100644 --- a/cypress/utils/executions.ts +++ b/cypress/utils/executions.ts @@ -89,7 +89,7 @@ export function runMockWorkflowExecution({ }) { const executionId = nanoid(8); - cy.intercept('POST', '/rest/workflows/**/run', { + cy.intercept('POST', '/rest/workflows/**/run?**', { statusCode: 201, body: { data: { diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 1f1132b630..0db300eaf0 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -640,4 +640,13 @@ export const schema = { env: 'N8N_PROXY_HOPS', doc: 'Number of reverse-proxies n8n is running behind', }, + + featureFlags: { + partialExecutionVersionDefault: { + format: String, + default: '0', + env: 'PARTIAL_EXECUTION_VERSION_DEFAULT', + doc: 'Set this to 1 to enable the new partial execution logic by default.', + }, + }, }; diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index aea8bfc15e..c27baa5ba1 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -109,7 +109,9 @@ export class WorkflowRunner { } } - /** Run the workflow */ + /** Run the workflow + * @param realtime This is used in queue mode to change the priority of an execution, making sure they are picked up quicker. + */ async run( data: IWorkflowExecutionDataProcess, loadStaticData?: boolean, @@ -278,6 +280,7 @@ export class WorkflowRunner { data.startNodes === undefined || data.startNodes.length === 0 ) { + // Full Execution this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, { executionId, }); @@ -294,16 +297,27 @@ export class WorkflowRunner { data.pinData, ); } else { + // Partial Execution this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); // Execute only the nodes between start and destination nodes const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); - workflowExecution = workflowExecute.runPartialWorkflow( - workflow, - data.runData, - data.startNodes, - data.destinationNode, - data.pinData, - ); + + if (data.partialExecutionVersion === '1') { + workflowExecution = workflowExecute.runPartialWorkflow2( + workflow, + data.runData, + data.destinationNode, + data.pinData, + ); + } else { + workflowExecution = workflowExecute.runPartialWorkflow( + workflow, + data.runData, + data.startNodes, + data.destinationNode, + data.pinData, + ); + } } this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); diff --git a/packages/cli/src/workflows/workflow-execution.service.ts b/packages/cli/src/workflows/workflow-execution.service.ts index 87a78f4b5e..4dc6d00f34 100644 --- a/packages/cli/src/workflows/workflow-execution.service.ts +++ b/packages/cli/src/workflows/workflow-execution.service.ts @@ -92,6 +92,7 @@ export class WorkflowExecutionService { { workflowData, runData, startNodes, destinationNode }: WorkflowRequest.ManualRunPayload, user: User, pushRef?: string, + partialExecutionVersion?: string, ) { const pinData = workflowData.pinData; const pinnedTrigger = this.selectPinnedActivatorStarter( @@ -135,6 +136,7 @@ export class WorkflowExecutionService { startNodes, workflowData, userId: user.id, + partialExecutionVersion: partialExecutionVersion ?? '0', }; const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; diff --git a/packages/cli/src/workflows/workflow.request.ts b/packages/cli/src/workflows/workflow.request.ts index d05b5c1dab..d45cfd14d3 100644 --- a/packages/cli/src/workflows/workflow.request.ts +++ b/packages/cli/src/workflows/workflow.request.ts @@ -43,7 +43,12 @@ export declare namespace WorkflowRequest { type NewName = AuthenticatedRequest<{}, {}, {}, { name?: string }>; - type ManualRun = AuthenticatedRequest<{ workflowId: string }, {}, ManualRunPayload>; + type ManualRun = AuthenticatedRequest< + { workflowId: string }, + {}, + ManualRunPayload, + { partialExecutionVersion?: string } + >; type Share = AuthenticatedRequest<{ workflowId: string }, {}, { shareWithIds: string[] }>; diff --git a/packages/cli/src/workflows/workflows.controller.ts b/packages/cli/src/workflows/workflows.controller.ts index 797797e386..57f46002c2 100644 --- a/packages/cli/src/workflows/workflows.controller.ts +++ b/packages/cli/src/workflows/workflows.controller.ts @@ -405,6 +405,9 @@ export class WorkflowsController { req.body, req.user, req.headers['push-ref'] as string, + req.query.partialExecutionVersion === '-1' + ? config.getEnv('featureFlags.partialExecutionVersionDefault') + : req.query.partialExecutionVersion, ); } diff --git a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts new file mode 100644 index 0000000000..2485b895b0 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts @@ -0,0 +1,230 @@ +import * as a from 'assert'; +import type { IConnections, INode, WorkflowParameters } from 'n8n-workflow'; +import { NodeConnectionType, Workflow } from 'n8n-workflow'; + +export type GraphConnection = { + from: INode; + to: INode; + type: NodeConnectionType; + outputIndex: number; + inputIndex: number; +}; +// fromName-outputType-outputIndex-inputIndex-toName +type DirectedGraphKey = `${string}-${NodeConnectionType}-${number}-${number}-${string}`; + +/** + * Represents a directed graph as an adjacency list, e.g. one list for the + * vertices and one list for the edges. + * To integrate easier with the n8n codebase vertices are called nodes and + * edges are called connections. + * + * The reason why this exists next to the Workflow class is that the workflow + * class stored the graph in a deeply nested, normalized format. This format + * does not lend itself to editing the graph or build graphs incrementally. + * This closes this gap by having import and export functions: + * `fromWorkflow`, `toWorkflow`. + * + * Thus it allows to do something like this: + * ```ts + * const newWorkflow = DirectedGraph.fromWorkflow(workflow) + * .addNodes(node1, node2) + * .addConnection({ from: node1, to: node2 }) + * .toWorkflow(...workflow); + * ``` + */ +export class DirectedGraph { + private nodes: Map = new Map(); + + private connections: Map = new Map(); + + getNodes() { + return new Map(this.nodes.entries()); + } + + getConnections(filter: { to?: INode } = {}) { + const filteredCopy: GraphConnection[] = []; + + for (const connection of this.connections.values()) { + const toMatches = filter.to ? connection.to === filter.to : true; + + if (toMatches) { + filteredCopy.push(connection); + } + } + + return filteredCopy; + } + + addNode(node: INode) { + this.nodes.set(node.name, node); + return this; + } + + addNodes(...nodes: INode[]) { + for (const node of nodes) { + this.addNode(node); + } + return this; + } + + addConnection(connectionInput: { + from: INode; + to: INode; + type?: NodeConnectionType; + outputIndex?: number; + inputIndex?: number; + }) { + const { from, to } = connectionInput; + + const fromExists = this.nodes.get(from.name) === from; + const toExists = this.nodes.get(to.name) === to; + + a.ok(fromExists); + a.ok(toExists); + + const connection: GraphConnection = { + ...connectionInput, + type: connectionInput.type ?? NodeConnectionType.Main, + outputIndex: connectionInput.outputIndex ?? 0, + inputIndex: connectionInput.inputIndex ?? 0, + }; + + this.connections.set(this.makeKey(connection), connection); + return this; + } + + addConnections( + ...connectionInputs: Array<{ + from: INode; + to: INode; + type?: NodeConnectionType; + outputIndex?: number; + inputIndex?: number; + }> + ) { + for (const connectionInput of connectionInputs) { + this.addConnection(connectionInput); + } + return this; + } + + getDirectChildren(node: INode) { + const nodeExists = this.nodes.get(node.name) === node; + a.ok(nodeExists); + + const directChildren: GraphConnection[] = []; + + for (const connection of this.connections.values()) { + if (connection.from !== node) { + continue; + } + + directChildren.push(connection); + } + + return directChildren; + } + + getDirectParents(node: INode) { + const nodeExists = this.nodes.get(node.name) === node; + a.ok(nodeExists); + + const directParents: GraphConnection[] = []; + + for (const connection of this.connections.values()) { + if (connection.to !== node) { + continue; + } + + directParents.push(connection); + } + + return directParents; + } + + getConnection( + from: INode, + outputIndex: number, + type: NodeConnectionType, + inputIndex: number, + to: INode, + ): GraphConnection | undefined { + return this.connections.get( + this.makeKey({ + from, + outputIndex, + type, + inputIndex, + to, + }), + ); + } + + toWorkflow(parameters: Omit): Workflow { + return new Workflow({ + ...parameters, + nodes: [...this.nodes.values()], + connections: this.toIConnections(), + }); + } + + static fromWorkflow(workflow: Workflow): DirectedGraph { + const graph = new DirectedGraph(); + + graph.addNodes(...Object.values(workflow.nodes)); + + for (const [fromNodeName, iConnection] of Object.entries(workflow.connectionsBySourceNode)) { + const from = workflow.getNode(fromNodeName); + a.ok(from); + + for (const [outputType, outputs] of Object.entries(iConnection)) { + for (const [outputIndex, conns] of outputs.entries()) { + for (const conn of conns) { + // TODO: What's with the input type? + const { node: toNodeName, type: _inputType, index: inputIndex } = conn; + const to = workflow.getNode(toNodeName); + a.ok(to); + + graph.addConnection({ + from, + to, + // TODO: parse outputType instead of casting it + type: outputType as NodeConnectionType, + outputIndex, + inputIndex, + }); + } + } + } + } + + return graph; + } + + private toIConnections() { + const result: IConnections = {}; + + for (const connection of this.connections.values()) { + const { from, to, type, outputIndex, inputIndex } = connection; + + result[from.name] = result[from.name] ?? { + [type]: [], + }; + const resultConnection = result[from.name]; + resultConnection[type][outputIndex] = resultConnection[type][outputIndex] ?? []; + const group = resultConnection[type][outputIndex]; + + group.push({ + node: to.name, + type, + index: inputIndex, + }); + } + + return result; + } + + private makeKey(connection: GraphConnection): DirectedGraphKey { + return `${connection.from.name}-${connection.type}-${connection.outputIndex}-${connection.inputIndex}-${connection.to.name}`; + } +} diff --git a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts new file mode 100644 index 0000000000..4049878eb2 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts @@ -0,0 +1,41 @@ +// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/ +// If you update the tests, please update the diagrams as well. +// If you add a test, please create a new diagram. +// +// Map +// 0 means the output has no run data +// 1 means the output has run data +// ►► denotes the node that the user wants to execute to +// XX denotes that the node is disabled +// PD denotes that the node has pinned data + +import { DirectedGraph } from '../DirectedGraph'; +import { createNodeData, defaultWorkflowParameter } from './helpers'; + +describe('DirectedGraph', () => { + // ┌─────┐ ┌─────┐ ┌─────┐ + // ┌─►│node1├───►│node2├──►│node3├─┐ + // │ └─────┘ └─────┘ └─────┘ │ + // │ │ + // └───────────────────────────────┘ + test('roundtrip', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + + // ACT + const graph = new DirectedGraph() + .addNodes(node1, node2, node3) + .addConnections( + { from: node1, to: node2 }, + { from: node2, to: node3 }, + { from: node3, to: node1 }, + ); + + // ASSERT + expect(DirectedGraph.fromWorkflow(graph.toWorkflow({ ...defaultWorkflowParameter }))).toEqual( + graph, + ); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts new file mode 100644 index 0000000000..c830833d8d --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts @@ -0,0 +1,372 @@ +// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/ +// If you update the tests, please update the diagrams as well. +// If you add a test, please create a new diagram. +// +// Map +// 0 means the output has no run data +// 1 means the output has run data +// ►► denotes the node that the user wants to execute to +// XX denotes that the node is disabled +// PD denotes that the node has pinned data + +import { type IPinData, type IRunData } from 'n8n-workflow'; +import { createNodeData, toITaskData } from './helpers'; +import { findStartNodes, isDirty } from '../findStartNodes'; +import { DirectedGraph } from '../DirectedGraph'; + +describe('isDirty', () => { + test("if the node has pinned data it's not dirty", () => { + const node = createNodeData({ name: 'Basic Node' }); + + const pinData: IPinData = { + [node.name]: [{ json: { value: 1 } }], + }; + + expect(isDirty(node, undefined, pinData)).toBe(false); + }); + + test("if the node has run data it's not dirty", () => { + const node = createNodeData({ name: 'Basic Node' }); + + const runData: IRunData = { + [node.name]: [toITaskData([{ data: { value: 1 } }])], + }; + + expect(isDirty(node, runData)).toBe(false); + }); +}); + +describe('findStartNodes', () => { + // ►► + // ┌───────┐ + // │trigger│ + // └───────┘ + test('finds the start node if there is only a trigger', () => { + const node = createNodeData({ name: 'Basic Node' }); + const graph = new DirectedGraph().addNode(node); + + const startNodes = findStartNodes(graph, node, node); + + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node); + }); + + // ►► + // ┌───────┐ ┌───────────┐ + // │trigger├────►│destination│ + // └───────┘ └───────────┘ + test('finds the start node in a simple graph', () => { + const trigger = createNodeData({ name: 'trigger' }); + const destination = createNodeData({ name: 'destination' }); + const graph = new DirectedGraph() + .addNodes(trigger, destination) + .addConnection({ from: trigger, to: destination }); + + // if the trigger has no run data + { + const startNodes = findStartNodes(graph, trigger, destination); + + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(trigger); + } + + // if the trigger has run data + { + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + }; + + const startNodes = findStartNodes(graph, trigger, destination, runData); + + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(destination); + } + }); + + // ┌───────┐ ►► + // │ │1──┐ ┌────┐ + // │trigger│ ├─►│node│ + // │ │1──┘ └────┘ + // └───────┘ + // All nodes have run data. `findStartNodes` should return node twice + // because it has 2 input connections. + test('multiple outputs', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + const graph = new DirectedGraph() + .addNodes(trigger, node) + .addConnections( + { from: trigger, to: node, outputIndex: 0, inputIndex: 0 }, + { from: trigger, to: node, outputIndex: 1, inputIndex: 0 }, + ); + const runData: IRunData = { + [trigger.name]: [ + toITaskData([ + { data: { value: 1 }, outputIndex: 0 }, + { data: { value: 1 }, outputIndex: 1 }, + ]), + ], + [node.name]: [toITaskData([{ data: { value: 1 } }])], + }; + + // ACT + const startNodes = findStartNodes(graph, trigger, node, runData); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node); + }); + + // ┌─────┐ ┌─────┐ ►► + //┌───────┐ │ ├────┬────────►│ │ ┌─────┐ + //│trigger├───►│node1│ │ │node2├────┬───►│node4│ + //└───────┘ │ ├────┼────┬───►│ │ │ └─────┘ + // └─────┘ │ │ └─────┘ │ + // │ │ │ + // │ │ │ + // │ │ │ + // │ │ ┌─────┐ │ + // │ └───►│ │ │ + // │ │node3├────┘ + // └────────►│ │ + // └─────┘ + test('complex example with multiple outputs and inputs', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const node3 = createNodeData({ name: 'node3' }); + const node4 = createNodeData({ name: 'node4' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, node3, node4) + .addConnections( + { from: trigger, to: node1 }, + { from: node1, to: node2, outputIndex: 0, inputIndex: 0 }, + { from: node1, to: node2, outputIndex: 1, inputIndex: 1 }, + { from: node1, to: node3, outputIndex: 0, inputIndex: 1 }, + { from: node1, to: node3, outputIndex: 1, inputIndex: 0 }, + { from: node2, to: node4 }, + { from: node3, to: node4 }, + ); + + { + // ACT + const startNodes = findStartNodes(graph, trigger, node4); + + // ASSERT + expect(startNodes).toHaveLength(1); + // no run data means the trigger is the start node + expect(startNodes[0]).toEqual(trigger); + } + + { + // run data for everything + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 1 } }])], + [node3.name]: [toITaskData([{ data: { value: 1 } }])], + [node4.name]: [toITaskData([{ data: { value: 1 } }])], + }; + + // ACT + const startNodes = findStartNodes(graph, trigger, node4, runData); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node4); + } + }); + + // ►► + // ┌───────┐1 ┌────┐ + // │ ├────────►│ │ + // │trigger│ │node│ + // │ ├────────►│ │ + // └───────┘0 └────┘ + // The merge node only gets data on one input, so the it should only be once + // in the start nodes + test('multiple connections with the first one having data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(trigger, node) + .addConnections( + { from: trigger, to: node, inputIndex: 0, outputIndex: 0 }, + { from: trigger, to: node, inputIndex: 1, outputIndex: 1 }, + ); + + // ACT + const startNodes = findStartNodes(graph, trigger, node, { + [trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], + }); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node); + }); + + // ►► + // ┌───────┐0 ┌────┐ + // │ ├────────►│ │ + // │trigger│ │node│ + // │ ├────────►│ │ + // └───────┘1 └────┘ + // The merge node only gets data on one input, so the it should only be once + // in the start nodes + test('multiple connections with the second one having data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(trigger, node) + .addConnections( + { from: trigger, to: node, inputIndex: 0, outputIndex: 0 }, + { from: trigger, to: node, inputIndex: 1, outputIndex: 1 }, + ); + + // ACT + const startNodes = findStartNodes(graph, trigger, node, { + [trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 1 }])], + }); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node); + }); + + // ►► + // ┌───────┐1 ┌────┐ + // │ ├────────►│ │ + // │trigger│ │node│ + // │ ├────────►│ │ + // └───────┘1 └────┘ + // The merge node gets data on both inputs, so the it should be in the start + // nodes twice. + test('multiple connections with both having data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(trigger, node) + .addConnections( + { from: trigger, to: node, inputIndex: 0, outputIndex: 0 }, + { from: trigger, to: node, inputIndex: 1, outputIndex: 1 }, + ); + + // ACT + const startNodes = findStartNodes(graph, trigger, node, { + [trigger.name]: [ + toITaskData([ + { data: { value: 1 }, outputIndex: 0 }, + { data: { value: 1 }, outputIndex: 1 }, + ]), + ], + }); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node); + }); + + // ►► + // ┌───────┐ ┌────┐ + // │ │1 ┌────►│ │ + // │trigger├───┤ │node│ + // │ │ └────►│ │ + // └───────┘ └────┘ + test('multiple connections with both having data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const node3 = createNodeData({ name: 'node3' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, node3) + .addConnections( + { from: trigger, to: node1 }, + { from: trigger, to: node2 }, + { from: node1, to: node3 }, + { from: node2, to: node3 }, + ); + + // ACT + const startNodes = findStartNodes(graph, trigger, node3, { + [trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], + [node1.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], + [node2.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])], + }); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node3); + }); + + // ►► + // ┌───────┐ ┌─────┐0 ┌─────┐ + // │ │1 │ ├────────►│ │ + // │trigger├───────►│node1│ │node2│ + // │ │ │ ├────────►│ │ + // └───────┘ └─────┘1 └─────┘ + test('multiple connections with trigger', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections( + { from: trigger, to: node1 }, + { from: node1, to: node2, outputIndex: 0 }, + { from: node1, to: node2, outputIndex: 1 }, + ); + + // ACT + const startNodes = findStartNodes(graph, node1, node2, { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [node1.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 1 }])], + }); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node2); + }); + + // ►► + //┌───────┐1 ┌─────┐1 ┌─────┐ + //│Trigger├───┬──►│Node1├───┬─►│Node2│ + //└───────┘ │ └─────┘ │ └─────┘ + // │ │ + // └─────────────┘ + test('terminates when called with graph that contains cycles', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections( + { from: trigger, to: node1 }, + { from: node1, to: node1 }, + { from: node1, to: node2 }, + ); + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinData: IPinData = {}; + + // ACT + const startNodes = findStartNodes(graph, trigger, node2, runData, pinData); + + // ASSERT + expect(startNodes).toHaveLength(1); + expect(startNodes[0]).toEqual(node2); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/findSubgraph.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/findSubgraph.test.ts new file mode 100644 index 0000000000..d82f73e9e3 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/findSubgraph.test.ts @@ -0,0 +1,185 @@ +// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/ +// If you update the tests, please update the diagrams as well. +// If you add a test, please create a new diagram. +// +// Map +// 0 means the output has no run data +// 1 means the output has run data +// ►► denotes the node that the user wants to execute to +// XX denotes that the node is disabled +// PD denotes that the node has pinned data + +import { DirectedGraph } from '../DirectedGraph'; +import { findSubgraph } from '../findSubgraph'; +import { createNodeData } from './helpers'; + +describe('findSubgraph2', () => { + // ►► + // ┌───────┐ ┌───────────┐ + // │trigger├────►│destination│ + // └───────┘ └───────────┘ + test('simple', () => { + const trigger = createNodeData({ name: 'trigger' }); + const destination = createNodeData({ name: 'destination' }); + + const graph = new DirectedGraph() + .addNodes(trigger, destination) + .addConnections({ from: trigger, to: destination }); + + const subgraph = findSubgraph(graph, destination, trigger); + + expect(subgraph).toEqual(graph); + }); + + // ►► + // ┌───────┐ ┌───────────┐ + // │ ├────────►│ │ + // │trigger│ │destination│ + // │ ├────────►│ │ + // └───────┘ └───────────┘ + test('multiple connections', () => { + const ifNode = createNodeData({ name: 'If' }); + const noOp = createNodeData({ name: 'noOp' }); + + const graph = new DirectedGraph() + .addNodes(ifNode, noOp) + .addConnections( + { from: ifNode, to: noOp, outputIndex: 0 }, + { from: ifNode, to: noOp, outputIndex: 1 }, + ); + + const subgraph = findSubgraph(graph, noOp, ifNode); + + expect(subgraph).toEqual(graph); + }); + + // ►► + // ┌───────┐ ┌───────────┐ + // │ ├────────►│ │ ┌────┐ + // │trigger│ │destination├─────►│node│ + // │ ├────────►│ │ └────┘ + // └───────┘ └───────────┘ + test('disregard nodes after destination', () => { + const trigger = createNodeData({ name: 'trigger' }); + const destination = createNodeData({ name: 'destination' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(trigger, destination, node) + .addConnections({ from: trigger, to: destination }, { from: destination, to: node }); + + const subgraph = findSubgraph(graph, destination, trigger); + + expect(subgraph).toEqual( + new DirectedGraph() + .addNodes(trigger, destination) + .addConnections({ from: trigger, to: destination }), + ); + }); + + // XX + // ┌───────┐ ┌────────┐ ►► + // │ ├────────►│ │ ┌───────────┐ + // │trigger│ │disabled├─────►│destination│ + // │ ├────────►│ │ └───────────┘ + // └───────┘ └────────┘ + test('skip disabled nodes', () => { + const trigger = createNodeData({ name: 'trigger' }); + const disabled = createNodeData({ name: 'disabled', disabled: true }); + const destination = createNodeData({ name: 'destination' }); + + const graph = new DirectedGraph() + .addNodes(trigger, disabled, destination) + .addConnections({ from: trigger, to: disabled }, { from: disabled, to: destination }); + + const subgraph = findSubgraph(graph, destination, trigger); + + expect(subgraph).toEqual( + new DirectedGraph() + .addNodes(trigger, destination) + .addConnections({ from: trigger, to: destination }), + ); + }); + + // ►► + // ┌───────┐ ┌─────┐ ┌─────┐ + // │Trigger├───┬──►│Node1├───┬─►│Node2│ + // └───────┘ │ └─────┘ │ └─────┘ + // │ │ + // └─────────────┘ + test('terminates when called with graph that contains cycles', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections( + { from: trigger, to: node1 }, + { from: node1, to: node1 }, + { from: node1, to: node2 }, + ); + + // ACT + const subgraph = findSubgraph(graph, node2, trigger); + + // ASSERT + expect(subgraph).toEqual(graph); + }); + + // ►► + // ┌───────┐ ┌─────┐ + // │Trigger├──┬─►│Node1│ + // └───────┘ │ └─────┘ + // │ + // ┌─────┐ │ + // │Node2├────┘ + // └─────┘ + test('terminates when called with graph that contains cycles', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections({ from: trigger, to: node1 }, { from: node2, to: node1 }); + + // ACT + const subgraph = findSubgraph(graph, node1, trigger); + + // ASSERT + expect(subgraph).toEqual( + new DirectedGraph().addNodes(trigger, node1).addConnections({ from: trigger, to: node1 }), + ); + }); + + // ►► + // ┌───────┐ ┌───────────┐ ┌───────────┐ + // │Trigger├─┬─►│Destination├──►│AnotherNode├───┐ + // └───────┘ │ └───────────┘ └───────────┘ │ + // │ │ + // └──────────────────────────────────┘ + test('terminates if the destination node is part of a cycle', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const destination = createNodeData({ name: 'destination' }); + const anotherNode = createNodeData({ name: 'anotherNode' }); + const graph = new DirectedGraph() + .addNodes(trigger, destination, anotherNode) + .addConnections( + { from: trigger, to: destination }, + { from: destination, to: anotherNode }, + { from: anotherNode, to: destination }, + ); + + // ACT + const subgraph = findSubgraph(graph, destination, trigger); + + // ASSERT + expect(subgraph).toEqual( + new DirectedGraph() + .addNodes(trigger, destination) + .addConnections({ from: trigger, to: destination }), + ); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts new file mode 100644 index 0000000000..737d0a2754 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/getSourceDataGroups.test.ts @@ -0,0 +1,197 @@ +// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/ +// If you update the tests, please update the diagrams as well. +// If you add a test, please create a new diagram. +// +// Map +// 0 means the output has no run data +// 1 means the output has run data +// PD denotes that the node has pinned data + +import type { IPinData } from 'n8n-workflow'; +import { NodeConnectionType, type IRunData } from 'n8n-workflow'; +import { DirectedGraph } from '../DirectedGraph'; +import { createNodeData, toITaskData } from './helpers'; +import { getSourceDataGroups } from '../getSourceDataGroups'; + +describe('getSourceDataGroups', () => { + //┌───────┐1 + //│source1├────┐ + //└───────┘ │ ┌────┐ + //┌───────┐1 ├──►│ │ + //│source2├────┘ │node│ + //└───────┘ ┌──►│ │ + //┌───────┐1 │ └────┘ + //│source3├────┘ + //└───────┘ + it('groups sources into possibly complete sets if all of them have data', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const source2 = createNodeData({ name: 'source2' }); + const source3 = createNodeData({ name: 'source3' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, source2, source3, node) + .addConnections( + { from: source1, to: node, inputIndex: 0 }, + { from: source2, to: node, inputIndex: 0 }, + { from: source3, to: node, inputIndex: 1 }, + ); + const runData: IRunData = { + [source1.name]: [toITaskData([{ data: { value: 1 } }])], + [source2.name]: [toITaskData([{ data: { value: 1 } }])], + [source3.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + expect(groups).toHaveLength(2); + + const group1 = groups[0]; + expect(group1).toHaveLength(2); + expect(group1[0]).toEqual({ + from: source1, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + expect(group1[1]).toEqual({ + from: source3, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + + const group2 = groups[1]; + expect(group2).toHaveLength(1); + expect(group2[0]).toEqual({ + from: source2, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + }); + + //┌───────┐PD + //│source1├────┐ + //└───────┘ │ ┌────┐ + //┌───────┐PD ├──►│ │ + //│source2├────┘ │node│ + //└───────┘ ┌──►│ │ + //┌───────┐PD │ └────┘ + //│source3├────┘ + //└───────┘ + it('groups sources into possibly complete sets if all of them have data', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const source2 = createNodeData({ name: 'source2' }); + const source3 = createNodeData({ name: 'source3' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, source2, source3, node) + .addConnections( + { from: source1, to: node, inputIndex: 0 }, + { from: source2, to: node, inputIndex: 0 }, + { from: source3, to: node, inputIndex: 1 }, + ); + const runData: IRunData = {}; + const pinnedData: IPinData = { + [source1.name]: [{ json: { value: 1 } }], + [source2.name]: [{ json: { value: 2 } }], + [source3.name]: [{ json: { value: 3 } }], + }; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + expect(groups).toHaveLength(2); + + const group1 = groups[0]; + expect(group1).toHaveLength(2); + expect(group1[0]).toEqual({ + from: source1, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + expect(group1[1]).toEqual({ + from: source3, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + + const group2 = groups[1]; + expect(group2).toHaveLength(1); + expect(group2[0]).toEqual({ + from: source2, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + }); + + //┌───────┐0 + //│source1├────┐ + //└───────┘ │ ┌────┐ + //┌───────┐1 ├──►│ │ + //│source2├────┘ │node│ + //└───────┘ ┌──►│ │ + //┌───────┐1 │ └────┘ + //│source3├────┘ + //└───────┘ + it('groups sources into possibly complete sets if all of them have data', () => { + // ARRANGE + const source1 = createNodeData({ name: 'source1' }); + const source2 = createNodeData({ name: 'source2' }); + const source3 = createNodeData({ name: 'source3' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(source1, source2, source3, node) + .addConnections( + { from: source1, to: node, inputIndex: 0 }, + { from: source2, to: node, inputIndex: 0 }, + { from: source3, to: node, inputIndex: 1 }, + ); + const runData: IRunData = { + [source2.name]: [toITaskData([{ data: { value: 1 } }])], + [source3.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinnedData: IPinData = {}; + + // ACT + const groups = getSourceDataGroups(graph, node, runData, pinnedData); + + // ASSERT + expect(groups).toHaveLength(1); + + const group1 = groups[0]; + expect(group1).toHaveLength(2); + expect(group1[0]).toEqual({ + from: source2, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 0, + to: node, + }); + expect(group1[1]).toEqual({ + from: source3, + outputIndex: 0, + type: NodeConnectionType.Main, + inputIndex: 1, + to: node, + }); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts b/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts new file mode 100644 index 0000000000..72b1efa30c --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/helpers.ts @@ -0,0 +1,99 @@ +import { NodeConnectionType } from 'n8n-workflow'; +import type { INodeParameters, INode, ITaskData, IDataObject, IConnections } from 'n8n-workflow'; + +interface StubNode { + name: string; + parameters?: INodeParameters; + disabled?: boolean; +} + +export function createNodeData(stubData: StubNode): INode { + return { + name: stubData.name, + parameters: stubData.parameters ?? {}, + type: 'test.set', + typeVersion: 1, + id: 'uuid-1234', + position: [100, 100], + disabled: stubData.disabled ?? false, + }; +} + +type TaskData = { + data: IDataObject; + outputIndex?: number; + nodeConnectionType?: NodeConnectionType; +}; + +export function toITaskData(taskData: TaskData[]): ITaskData { + const result: ITaskData = { + executionStatus: 'success', + executionTime: 0, + startTime: 0, + source: [], + data: {}, + }; + + // NOTE: Here to make TS happy. + result.data = result.data ?? {}; + for (const taskDatum of taskData) { + const type = taskDatum.nodeConnectionType ?? NodeConnectionType.Main; + const outputIndex = taskDatum.outputIndex ?? 0; + + result.data[type] = result.data[type] ?? []; + const dataConnection = result.data[type]; + dataConnection[outputIndex] = [{ json: taskDatum.data }]; + } + + for (const [type, dataConnection] of Object.entries(result.data)) { + for (const [index, maybe] of dataConnection.entries()) { + result.data[type][index] = maybe ?? null; + } + } + + return result; +} + +export const nodeTypes = { + getByName: jest.fn(), + getByNameAndVersion: jest.fn(), + getKnownTypes: jest.fn(), +}; + +export const defaultWorkflowParameter = { + active: false, + nodeTypes, +}; + +type Connection = { + from: INode; + to: INode; + type?: NodeConnectionType; + outputIndex?: number; + inputIndex?: number; +}; + +export function toIConnections(connections: Connection[]): IConnections { + const result: IConnections = {}; + + for (const connection of connections) { + const type = connection.type ?? NodeConnectionType.Main; + const outputIndex = connection.outputIndex ?? 0; + const inputIndex = connection.inputIndex ?? 0; + + result[connection.from.name] = result[connection.from.name] ?? { + [type]: [], + }; + const resultConnection = result[connection.from.name]; + resultConnection[type][outputIndex] = resultConnection[type][outputIndex] ?? []; + const group = resultConnection[type][outputIndex]; + + group.push({ + node: connection.to.name, + type, + index: inputIndex, + }); + } + + return result; +} diff --git a/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts new file mode 100644 index 0000000000..42cbe1f5ff --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts @@ -0,0 +1,333 @@ +// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/ +// If you update the tests, please update the diagrams as well. +// If you add a test, please create a new diagram. +// +// Map +// 0 means the output has no run data +// 1 means the output has run data +// ►► denotes the node that the user wants to execute to +// XX denotes that the node is disabled +// PD denotes that the node has pinned data + +import { recreateNodeExecutionStack } from '@/PartialExecutionUtils/recreateNodeExecutionStack'; +import { type IPinData, type IRunData } from 'n8n-workflow'; +import { AssertionError } from 'assert'; +import { DirectedGraph } from '../DirectedGraph'; +import { findSubgraph } from '../findSubgraph'; +import { createNodeData, toITaskData } from './helpers'; + +describe('recreateNodeExecutionStack', () => { + // ►► + // ┌───────┐1 ┌────┐ + // │Trigger├──────►│Node│ + // └───────┘ └────┘ + test('all nodes except destination node have data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + + const graph = new DirectedGraph() + .addNodes(trigger, node) + .addConnections({ from: trigger, to: node }); + + const workflow = findSubgraph(graph, node, trigger); + const startNodes = [node]; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinData = {}; + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(1); + expect(nodeExecutionStack).toEqual([ + { + data: { main: [[{ json: { value: 1 } }]] }, + node, + source: { + main: [ + { + // TODO: not part of ISourceDate, but maybe it should be? + //currentNodeInput: 0, + previousNode: 'trigger', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + ], + }, + }, + ]); + + expect(waitingExecution).toEqual({ node: { '0': { main: [[{ json: { value: 1 } }]] } } }); + expect(waitingExecutionSource).toEqual({ + node: { + '0': { + main: [ + { previousNode: 'trigger', previousNodeOutput: undefined, previousNodeRun: undefined }, + ], + }, + }, + }); + }); + + // ►► + // ┌───────┐0 ┌────┐ + // │Trigger├──────►│Node│ + // └───────┘ └────┘ + test('no nodes have data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + + const workflow = new DirectedGraph() + .addNodes(trigger, node) + .addConnections({ from: trigger, to: node }); + const startNodes = [trigger]; + const runData: IRunData = {}; + const pinData: IPinData = {}; + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(1); + expect(nodeExecutionStack).toEqual([ + { + data: { main: [[{ json: {} }]] }, + node: trigger, + source: null, + }, + ]); + + expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } }); + expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } }); + }); + + // PinData ►► + // ┌───────┐1 ┌────┐ + // │Trigger├──────►│Node│ + // └───────┘ └────┘ + test('node before destination node has pinned data', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node = createNodeData({ name: 'node' }); + + const workflow = new DirectedGraph() + .addNodes(trigger, node) + .addConnections({ from: trigger, to: node }); + const startNodes = [node]; + const runData: IRunData = {}; + const pinData: IPinData = { + [trigger.name]: [{ json: { value: 1 } }], + }; + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(1); + expect(nodeExecutionStack).toEqual([ + { + data: { main: [[{ json: { value: 1 } }]] }, + node, + source: { + main: [ + { + // TODO: not part of ISourceDate, but maybe it should be? + //currentNodeInput: 0, + previousNode: trigger.name, + previousNodeRun: 0, + previousNodeOutput: 0, + }, + ], + }, + }, + ]); + + expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } }); + expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } }); + }); + + // XX ►► + // ┌───────┐1 ┌─────┐ ┌─────┐ + // │Trigger├─────►│Node1├──────►│Node2│ + // └───────┘ └─────┘ └─────┘ + test('throws if a disabled node is found', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1', disabled: true }); + const node2 = createNodeData({ name: 'node2' }); + + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2) + .addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 }); + + const startNodes = [node2]; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinData = {}; + + // ACT & ASSERT + expect(() => + recreateNodeExecutionStack(graph, startNodes, node2, runData, pinData), + ).toThrowError(AssertionError); + }); + + // ►► + // ┌───────┐1 ┌─────┐1 ┌─────┐ + // │Trigger├──┬──►│Node1├──┬───►│Node3│ + // └───────┘ │ └─────┘ │ └─────┘ + // │ │ + // │ ┌─────┐1 │ + // └──►│Node2├──┘ + // └─────┘ + test('multiple incoming connections', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const node3 = createNodeData({ name: 'node3' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, node3) + .addConnections( + { from: trigger, to: node1 }, + { from: trigger, to: node2 }, + { from: node1, to: node3 }, + { from: node2, to: node3 }, + ); + + const startNodes = [node3]; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinData = {}; + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData); + + // ASSERT + + expect(nodeExecutionStack).toEqual([ + { + data: { main: [[{ json: { value: 1 } }]] }, + node: node3, + source: { + main: [ + { + // TODO: not part of ISourceDate, but maybe it should be? + //currentNodeInput: 0, + previousNode: 'node1', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + ], + }, + }, + { + data: { main: [[{ json: { value: 1 } }]] }, + node: node3, + source: { + main: [ + { + // TODO: not part of ISourceDate, but maybe it should be? + //currentNodeInput: 0, + previousNode: 'node2', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + ], + }, + }, + ]); + + expect(waitingExecution).toEqual({ + node3: { '0': { main: [[{ json: { value: 1 } }], [{ json: { value: 1 } }]] } }, + }); + expect(waitingExecutionSource).toEqual({ + node3: { + '0': { + main: [ + { previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined }, + { previousNode: 'node2', previousNodeOutput: undefined, previousNodeRun: undefined }, + ], + }, + }, + }); + }); + + // ┌─────┐1 ►► + // ┌─►│node1├───┐ ┌─────┐ + // ┌───────┐1 │ └─────┘ └──►│ │ + // │Trigger├──┤ │node3│ + // └───────┘ │ ┌─────┐1 ┌──►│ │ + // └─►│node2├───┘ └─────┘ + // └─────┘ + test('multiple inputs', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const node1 = createNodeData({ name: 'node1' }); + const node2 = createNodeData({ name: 'node2' }); + const node3 = createNodeData({ name: 'node3' }); + const graph = new DirectedGraph() + .addNodes(trigger, node1, node2, node3) + .addConnections( + { from: trigger, to: node1 }, + { from: trigger, to: node2 }, + { from: node1, to: node3, inputIndex: 0 }, + { from: node2, to: node3, inputIndex: 1 }, + ); + const startNodes = [node3]; + const runData: IRunData = { + [trigger.name]: [toITaskData([{ data: { value: 1 } }])], + [node1.name]: [toITaskData([{ data: { value: 1 } }])], + [node2.name]: [toITaskData([{ data: { value: 1 } }])], + }; + const pinData: IPinData = { + [trigger.name]: [{ json: { value: 1 } }], + }; + + // ACT + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData); + + // ASSERT + expect(nodeExecutionStack).toHaveLength(1); + expect(nodeExecutionStack[0]).toEqual({ + data: { main: [[{ json: { value: 1 } }], [{ json: { value: 1 } }]] }, + node: node3, + source: { + main: [ + { previousNode: 'node1', previousNodeOutput: 0, previousNodeRun: 0 }, + { previousNode: 'node2', previousNodeOutput: 0, previousNodeRun: 0 }, + ], + }, + }); + + expect(waitingExecution).toEqual({ + node3: { + '0': { + main: [[{ json: { value: 1 } }]], + }, + }, + }); + expect(waitingExecutionSource).toEqual({ + node3: { + '0': { + main: [ + { previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined }, + { previousNode: 'node2', previousNodeOutput: 1, previousNodeRun: undefined }, + ], + }, + }, + }); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/toIConnections.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/toIConnections.test.ts new file mode 100644 index 0000000000..a2524bf3ce --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/toIConnections.test.ts @@ -0,0 +1,26 @@ +import { NodeConnectionType } from 'n8n-workflow'; +import { createNodeData, toIConnections } from './helpers'; + +test('toIConnections', () => { + const node1 = createNodeData({ name: 'Basic Node 1' }); + const node2 = createNodeData({ name: 'Basic Node 2' }); + + expect( + toIConnections([{ from: node1, to: node2, type: NodeConnectionType.Main, outputIndex: 0 }]), + ).toEqual({ + [node1.name]: { + // output group + main: [ + // first output + [ + // first connection + { + node: node2.name, + type: NodeConnectionType.Main, + index: 0, + }, + ], + ], + }, + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/toITaskData.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/toITaskData.test.ts new file mode 100644 index 0000000000..e255836339 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/toITaskData.test.ts @@ -0,0 +1,64 @@ +import { NodeConnectionType } from 'n8n-workflow'; +import { toITaskData } from './helpers'; + +test('toITaskData', function () { + expect(toITaskData([{ data: { value: 1 } }])).toEqual({ + executionStatus: 'success', + executionTime: 0, + source: [], + startTime: 0, + data: { + main: [[{ json: { value: 1 } }]], + }, + }); + + expect(toITaskData([{ data: { value: 1 }, outputIndex: 1 }])).toEqual({ + executionStatus: 'success', + executionTime: 0, + source: [], + startTime: 0, + data: { + main: [null, [{ json: { value: 1 } }]], + }, + }); + + expect( + toITaskData([ + { data: { value: 1 }, outputIndex: 1, nodeConnectionType: NodeConnectionType.AiAgent }, + ]), + ).toEqual({ + executionStatus: 'success', + executionTime: 0, + source: [], + startTime: 0, + data: { + [NodeConnectionType.AiAgent]: [null, [{ json: { value: 1 } }]], + }, + }); + + expect( + toITaskData([ + { data: { value: 1 }, outputIndex: 0 }, + { data: { value: 2 }, outputIndex: 1 }, + ]), + ).toEqual({ + executionStatus: 'success', + executionTime: 0, + startTime: 0, + source: [], + data: { + main: [ + [ + { + json: { value: 1 }, + }, + ], + [ + { + json: { value: 2 }, + }, + ], + ], + }, + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/findCycles.ts b/packages/core/src/PartialExecutionUtils/findCycles.ts new file mode 100644 index 0000000000..388518ae52 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/findCycles.ts @@ -0,0 +1,6 @@ +import type { Workflow } from 'n8n-workflow'; + +export function findCycles(_workflow: Workflow) { + // TODO: implement depth first search or Tarjan's Algorithm + return []; +} diff --git a/packages/core/src/PartialExecutionUtils/findStartNodes.ts b/packages/core/src/PartialExecutionUtils/findStartNodes.ts new file mode 100644 index 0000000000..910045d709 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/findStartNodes.ts @@ -0,0 +1,153 @@ +import type { INode, IPinData, IRunData } from 'n8n-workflow'; +import type { DirectedGraph } from './DirectedGraph'; +import { getIncomingData } from './getIncomingData'; + +/** + * A node is dirty if either of the following is true: + * - it's properties or options changed since last execution (not implemented yet) + * - one of it's parents is disabled + * - it has an error (not implemented yet) + * - it neither has run data nor pinned data + */ +export function isDirty(node: INode, runData: IRunData = {}, pinData: IPinData = {}): boolean { + // TODO: implement + const propertiesOrOptionsChanged = false; + + if (propertiesOrOptionsChanged) { + return true; + } + + // TODO: implement + const parentNodeGotDisabled = false; + + if (parentNodeGotDisabled) { + return true; + } + + // TODO: implement + const hasAnError = false; + + if (hasAnError) { + return true; + } + + const hasPinnedData = pinData[node.name] !== undefined; + + if (hasPinnedData) { + return false; + } + + const hasRunData = runData?.[node.name]; + + if (hasRunData) { + return false; + } + + return true; +} + +function findStartNodesRecursive( + graph: DirectedGraph, + current: INode, + destination: INode, + runData: IRunData, + pinData: IPinData, + startNodes: Set, + seen: Set, +): Set { + const nodeIsDirty = isDirty(current, runData, pinData); + + // If the current node is dirty stop following this branch, we found a start + // node. + if (nodeIsDirty) { + startNodes.add(current); + + return startNodes; + } + + // If the current node is the destination node stop following this branch, we + // found a start node. + if (current === destination) { + startNodes.add(current); + return startNodes; + } + + // If we detect a cycle stop following the branch, there is no start node on + // this branch. + if (seen.has(current)) { + return startNodes; + } + + // Recurse with every direct child that is part of the sub graph. + const outGoingConnections = graph.getDirectChildren(current); + for (const outGoingConnection of outGoingConnections) { + const nodeRunData = getIncomingData( + runData, + outGoingConnection.from.name, + // NOTE: It's always 0 until I fix the bug that removes the run data for + // old runs. The FE only sends data for one run for each node. + 0, + outGoingConnection.type, + outGoingConnection.outputIndex, + ); + + // If the node has multiple outputs, only follow the outputs that have run data. + const hasNoRunData = + nodeRunData === null || nodeRunData === undefined || nodeRunData.length === 0; + if (hasNoRunData) { + continue; + } + + findStartNodesRecursive( + graph, + outGoingConnection.to, + destination, + runData, + pinData, + startNodes, + new Set(seen).add(current), + ); + } + + return startNodes; +} + +/** + * The start node is the node from which a partial execution starts. The start + * node will be executed or re-executed. + * The nodes are found by traversing the graph from the trigger to the + * destination and finding the earliest dirty nodes on every branch. + * + * The algorithm is: + * Starting from the trigger node. + * + * 1. if the current node is not a trigger and has no input data (on all + * connections) (not implemented yet, possibly not necessary) + * - stop following this branch, there is no start node on this branch + * 2. If the current node is dirty, or is the destination node + * - stop following this branch, we found a start node + * 3. If we detect a cycle + * - stop following the branch, there is no start node on this branch + * 4. Recurse with every direct child that is part of the sub graph + */ +export function findStartNodes( + graph: DirectedGraph, + trigger: INode, + destination: INode, + runData: IRunData = {}, + pinData: IPinData = {}, +): INode[] { + const startNodes = findStartNodesRecursive( + graph, + trigger, + destination, + runData, + pinData, + // start nodes found + new Set(), + // seen + new Set(), + ); + + return [...startNodes]; +} diff --git a/packages/core/src/PartialExecutionUtils/findSubgraph.ts b/packages/core/src/PartialExecutionUtils/findSubgraph.ts new file mode 100644 index 0000000000..2b1ceb2998 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/findSubgraph.ts @@ -0,0 +1,113 @@ +import type { INode } from 'n8n-workflow'; +import type { GraphConnection } from './DirectedGraph'; +import { DirectedGraph } from './DirectedGraph'; + +function findSubgraphRecursive( + graph: DirectedGraph, + destinationNode: INode, + current: INode, + trigger: INode, + newGraph: DirectedGraph, + currentBranch: GraphConnection[], +) { + // If the current node is the chosen trigger keep this branch. + if (current === trigger) { + for (const connection of currentBranch) { + newGraph.addNodes(connection.from, connection.to); + newGraph.addConnection(connection); + } + + return; + } + + let parentConnections = graph.getDirectParents(current); + + // If the current node has no parents, don’t keep this branch. + if (parentConnections.length === 0) { + return; + } + + // If the current node is the destination node again, don’t keep this branch. + const isCycleWithDestinationNode = + current === destinationNode && currentBranch.some((c) => c.to === destinationNode); + if (isCycleWithDestinationNode) { + return; + } + + // If the current node was already visited, keep this branch. + const isCycleWithCurrentNode = currentBranch.some((c) => c.to === current); + if (isCycleWithCurrentNode) { + // TODO: write function that adds nodes when adding connections + for (const connection of currentBranch) { + newGraph.addNodes(connection.from, connection.to); + newGraph.addConnection(connection); + } + return; + } + + // If the current node is disabled, don’t keep this node, but keep the + // branch. + // Take every incoming connection and connect it to every node that is + // connected to the current node’s first output + if (current.disabled) { + const incomingConnections = graph.getDirectParents(current); + const outgoingConnections = graph + .getDirectChildren(current) + // NOTE: When a node is disabled only the first output gets data + .filter((connection) => connection.outputIndex === 0); + + parentConnections = []; + + for (const incomingConnection of incomingConnections) { + for (const outgoingConnection of outgoingConnections) { + const newConnection = { + ...incomingConnection, + to: outgoingConnection.to, + inputIndex: outgoingConnection.inputIndex, + }; + + parentConnections.push(newConnection); + currentBranch.pop(); + currentBranch.push(newConnection); + } + } + } + + // Recurse on each parent. + for (const parentConnection of parentConnections) { + findSubgraphRecursive(graph, destinationNode, parentConnection.from, trigger, newGraph, [ + ...currentBranch, + parentConnection, + ]); + } +} + +/** + * Find all nodes that can lead from the trigger to the destination node, + * ignoring disabled nodes. + * + * The algorithm is: + * Start with Destination Node + * + * 1. if the current node is the chosen trigger keep this branch + * 2. if the current node has no parents, don’t keep this branch + * 3. if the current node is the destination node again, don’t keep this + * branch + * 4. if the current node was already visited, keep this branch + * 5. if the current node is disabled, don’t keep this node, but keep the + * branch + * - take every incoming connection and connect it to every node that is + * connected to the current node’s first output + * 6. Recurse on each parent + */ +export function findSubgraph( + graph: DirectedGraph, + destinationNode: INode, + trigger: INode, +): DirectedGraph { + const newGraph = new DirectedGraph(); + + findSubgraphRecursive(graph, destinationNode, destinationNode, trigger, newGraph, []); + + return newGraph; +} diff --git a/packages/core/src/PartialExecutionUtils/findTriggerForPartialExecution.ts b/packages/core/src/PartialExecutionUtils/findTriggerForPartialExecution.ts new file mode 100644 index 0000000000..baae6e7304 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/findTriggerForPartialExecution.ts @@ -0,0 +1,61 @@ +import type { INode, Workflow } from 'n8n-workflow'; +import * as assert from 'assert/strict'; + +function findAllParentTriggers(workflow: Workflow, destinationNodeName: string) { + const parentNodes = workflow + .getParentNodes(destinationNodeName) + .map((name) => { + const node = workflow.getNode(name); + + // We got the node name from `workflow.getParentNodes`. The node must + // exist. + assert.ok(node); + + return { + node, + nodeType: workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion), + }; + }) + .filter((value) => value !== null) + .filter(({ nodeType }) => nodeType.description.group.includes('trigger')) + .map(({ node }) => node); + + return parentNodes; +} + +// TODO: write unit tests for this +// TODO: rewrite this using DirectedGraph instead of workflow. +export function findTriggerForPartialExecution( + workflow: Workflow, + destinationNodeName: string, +): INode | undefined { + const parentTriggers = findAllParentTriggers(workflow, destinationNodeName).filter( + (trigger) => !trigger.disabled, + ); + const pinnedTriggers = parentTriggers + // TODO: add the other filters here from `findAllPinnedActivators`, see + // copy below. + .filter((trigger) => workflow.pinData?.[trigger.name]) + // TODO: Make this sorting more predictable + // Put nodes which names end with 'webhook' first, while also reversing the + // order they had in the original array. + .sort((n) => (n.type.endsWith('webhook') ? -1 : 1)); + + if (pinnedTriggers.length) { + return pinnedTriggers[0]; + } else { + return parentTriggers[0]; + } +} + +//function findAllPinnedActivators(workflow: Workflow, pinData?: IPinData) { +// return Object.values(workflow.nodes) +// .filter( +// (node) => +// !node.disabled && +// pinData?.[node.name] && +// ['trigger', 'webhook'].some((suffix) => node.type.toLowerCase().endsWith(suffix)) && +// node.type !== 'n8n-nodes-base.respondToWebhook', +// ) +// .sort((a) => (a.type.endsWith('webhook') ? -1 : 1)); +//} diff --git a/packages/core/src/PartialExecutionUtils/getIncomingData.ts b/packages/core/src/PartialExecutionUtils/getIncomingData.ts new file mode 100644 index 0000000000..2f5f22cd35 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/getIncomingData.ts @@ -0,0 +1,22 @@ +import * as a from 'assert'; +import type { INodeExecutionData, IRunData, NodeConnectionType } from 'n8n-workflow'; + +export function getIncomingData( + runData: IRunData, + nodeName: string, + runIndex: number, + connectionType: NodeConnectionType, + outputIndex: number, +): INodeExecutionData[] | null | undefined { + a.ok(runData[nodeName], `Can't find node with name '${nodeName}' in runData.`); + a.ok( + runData[nodeName][runIndex], + `Can't find a run for index '${runIndex}' for node name '${nodeName}'`, + ); + a.ok( + runData[nodeName][runIndex].data, + `Can't find data for index '${runIndex}' for node name '${nodeName}'`, + ); + + return runData[nodeName][runIndex].data[connectionType][outputIndex]; +} diff --git a/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts b/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts new file mode 100644 index 0000000000..58f8f2f745 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/getSourceDataGroups.ts @@ -0,0 +1,115 @@ +import { type INode, type IPinData, type IRunData } from 'n8n-workflow'; + +import type { GraphConnection, DirectedGraph } from './DirectedGraph'; + +function sortByInputIndexThenByName( + connection1: GraphConnection, + connection2: GraphConnection, +): number { + if (connection1.inputIndex === connection2.inputIndex) { + return connection1.from.name.localeCompare(connection2.from.name); + } else { + return connection1.inputIndex - connection2.inputIndex; + } +} + +/** + * Groups incoming connections to the node. The groups contain one connection + * per input, if possible, with run data or pinned data. + * + * The purpose of this is to get as many complete sets of data for executing + * nodes with multiple inputs. + * + * # Example 1: + * ┌───────┐1 + * │source1├────┐ + * └───────┘ │ ┌────┐ + * ┌───────┐1 ├──►│ │ + * │source2├────┘ │node│ + * └───────┘ ┌──►│ │ + * ┌───────┐1 │ └────┘ + * │source3├────┘ + * └───────┘ + * + * Given this workflow, and assuming all sources have run data or pinned data, + * it's possible to run `node` with the data of `source1` and `source3` and + * then one more time with the data from `source2`. + * + * It would also be possible to run `node` with the data of `source2` and + * `source3` and then one more time with the data from `source1`. + * + * To improve the determinism of this the connections are sorted by input and + * then by from-node name. + * + * So this will return 2 groups: + * 1. source1 and source3 + * 2. source2 + * + * # Example 2: + * ┌───────┐0 + * │source1├────┐ + * └───────┘ │ ┌────┐ + * ┌───────┐1 ├──►│ │ + * │source2├────┘ │node│ + * └───────┘ ┌──►│ │ + * ┌───────┐1 │ └────┘ + * │source3├────┘ + * └───────┘ + * + * Since `source1` has no run data and no pinned data it's skipped in favor of + * `source2` for the for input. + * + * So this will return 1 group: + * 1. source2 and source3 + */ +export function getSourceDataGroups( + graph: DirectedGraph, + node: INode, + runData: IRunData, + pinnedData: IPinData, +): GraphConnection[][] { + const connections = graph.getConnections({ to: node }); + + const sortedConnectionsWithData = []; + + for (const connection of connections) { + const hasData = runData[connection.from.name] || pinnedData[connection.from.name]; + + if (hasData) { + sortedConnectionsWithData.push(connection); + } + } + + sortedConnectionsWithData.sort(sortByInputIndexThenByName); + + const groups: GraphConnection[][] = []; + let currentGroup: GraphConnection[] = []; + let currentInputIndex = -1; + + while (sortedConnectionsWithData.length > 0) { + const connectionWithDataIndex = sortedConnectionsWithData.findIndex( + // eslint-disable-next-line @typescript-eslint/no-loop-func + (c) => c.inputIndex > currentInputIndex, + ); + const connection: GraphConnection | undefined = + sortedConnectionsWithData[connectionWithDataIndex]; + + if (connection === undefined) { + groups.push(currentGroup); + currentGroup = []; + currentInputIndex = -1; + continue; + } + + currentInputIndex = connection.inputIndex; + currentGroup.push(connection); + + if (connectionWithDataIndex >= 0) { + sortedConnectionsWithData.splice(connectionWithDataIndex, 1); + } + } + + groups.push(currentGroup); + + return groups; +} diff --git a/packages/core/src/PartialExecutionUtils/index.ts b/packages/core/src/PartialExecutionUtils/index.ts new file mode 100644 index 0000000000..6a6f1a233a --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/index.ts @@ -0,0 +1,6 @@ +export { DirectedGraph } from './DirectedGraph'; +export { findTriggerForPartialExecution } from './findTriggerForPartialExecution'; +export { findStartNodes } from './findStartNodes'; +export { findSubgraph } from './findSubgraph'; +export { findCycles } from './findCycles'; +export { recreateNodeExecutionStack } from './recreateNodeExecutionStack'; diff --git a/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts b/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts new file mode 100644 index 0000000000..b1e3334440 --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts @@ -0,0 +1,180 @@ +import { + NodeConnectionType, + type IExecuteData, + type INode, + type INodeExecutionData, + type IPinData, + type IRunData, + type ISourceData, + type ITaskDataConnectionsSource, + type IWaitingForExecution, + type IWaitingForExecutionSource, +} from 'n8n-workflow'; + +import * as a from 'assert/strict'; +import type { DirectedGraph } from './DirectedGraph'; +import { getIncomingData } from './getIncomingData'; +import { getSourceDataGroups } from './getSourceDataGroups'; + +/** + * Recreates the node execution stack, waiting executions and waiting + * execution sources from a directed graph, start nodes, the destination node, + * run and pinned data. + * + * This function aims to be able to recreate the internal state of the + * WorkflowExecute class at any point of time during an execution based on the + * data that is already available. Specifically it will recreate the + * `WorkflowExecute.runExecutionData.executionData` properties. + * + * This allows "restarting" an execution and having it only execute what's + * necessary to be able to execute the destination node accurately, e.g. as + * close as possible to what would happen in a production execution. + */ +export function recreateNodeExecutionStack( + graph: DirectedGraph, + startNodes: INode[], + destinationNode: INode, + runData: IRunData, + pinData: IPinData, +): { + nodeExecutionStack: IExecuteData[]; + waitingExecution: IWaitingForExecution; + waitingExecutionSource: IWaitingForExecutionSource; +} { + // Validate invariants. + + // The graph needs to be free of disabled nodes. If it's not it hasn't been + // passed through findSubgraph2. + for (const node of graph.getNodes().values()) { + a.notEqual( + node.disabled, + true, + `Graph contains disabled nodes. This is not supported. Make sure to pass the graph through "findSubgraph2" before calling "recreateNodeExecutionStack". The node in question is "${node.name}"`, + ); + } + + // Initialize the nodeExecutionStack and waitingExecution with + // the data from runData + const nodeExecutionStack: IExecuteData[] = []; + const waitingExecution: IWaitingForExecution = {}; + const waitingExecutionSource: IWaitingForExecutionSource = {}; + + // TODO: Don't hard code this! + const runIndex = 0; + + for (const startNode of startNodes) { + const incomingStartNodeConnections = graph + .getDirectParents(startNode) + .filter((c) => c.type === NodeConnectionType.Main); + + let incomingData: INodeExecutionData[][] = []; + let incomingSourceData: ITaskDataConnectionsSource | null = null; + + if (incomingStartNodeConnections.length === 0) { + incomingData.push([{ json: {} }]); + + const executeData: IExecuteData = { + node: startNode, + data: { main: incomingData }, + source: incomingSourceData, + }; + + nodeExecutionStack.push(executeData); + } else { + const sourceDataSets = getSourceDataGroups(graph, startNode, runData, pinData); + + for (const sourceData of sourceDataSets) { + incomingData = []; + + incomingSourceData = { main: [] }; + + for (const incomingConnection of sourceData) { + const node = incomingConnection.from; + + if (pinData[node.name]) { + incomingData.push(pinData[node.name]); + } else { + a.ok( + runData[node.name], + `Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${node.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`, + ); + + const nodeIncomingData = getIncomingData( + runData, + node.name, + runIndex, + incomingConnection.type, + incomingConnection.outputIndex, + ); + + if (nodeIncomingData) { + incomingData.push(nodeIncomingData); + } + } + + incomingSourceData.main.push({ + previousNode: incomingConnection.from.name, + previousNodeOutput: incomingConnection.outputIndex, + previousNodeRun: 0, + }); + } + + const executeData: IExecuteData = { + node: startNode, + data: { main: incomingData }, + source: incomingSourceData, + }; + + nodeExecutionStack.push(executeData); + } + } + + // TODO: Do we need this? + if (destinationNode) { + const destinationNodeName = destinationNode.name; + // Check if the destinationNode has to be added as waiting + // because some input data is already fully available + const incomingDestinationNodeConnections = graph + .getDirectParents(destinationNode) + .filter((c) => c.type === NodeConnectionType.Main); + if (incomingDestinationNodeConnections !== undefined) { + for (const connection of incomingDestinationNodeConnections) { + if (waitingExecution[destinationNodeName] === undefined) { + waitingExecution[destinationNodeName] = {}; + waitingExecutionSource[destinationNodeName] = {}; + } + if (waitingExecution[destinationNodeName][runIndex] === undefined) { + waitingExecution[destinationNodeName][runIndex] = {}; + waitingExecutionSource[destinationNodeName][runIndex] = {}; + } + if (waitingExecution[destinationNodeName][runIndex][connection.type] === undefined) { + waitingExecution[destinationNodeName][runIndex][connection.type] = []; + waitingExecutionSource[destinationNodeName][runIndex][connection.type] = []; + } + + if (runData[connection.from.name] !== undefined) { + // Input data exists so add as waiting + // incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]); + waitingExecution[destinationNodeName][runIndex][connection.type].push( + runData[connection.from.name][runIndex].data![connection.type][connection.inputIndex], + ); + waitingExecutionSource[destinationNodeName][runIndex][connection.type].push({ + previousNode: connection.from.name, + previousNodeOutput: connection.inputIndex || undefined, + previousNodeRun: runIndex || undefined, + } as ISourceData); + } else { + waitingExecution[destinationNodeName][runIndex][connection.type].push(null); + waitingExecutionSource[destinationNodeName][runIndex][connection.type].push(null); + } + } + } + } + } + + return { + nodeExecutionStack, + waitingExecution, + waitingExecutionSource, + }; +} diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index fe4d60587b..e0898225e9 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -49,6 +49,16 @@ import { import get from 'lodash/get'; import * as NodeExecuteFunctions from './NodeExecuteFunctions'; +import * as assert from 'assert/strict'; +import { recreateNodeExecutionStack } from './PartialExecutionUtils/recreateNodeExecutionStack'; +import { + DirectedGraph, + findCycles, + findStartNodes, + findSubgraph, + findTriggerForPartialExecution, +} from './PartialExecutionUtils'; + export class WorkflowExecute { private status: ExecutionStatus = 'new'; @@ -305,6 +315,82 @@ export class WorkflowExecute { return this.processRunExecutionData(workflow); } + // IMPORTANT: Do not add "async" to this function, it will then convert the + // PCancelable to a regular Promise and does so not allow canceling + // active executions anymore + // eslint-disable-next-line @typescript-eslint/promise-function-async + runPartialWorkflow2( + workflow: Workflow, + runData: IRunData, + destinationNodeName?: string, + pinData?: IPinData, + ): PCancelable { + // TODO: Refactor the call-site to make `destinationNodeName` a required + // after removing the old partial execution flow. + assert.ok( + destinationNodeName, + 'a destinationNodeName is required for the new partial execution flow', + ); + + const destinationNode = workflow.getNode(destinationNodeName); + assert.ok( + destinationNode, + `Could not find a node with the name ${destinationNodeName} in the workflow.`, + ); + + // 1. Find the Trigger + const trigger = findTriggerForPartialExecution(workflow, destinationNodeName); + if (trigger === undefined) { + throw new ApplicationError( + 'The destination node is not connected to any trigger. Partial executions need a trigger.', + ); + } + + // 2. Find the Subgraph + const subgraph = findSubgraph(DirectedGraph.fromWorkflow(workflow), destinationNode, trigger); + const filteredNodes = subgraph.getNodes(); + + // 3. Find the Start Nodes + const startNodes = findStartNodes(subgraph, trigger, destinationNode, runData); + + // 4. Detect Cycles + const cycles = findCycles(workflow); + + // 5. Handle Cycles + if (cycles.length) { + // TODO: handle + } + + // 6. Clean Run Data + // TODO: + + // 7. Recreate Execution Stack + const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + recreateNodeExecutionStack(subgraph, startNodes, destinationNode, runData, pinData ?? {}); + + // 8. Execute + this.status = 'running'; + this.runExecutionData = { + startData: { + destinationNode: destinationNodeName, + runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), + }, + resultData: { + runData, + pinData, + }, + executionData: { + contextData: {}, + nodeExecutionStack, + metadata: {}, + waitingExecution, + waitingExecutionSource, + }, + }; + + return this.processRunExecutionData(subgraph.toWorkflow({ ...workflow })); + } + /** * Executes the hook with the given name * diff --git a/packages/editor-ui/src/composables/useRunWorkflow.ts b/packages/editor-ui/src/composables/useRunWorkflow.ts index 5e06e1d203..a6caeafc77 100644 --- a/packages/editor-ui/src/composables/useRunWorkflow.ts +++ b/packages/editor-ui/src/composables/useRunWorkflow.ts @@ -34,6 +34,7 @@ import { useI18n } from '@/composables/useI18n'; import { get } from 'lodash-es'; import { useExecutionsStore } from '@/stores/executions.store'; import type { PushPayload } from '@n8n/api-types'; +import { useLocalStorage } from '@vueuse/core'; export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType }) { const nodeHelpers = useNodeHelpers(); @@ -213,9 +214,15 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType & { settings: NonNullable } = { name: '', @@ -99,6 +100,10 @@ let cachedWorkflow: Workflow | null = null; export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { const uiStore = useUIStore(); + // -1 means the backend chooses the default + // 0 is the old flow + // 1 is the new flow + const partialExecutionVersion = useLocalStorage('PartialExecution.version', -1); const workflow = ref(createEmptyWorkflow()); const usedCredentials = ref>({}); @@ -1390,7 +1395,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { return await makeRestApiRequest( rootStore.restApiContext, 'POST', - `/workflows/${startRunData.workflowData.id}/run`, + `/workflows/${startRunData.workflowData.id}/run?partialExecutionVersion=${partialExecutionVersion.value}`, startRunData as unknown as IDataObject, ); } catch (error) { diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 17f24c3096..0d56f79009 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -2150,6 +2150,10 @@ export interface IWorkflowExecutionDataProcess { destinationNode?: string; restartExecutionId?: string; executionMode: WorkflowExecuteMode; + /** + * The data that is sent in the body of the webhook that started this + * execution. + */ executionData?: IRunExecutionData; runData?: IRunData; pinData?: IPinData; @@ -2159,6 +2163,15 @@ export interface IWorkflowExecutionDataProcess { workflowData: IWorkflowBase; userId?: string; projectId?: string; + /** + * Defines which version of the partial execution flow is used. + * Possible values are: + * 0 - use the old flow + * 1 - use the new flow + * -1 - the backend chooses which flow based on the environment variable + * PARTIAL_EXECUTION_VERSION_DEFAULT + */ + partialExecutionVersion?: string; } export interface ExecuteWorkflowOptions {