feat: Implement new partial execution logic for acyclic workflows (no-changelog) (#10256)

Co-authored-by: Tomi Turtiainen <10324676+tomi@users.noreply.github.com>
This commit is contained in:
Danny Martini 2024-09-18 15:06:36 +02:00 committed by GitHub
parent 73f89ef101
commit 2a084f96f8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 2367 additions and 20 deletions

View file

@ -503,7 +503,7 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().should('be.visible'); workflowPage.getters.clearExecutionDataButton().should('be.visible');
cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');
workflowPage.getters workflowPage.getters
.canvasNodeByName('do something with them') .canvasNodeByName('do something with them')
@ -525,7 +525,7 @@ describe('Execution', () => {
workflowPage.getters.zoomToFitButton().click(); workflowPage.getters.zoomToFitButton().click();
cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');
workflowPage.getters workflowPage.getters
.canvasNodeByName('If') .canvasNodeByName('If')
@ -547,7 +547,7 @@ describe('Execution', () => {
workflowPage.getters.clearExecutionDataButton().should('be.visible'); workflowPage.getters.clearExecutionDataButton().should('be.visible');
cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');
workflowPage.getters workflowPage.getters
.canvasNodeByName('NoOp2') .canvasNodeByName('NoOp2')
@ -576,7 +576,7 @@ describe('Execution', () => {
it('should successfully execute partial executions with nodes attached to the second output', () => { it('should successfully execute partial executions with nodes attached to the second output', () => {
cy.createFixtureWorkflow('Test_Workflow_pairedItem_incomplete_manual_bug.json'); cy.createFixtureWorkflow('Test_Workflow_pairedItem_incomplete_manual_bug.json');
cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');
workflowPage.getters.zoomToFitButton().click(); workflowPage.getters.zoomToFitButton().click();
workflowPage.getters.executeWorkflowButton().click(); workflowPage.getters.executeWorkflowButton().click();
@ -596,7 +596,7 @@ describe('Execution', () => {
it('should execute workflow partially up to the node that has issues', () => { it('should execute workflow partially up to the node that has issues', () => {
cy.createFixtureWorkflow('Test_workflow_partial_execution_with_missing_credentials.json'); cy.createFixtureWorkflow('Test_workflow_partial_execution_with_missing_credentials.json');
cy.intercept('POST', '/rest/workflows/**/run').as('workflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowRun');
workflowPage.getters.zoomToFitButton().click(); workflowPage.getters.zoomToFitButton().click();
workflowPage.getters.executeWorkflowButton().click(); workflowPage.getters.executeWorkflowButton().click();

View file

@ -18,7 +18,7 @@ describe('Debug', () => {
it('should be able to debug executions', () => { it('should be able to debug executions', () => {
cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions'); cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions');
cy.intercept('GET', '/rest/executions/*').as('getExecution'); cy.intercept('GET', '/rest/executions/*').as('getExecution');
cy.intercept('POST', '/rest/workflows/**/run').as('postWorkflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('postWorkflowRun');
cy.signinAsOwner(); cy.signinAsOwner();

View file

@ -142,7 +142,7 @@ describe('Editor actions should work', () => {
it('after switching between Editor and Debug', () => { it('after switching between Editor and Debug', () => {
cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions'); cy.intercept('GET', '/rest/executions?filter=*').as('getExecutions');
cy.intercept('GET', '/rest/executions/*').as('getExecution'); cy.intercept('GET', '/rest/executions/*').as('getExecution');
cy.intercept('POST', '/rest/workflows/**/run').as('postWorkflowRun'); cy.intercept('POST', '/rest/workflows/**/run?**').as('postWorkflowRun');
editWorkflowAndDeactivate(); editWorkflowAndDeactivate();
workflowPage.actions.executeWorkflow(); workflowPage.actions.executeWorkflow();

View file

@ -35,7 +35,7 @@ export class WorkflowExecutionsTab extends BasePage {
}, },
createManualExecutions: (count: number) => { createManualExecutions: (count: number) => {
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
cy.intercept('POST', '/rest/workflows/**/run').as('workflowExecution'); cy.intercept('POST', '/rest/workflows/**/run?**').as('workflowExecution');
workflowPage.actions.executeWorkflow(); workflowPage.actions.executeWorkflow();
cy.wait('@workflowExecution'); cy.wait('@workflowExecution');
} }

View file

@ -89,7 +89,7 @@ export function runMockWorkflowExecution({
}) { }) {
const executionId = nanoid(8); const executionId = nanoid(8);
cy.intercept('POST', '/rest/workflows/**/run', { cy.intercept('POST', '/rest/workflows/**/run?**', {
statusCode: 201, statusCode: 201,
body: { body: {
data: { data: {

View file

@ -640,4 +640,13 @@ export const schema = {
env: 'N8N_PROXY_HOPS', env: 'N8N_PROXY_HOPS',
doc: 'Number of reverse-proxies n8n is running behind', doc: 'Number of reverse-proxies n8n is running behind',
}, },
featureFlags: {
partialExecutionVersionDefault: {
format: String,
default: '0',
env: 'PARTIAL_EXECUTION_VERSION_DEFAULT',
doc: 'Set this to 1 to enable the new partial execution logic by default.',
},
},
}; };

View file

@ -109,7 +109,9 @@ export class WorkflowRunner {
} }
} }
/** Run the workflow */ /** Run the workflow
* @param realtime This is used in queue mode to change the priority of an execution, making sure they are picked up quicker.
*/
async run( async run(
data: IWorkflowExecutionDataProcess, data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean, loadStaticData?: boolean,
@ -278,6 +280,7 @@ export class WorkflowRunner {
data.startNodes === undefined || data.startNodes === undefined ||
data.startNodes.length === 0 data.startNodes.length === 0
) { ) {
// Full Execution
this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, { this.logger.debug(`Execution ID ${executionId} will run executing all nodes.`, {
executionId, executionId,
}); });
@ -294,16 +297,27 @@ export class WorkflowRunner {
data.pinData, data.pinData,
); );
} else { } else {
// Partial Execution
this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId }); this.logger.debug(`Execution ID ${executionId} is a partial execution.`, { executionId });
// Execute only the nodes between start and destination nodes // Execute only the nodes between start and destination nodes
const workflowExecute = new WorkflowExecute(additionalData, data.executionMode); const workflowExecute = new WorkflowExecute(additionalData, data.executionMode);
workflowExecution = workflowExecute.runPartialWorkflow(
workflow, if (data.partialExecutionVersion === '1') {
data.runData, workflowExecution = workflowExecute.runPartialWorkflow2(
data.startNodes, workflow,
data.destinationNode, data.runData,
data.pinData, data.destinationNode,
); data.pinData,
);
} else {
workflowExecution = workflowExecute.runPartialWorkflow(
workflow,
data.runData,
data.startNodes,
data.destinationNode,
data.pinData,
);
}
} }
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution); this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);

View file

@ -92,6 +92,7 @@ export class WorkflowExecutionService {
{ workflowData, runData, startNodes, destinationNode }: WorkflowRequest.ManualRunPayload, { workflowData, runData, startNodes, destinationNode }: WorkflowRequest.ManualRunPayload,
user: User, user: User,
pushRef?: string, pushRef?: string,
partialExecutionVersion?: string,
) { ) {
const pinData = workflowData.pinData; const pinData = workflowData.pinData;
const pinnedTrigger = this.selectPinnedActivatorStarter( const pinnedTrigger = this.selectPinnedActivatorStarter(
@ -135,6 +136,7 @@ export class WorkflowExecutionService {
startNodes, startNodes,
workflowData, workflowData,
userId: user.id, userId: user.id,
partialExecutionVersion: partialExecutionVersion ?? '0',
}; };
const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];

View file

@ -43,7 +43,12 @@ export declare namespace WorkflowRequest {
type NewName = AuthenticatedRequest<{}, {}, {}, { name?: string }>; type NewName = AuthenticatedRequest<{}, {}, {}, { name?: string }>;
type ManualRun = AuthenticatedRequest<{ workflowId: string }, {}, ManualRunPayload>; type ManualRun = AuthenticatedRequest<
{ workflowId: string },
{},
ManualRunPayload,
{ partialExecutionVersion?: string }
>;
type Share = AuthenticatedRequest<{ workflowId: string }, {}, { shareWithIds: string[] }>; type Share = AuthenticatedRequest<{ workflowId: string }, {}, { shareWithIds: string[] }>;

View file

@ -405,6 +405,9 @@ export class WorkflowsController {
req.body, req.body,
req.user, req.user,
req.headers['push-ref'] as string, req.headers['push-ref'] as string,
req.query.partialExecutionVersion === '-1'
? config.getEnv('featureFlags.partialExecutionVersionDefault')
: req.query.partialExecutionVersion,
); );
} }

View file

@ -0,0 +1,230 @@
import * as a from 'assert';
import type { IConnections, INode, WorkflowParameters } from 'n8n-workflow';
import { NodeConnectionType, Workflow } from 'n8n-workflow';
export type GraphConnection = {
from: INode;
to: INode;
type: NodeConnectionType;
outputIndex: number;
inputIndex: number;
};
// fromName-outputType-outputIndex-inputIndex-toName
type DirectedGraphKey = `${string}-${NodeConnectionType}-${number}-${number}-${string}`;
/**
* Represents a directed graph as an adjacency list, e.g. one list for the
* vertices and one list for the edges.
* To integrate easier with the n8n codebase vertices are called nodes and
* edges are called connections.
*
* The reason why this exists next to the Workflow class is that the workflow
* class stored the graph in a deeply nested, normalized format. This format
* does not lend itself to editing the graph or build graphs incrementally.
* This closes this gap by having import and export functions:
* `fromWorkflow`, `toWorkflow`.
*
* Thus it allows to do something like this:
* ```ts
* const newWorkflow = DirectedGraph.fromWorkflow(workflow)
* .addNodes(node1, node2)
* .addConnection({ from: node1, to: node2 })
* .toWorkflow(...workflow);
* ```
*/
export class DirectedGraph {
private nodes: Map<string, INode> = new Map();
private connections: Map<DirectedGraphKey, GraphConnection> = new Map();
getNodes() {
return new Map(this.nodes.entries());
}
getConnections(filter: { to?: INode } = {}) {
const filteredCopy: GraphConnection[] = [];
for (const connection of this.connections.values()) {
const toMatches = filter.to ? connection.to === filter.to : true;
if (toMatches) {
filteredCopy.push(connection);
}
}
return filteredCopy;
}
addNode(node: INode) {
this.nodes.set(node.name, node);
return this;
}
addNodes(...nodes: INode[]) {
for (const node of nodes) {
this.addNode(node);
}
return this;
}
addConnection(connectionInput: {
from: INode;
to: INode;
type?: NodeConnectionType;
outputIndex?: number;
inputIndex?: number;
}) {
const { from, to } = connectionInput;
const fromExists = this.nodes.get(from.name) === from;
const toExists = this.nodes.get(to.name) === to;
a.ok(fromExists);
a.ok(toExists);
const connection: GraphConnection = {
...connectionInput,
type: connectionInput.type ?? NodeConnectionType.Main,
outputIndex: connectionInput.outputIndex ?? 0,
inputIndex: connectionInput.inputIndex ?? 0,
};
this.connections.set(this.makeKey(connection), connection);
return this;
}
addConnections(
...connectionInputs: Array<{
from: INode;
to: INode;
type?: NodeConnectionType;
outputIndex?: number;
inputIndex?: number;
}>
) {
for (const connectionInput of connectionInputs) {
this.addConnection(connectionInput);
}
return this;
}
getDirectChildren(node: INode) {
const nodeExists = this.nodes.get(node.name) === node;
a.ok(nodeExists);
const directChildren: GraphConnection[] = [];
for (const connection of this.connections.values()) {
if (connection.from !== node) {
continue;
}
directChildren.push(connection);
}
return directChildren;
}
getDirectParents(node: INode) {
const nodeExists = this.nodes.get(node.name) === node;
a.ok(nodeExists);
const directParents: GraphConnection[] = [];
for (const connection of this.connections.values()) {
if (connection.to !== node) {
continue;
}
directParents.push(connection);
}
return directParents;
}
getConnection(
from: INode,
outputIndex: number,
type: NodeConnectionType,
inputIndex: number,
to: INode,
): GraphConnection | undefined {
return this.connections.get(
this.makeKey({
from,
outputIndex,
type,
inputIndex,
to,
}),
);
}
toWorkflow(parameters: Omit<WorkflowParameters, 'nodes' | 'connections'>): Workflow {
return new Workflow({
...parameters,
nodes: [...this.nodes.values()],
connections: this.toIConnections(),
});
}
static fromWorkflow(workflow: Workflow): DirectedGraph {
const graph = new DirectedGraph();
graph.addNodes(...Object.values(workflow.nodes));
for (const [fromNodeName, iConnection] of Object.entries(workflow.connectionsBySourceNode)) {
const from = workflow.getNode(fromNodeName);
a.ok(from);
for (const [outputType, outputs] of Object.entries(iConnection)) {
for (const [outputIndex, conns] of outputs.entries()) {
for (const conn of conns) {
// TODO: What's with the input type?
const { node: toNodeName, type: _inputType, index: inputIndex } = conn;
const to = workflow.getNode(toNodeName);
a.ok(to);
graph.addConnection({
from,
to,
// TODO: parse outputType instead of casting it
type: outputType as NodeConnectionType,
outputIndex,
inputIndex,
});
}
}
}
}
return graph;
}
private toIConnections() {
const result: IConnections = {};
for (const connection of this.connections.values()) {
const { from, to, type, outputIndex, inputIndex } = connection;
result[from.name] = result[from.name] ?? {
[type]: [],
};
const resultConnection = result[from.name];
resultConnection[type][outputIndex] = resultConnection[type][outputIndex] ?? [];
const group = resultConnection[type][outputIndex];
group.push({
node: to.name,
type,
index: inputIndex,
});
}
return result;
}
private makeKey(connection: GraphConnection): DirectedGraphKey {
return `${connection.from.name}-${connection.type}-${connection.outputIndex}-${connection.inputIndex}-${connection.to.name}`;
}
}

View file

@ -0,0 +1,41 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { DirectedGraph } from '../DirectedGraph';
import { createNodeData, defaultWorkflowParameter } from './helpers';
describe('DirectedGraph', () => {
// ┌─────┐ ┌─────┐ ┌─────┐
// ┌─►│node1├───►│node2├──►│node3├─┐
// │ └─────┘ └─────┘ └─────┘ │
// │ │
// └───────────────────────────────┘
test('roundtrip', () => {
// ARRANGE
const node1 = createNodeData({ name: 'Node1' });
const node2 = createNodeData({ name: 'Node2' });
const node3 = createNodeData({ name: 'Node3' });
// ACT
const graph = new DirectedGraph()
.addNodes(node1, node2, node3)
.addConnections(
{ from: node1, to: node2 },
{ from: node2, to: node3 },
{ from: node3, to: node1 },
);
// ASSERT
expect(DirectedGraph.fromWorkflow(graph.toWorkflow({ ...defaultWorkflowParameter }))).toEqual(
graph,
);
});
});

View file

@ -0,0 +1,372 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { type IPinData, type IRunData } from 'n8n-workflow';
import { createNodeData, toITaskData } from './helpers';
import { findStartNodes, isDirty } from '../findStartNodes';
import { DirectedGraph } from '../DirectedGraph';
describe('isDirty', () => {
test("if the node has pinned data it's not dirty", () => {
const node = createNodeData({ name: 'Basic Node' });
const pinData: IPinData = {
[node.name]: [{ json: { value: 1 } }],
};
expect(isDirty(node, undefined, pinData)).toBe(false);
});
test("if the node has run data it's not dirty", () => {
const node = createNodeData({ name: 'Basic Node' });
const runData: IRunData = {
[node.name]: [toITaskData([{ data: { value: 1 } }])],
};
expect(isDirty(node, runData)).toBe(false);
});
});
describe('findStartNodes', () => {
// ►►
// ┌───────┐
// │trigger│
// └───────┘
test('finds the start node if there is only a trigger', () => {
const node = createNodeData({ name: 'Basic Node' });
const graph = new DirectedGraph().addNode(node);
const startNodes = findStartNodes(graph, node, node);
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
});
// ►►
// ┌───────┐ ┌───────────┐
// │trigger├────►│destination│
// └───────┘ └───────────┘
test('finds the start node in a simple graph', () => {
const trigger = createNodeData({ name: 'trigger' });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, destination)
.addConnection({ from: trigger, to: destination });
// if the trigger has no run data
{
const startNodes = findStartNodes(graph, trigger, destination);
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(trigger);
}
// if the trigger has run data
{
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
};
const startNodes = findStartNodes(graph, trigger, destination, runData);
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(destination);
}
});
// ┌───────┐ ►►
// │ │1──┐ ┌────┐
// │trigger│ ├─►│node│
// │ │1──┘ └────┘
// └───────┘
// All nodes have run data. `findStartNodes` should return node twice
// because it has 2 input connections.
test('multiple outputs', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, node)
.addConnections(
{ from: trigger, to: node, outputIndex: 0, inputIndex: 0 },
{ from: trigger, to: node, outputIndex: 1, inputIndex: 0 },
);
const runData: IRunData = {
[trigger.name]: [
toITaskData([
{ data: { value: 1 }, outputIndex: 0 },
{ data: { value: 1 }, outputIndex: 1 },
]),
],
[node.name]: [toITaskData([{ data: { value: 1 } }])],
};
// ACT
const startNodes = findStartNodes(graph, trigger, node, runData);
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
});
// ┌─────┐ ┌─────┐ ►►
//┌───────┐ │ ├────┬────────►│ │ ┌─────┐
//│trigger├───►│node1│ │ │node2├────┬───►│node4│
//└───────┘ │ ├────┼────┬───►│ │ │ └─────┘
// └─────┘ │ │ └─────┘ │
// │ │ │
// │ │ │
// │ │ │
// │ │ ┌─────┐ │
// │ └───►│ │ │
// │ │node3├────┘
// └────────►│ │
// └─────┘
test('complex example with multiple outputs and inputs', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const node3 = createNodeData({ name: 'node3' });
const node4 = createNodeData({ name: 'node4' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, node3, node4)
.addConnections(
{ from: trigger, to: node1 },
{ from: node1, to: node2, outputIndex: 0, inputIndex: 0 },
{ from: node1, to: node2, outputIndex: 1, inputIndex: 1 },
{ from: node1, to: node3, outputIndex: 0, inputIndex: 1 },
{ from: node1, to: node3, outputIndex: 1, inputIndex: 0 },
{ from: node2, to: node4 },
{ from: node3, to: node4 },
);
{
// ACT
const startNodes = findStartNodes(graph, trigger, node4);
// ASSERT
expect(startNodes).toHaveLength(1);
// no run data means the trigger is the start node
expect(startNodes[0]).toEqual(trigger);
}
{
// run data for everything
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 1 } }])],
[node3.name]: [toITaskData([{ data: { value: 1 } }])],
[node4.name]: [toITaskData([{ data: { value: 1 } }])],
};
// ACT
const startNodes = findStartNodes(graph, trigger, node4, runData);
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node4);
}
});
// ►►
// ┌───────┐1 ┌────┐
// │ ├────────►│ │
// │trigger│ │node│
// │ ├────────►│ │
// └───────┘0 └────┘
// The merge node only gets data on one input, so the it should only be once
// in the start nodes
test('multiple connections with the first one having data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, node)
.addConnections(
{ from: trigger, to: node, inputIndex: 0, outputIndex: 0 },
{ from: trigger, to: node, inputIndex: 1, outputIndex: 1 },
);
// ACT
const startNodes = findStartNodes(graph, trigger, node, {
[trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])],
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
});
// ►►
// ┌───────┐0 ┌────┐
// │ ├────────►│ │
// │trigger│ │node│
// │ ├────────►│ │
// └───────┘1 └────┘
// The merge node only gets data on one input, so the it should only be once
// in the start nodes
test('multiple connections with the second one having data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, node)
.addConnections(
{ from: trigger, to: node, inputIndex: 0, outputIndex: 0 },
{ from: trigger, to: node, inputIndex: 1, outputIndex: 1 },
);
// ACT
const startNodes = findStartNodes(graph, trigger, node, {
[trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 1 }])],
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
});
// ►►
// ┌───────┐1 ┌────┐
// │ ├────────►│ │
// │trigger│ │node│
// │ ├────────►│ │
// └───────┘1 └────┘
// The merge node gets data on both inputs, so the it should be in the start
// nodes twice.
test('multiple connections with both having data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, node)
.addConnections(
{ from: trigger, to: node, inputIndex: 0, outputIndex: 0 },
{ from: trigger, to: node, inputIndex: 1, outputIndex: 1 },
);
// ACT
const startNodes = findStartNodes(graph, trigger, node, {
[trigger.name]: [
toITaskData([
{ data: { value: 1 }, outputIndex: 0 },
{ data: { value: 1 }, outputIndex: 1 },
]),
],
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node);
});
// ►►
// ┌───────┐ ┌────┐
// │ │1 ┌────►│ │
// │trigger├───┤ │node│
// │ │ └────►│ │
// └───────┘ └────┘
test('multiple connections with both having data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const node3 = createNodeData({ name: 'node3' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, node3)
.addConnections(
{ from: trigger, to: node1 },
{ from: trigger, to: node2 },
{ from: node1, to: node3 },
{ from: node2, to: node3 },
);
// ACT
const startNodes = findStartNodes(graph, trigger, node3, {
[trigger.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])],
[node1.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])],
[node2.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 0 }])],
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node3);
});
// ►►
// ┌───────┐ ┌─────┐0 ┌─────┐
// │ │1 │ ├────────►│ │
// │trigger├───────►│node1│ │node2│
// │ │ │ ├────────►│ │
// └───────┘ └─────┘1 └─────┘
test('multiple connections with trigger', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections(
{ from: trigger, to: node1 },
{ from: node1, to: node2, outputIndex: 0 },
{ from: node1, to: node2, outputIndex: 1 },
);
// ACT
const startNodes = findStartNodes(graph, node1, node2, {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 }, outputIndex: 1 }])],
});
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node2);
});
// ►►
//┌───────┐1 ┌─────┐1 ┌─────┐
//│Trigger├───┬──►│Node1├───┬─►│Node2│
//└───────┘ │ └─────┘ │ └─────┘
// │ │
// └─────────────┘
test('terminates when called with graph that contains cycles', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections(
{ from: trigger, to: node1 },
{ from: node1, to: node1 },
{ from: node1, to: node2 },
);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinData: IPinData = {};
// ACT
const startNodes = findStartNodes(graph, trigger, node2, runData, pinData);
// ASSERT
expect(startNodes).toHaveLength(1);
expect(startNodes[0]).toEqual(node2);
});
});

View file

@ -0,0 +1,185 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { DirectedGraph } from '../DirectedGraph';
import { findSubgraph } from '../findSubgraph';
import { createNodeData } from './helpers';
describe('findSubgraph2', () => {
// ►►
// ┌───────┐ ┌───────────┐
// │trigger├────►│destination│
// └───────┘ └───────────┘
test('simple', () => {
const trigger = createNodeData({ name: 'trigger' });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination });
const subgraph = findSubgraph(graph, destination, trigger);
expect(subgraph).toEqual(graph);
});
// ►►
// ┌───────┐ ┌───────────┐
// │ ├────────►│ │
// │trigger│ │destination│
// │ ├────────►│ │
// └───────┘ └───────────┘
test('multiple connections', () => {
const ifNode = createNodeData({ name: 'If' });
const noOp = createNodeData({ name: 'noOp' });
const graph = new DirectedGraph()
.addNodes(ifNode, noOp)
.addConnections(
{ from: ifNode, to: noOp, outputIndex: 0 },
{ from: ifNode, to: noOp, outputIndex: 1 },
);
const subgraph = findSubgraph(graph, noOp, ifNode);
expect(subgraph).toEqual(graph);
});
// ►►
// ┌───────┐ ┌───────────┐
// │ ├────────►│ │ ┌────┐
// │trigger│ │destination├─────►│node│
// │ ├────────►│ │ └────┘
// └───────┘ └───────────┘
test('disregard nodes after destination', () => {
const trigger = createNodeData({ name: 'trigger' });
const destination = createNodeData({ name: 'destination' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, destination, node)
.addConnections({ from: trigger, to: destination }, { from: destination, to: node });
const subgraph = findSubgraph(graph, destination, trigger);
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
// XX
// ┌───────┐ ┌────────┐ ►►
// │ ├────────►│ │ ┌───────────┐
// │trigger│ │disabled├─────►│destination│
// │ ├────────►│ │ └───────────┘
// └───────┘ └────────┘
test('skip disabled nodes', () => {
const trigger = createNodeData({ name: 'trigger' });
const disabled = createNodeData({ name: 'disabled', disabled: true });
const destination = createNodeData({ name: 'destination' });
const graph = new DirectedGraph()
.addNodes(trigger, disabled, destination)
.addConnections({ from: trigger, to: disabled }, { from: disabled, to: destination });
const subgraph = findSubgraph(graph, destination, trigger);
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
// ►►
// ┌───────┐ ┌─────┐ ┌─────┐
// │Trigger├───┬──►│Node1├───┬─►│Node2│
// └───────┘ │ └─────┘ │ └─────┘
// │ │
// └─────────────┘
test('terminates when called with graph that contains cycles', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections(
{ from: trigger, to: node1 },
{ from: node1, to: node1 },
{ from: node1, to: node2 },
);
// ACT
const subgraph = findSubgraph(graph, node2, trigger);
// ASSERT
expect(subgraph).toEqual(graph);
});
// ►►
// ┌───────┐ ┌─────┐
// │Trigger├──┬─►│Node1│
// └───────┘ │ └─────┘
// │
// ┌─────┐ │
// │Node2├────┘
// └─────┘
test('terminates when called with graph that contains cycles', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections({ from: trigger, to: node1 }, { from: node2, to: node1 });
// ACT
const subgraph = findSubgraph(graph, node1, trigger);
// ASSERT
expect(subgraph).toEqual(
new DirectedGraph().addNodes(trigger, node1).addConnections({ from: trigger, to: node1 }),
);
});
// ►►
// ┌───────┐ ┌───────────┐ ┌───────────┐
// │Trigger├─┬─►│Destination├──►│AnotherNode├───┐
// └───────┘ │ └───────────┘ └───────────┘ │
// │ │
// └──────────────────────────────────┘
test('terminates if the destination node is part of a cycle', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const destination = createNodeData({ name: 'destination' });
const anotherNode = createNodeData({ name: 'anotherNode' });
const graph = new DirectedGraph()
.addNodes(trigger, destination, anotherNode)
.addConnections(
{ from: trigger, to: destination },
{ from: destination, to: anotherNode },
{ from: anotherNode, to: destination },
);
// ACT
const subgraph = findSubgraph(graph, destination, trigger);
// ASSERT
expect(subgraph).toEqual(
new DirectedGraph()
.addNodes(trigger, destination)
.addConnections({ from: trigger, to: destination }),
);
});
});

View file

@ -0,0 +1,197 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// PD denotes that the node has pinned data
import type { IPinData } from 'n8n-workflow';
import { NodeConnectionType, type IRunData } from 'n8n-workflow';
import { DirectedGraph } from '../DirectedGraph';
import { createNodeData, toITaskData } from './helpers';
import { getSourceDataGroups } from '../getSourceDataGroups';
describe('getSourceDataGroups', () => {
//┌───────┐1
//│source1├────┐
//└───────┘ │ ┌────┐
//┌───────┐1 ├──►│ │
//│source2├────┘ │node│
//└───────┘ ┌──►│ │
//┌───────┐1 │ └────┘
//│source3├────┘
//└───────┘
it('groups sources into possibly complete sets if all of them have data', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const source2 = createNodeData({ name: 'source2' });
const source3 = createNodeData({ name: 'source3' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, source2, source3, node)
.addConnections(
{ from: source1, to: node, inputIndex: 0 },
{ from: source2, to: node, inputIndex: 0 },
{ from: source3, to: node, inputIndex: 1 },
);
const runData: IRunData = {
[source1.name]: [toITaskData([{ data: { value: 1 } }])],
[source2.name]: [toITaskData([{ data: { value: 1 } }])],
[source3.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(2);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
const group2 = groups[1];
expect(group2).toHaveLength(1);
expect(group2[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
});
//┌───────┐PD
//│source1├────┐
//└───────┘ │ ┌────┐
//┌───────┐PD ├──►│ │
//│source2├────┘ │node│
//└───────┘ ┌──►│ │
//┌───────┐PD │ └────┘
//│source3├────┘
//└───────┘
it('groups sources into possibly complete sets if all of them have data', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const source2 = createNodeData({ name: 'source2' });
const source3 = createNodeData({ name: 'source3' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, source2, source3, node)
.addConnections(
{ from: source1, to: node, inputIndex: 0 },
{ from: source2, to: node, inputIndex: 0 },
{ from: source3, to: node, inputIndex: 1 },
);
const runData: IRunData = {};
const pinnedData: IPinData = {
[source1.name]: [{ json: { value: 1 } }],
[source2.name]: [{ json: { value: 2 } }],
[source3.name]: [{ json: { value: 3 } }],
};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(2);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
from: source1,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
const group2 = groups[1];
expect(group2).toHaveLength(1);
expect(group2[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
});
//┌───────┐0
//│source1├────┐
//└───────┘ │ ┌────┐
//┌───────┐1 ├──►│ │
//│source2├────┘ │node│
//└───────┘ ┌──►│ │
//┌───────┐1 │ └────┘
//│source3├────┘
//└───────┘
it('groups sources into possibly complete sets if all of them have data', () => {
// ARRANGE
const source1 = createNodeData({ name: 'source1' });
const source2 = createNodeData({ name: 'source2' });
const source3 = createNodeData({ name: 'source3' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(source1, source2, source3, node)
.addConnections(
{ from: source1, to: node, inputIndex: 0 },
{ from: source2, to: node, inputIndex: 0 },
{ from: source3, to: node, inputIndex: 1 },
);
const runData: IRunData = {
[source2.name]: [toITaskData([{ data: { value: 1 } }])],
[source3.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinnedData: IPinData = {};
// ACT
const groups = getSourceDataGroups(graph, node, runData, pinnedData);
// ASSERT
expect(groups).toHaveLength(1);
const group1 = groups[0];
expect(group1).toHaveLength(2);
expect(group1[0]).toEqual({
from: source2,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 0,
to: node,
});
expect(group1[1]).toEqual({
from: source3,
outputIndex: 0,
type: NodeConnectionType.Main,
inputIndex: 1,
to: node,
});
});
});

View file

@ -0,0 +1,99 @@
import { NodeConnectionType } from 'n8n-workflow';
import type { INodeParameters, INode, ITaskData, IDataObject, IConnections } from 'n8n-workflow';
interface StubNode {
name: string;
parameters?: INodeParameters;
disabled?: boolean;
}
export function createNodeData(stubData: StubNode): INode {
return {
name: stubData.name,
parameters: stubData.parameters ?? {},
type: 'test.set',
typeVersion: 1,
id: 'uuid-1234',
position: [100, 100],
disabled: stubData.disabled ?? false,
};
}
type TaskData = {
data: IDataObject;
outputIndex?: number;
nodeConnectionType?: NodeConnectionType;
};
export function toITaskData(taskData: TaskData[]): ITaskData {
const result: ITaskData = {
executionStatus: 'success',
executionTime: 0,
startTime: 0,
source: [],
data: {},
};
// NOTE: Here to make TS happy.
result.data = result.data ?? {};
for (const taskDatum of taskData) {
const type = taskDatum.nodeConnectionType ?? NodeConnectionType.Main;
const outputIndex = taskDatum.outputIndex ?? 0;
result.data[type] = result.data[type] ?? [];
const dataConnection = result.data[type];
dataConnection[outputIndex] = [{ json: taskDatum.data }];
}
for (const [type, dataConnection] of Object.entries(result.data)) {
for (const [index, maybe] of dataConnection.entries()) {
result.data[type][index] = maybe ?? null;
}
}
return result;
}
export const nodeTypes = {
getByName: jest.fn(),
getByNameAndVersion: jest.fn(),
getKnownTypes: jest.fn(),
};
export const defaultWorkflowParameter = {
active: false,
nodeTypes,
};
type Connection = {
from: INode;
to: INode;
type?: NodeConnectionType;
outputIndex?: number;
inputIndex?: number;
};
export function toIConnections(connections: Connection[]): IConnections {
const result: IConnections = {};
for (const connection of connections) {
const type = connection.type ?? NodeConnectionType.Main;
const outputIndex = connection.outputIndex ?? 0;
const inputIndex = connection.inputIndex ?? 0;
result[connection.from.name] = result[connection.from.name] ?? {
[type]: [],
};
const resultConnection = result[connection.from.name];
resultConnection[type][outputIndex] = resultConnection[type][outputIndex] ?? [];
const group = resultConnection[type][outputIndex];
group.push({
node: connection.to.name,
type,
index: inputIndex,
});
}
return result;
}

View file

@ -0,0 +1,333 @@
// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/
// If you update the tests, please update the diagrams as well.
// If you add a test, please create a new diagram.
//
// Map
// 0 means the output has no run data
// 1 means the output has run data
// ►► denotes the node that the user wants to execute to
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { recreateNodeExecutionStack } from '@/PartialExecutionUtils/recreateNodeExecutionStack';
import { type IPinData, type IRunData } from 'n8n-workflow';
import { AssertionError } from 'assert';
import { DirectedGraph } from '../DirectedGraph';
import { findSubgraph } from '../findSubgraph';
import { createNodeData, toITaskData } from './helpers';
describe('recreateNodeExecutionStack', () => {
// ►►
// ┌───────┐1 ┌────┐
// │Trigger├──────►│Node│
// └───────┘ └────┘
test('all nodes except destination node have data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const graph = new DirectedGraph()
.addNodes(trigger, node)
.addConnections({ from: trigger, to: node });
const workflow = findSubgraph(graph, node, trigger);
const startNodes = [node];
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinData = {};
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
expect(nodeExecutionStack).toEqual([
{
data: { main: [[{ json: { value: 1 } }]] },
node,
source: {
main: [
{
// TODO: not part of ISourceDate, but maybe it should be?
//currentNodeInput: 0,
previousNode: 'trigger',
previousNodeOutput: 0,
previousNodeRun: 0,
},
],
},
},
]);
expect(waitingExecution).toEqual({ node: { '0': { main: [[{ json: { value: 1 } }]] } } });
expect(waitingExecutionSource).toEqual({
node: {
'0': {
main: [
{ previousNode: 'trigger', previousNodeOutput: undefined, previousNodeRun: undefined },
],
},
},
});
});
// ►►
// ┌───────┐0 ┌────┐
// │Trigger├──────►│Node│
// └───────┘ └────┘
test('no nodes have data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const workflow = new DirectedGraph()
.addNodes(trigger, node)
.addConnections({ from: trigger, to: node });
const startNodes = [trigger];
const runData: IRunData = {};
const pinData: IPinData = {};
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
expect(nodeExecutionStack).toEqual([
{
data: { main: [[{ json: {} }]] },
node: trigger,
source: null,
},
]);
expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } });
expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } });
});
// PinData ►►
// ┌───────┐1 ┌────┐
// │Trigger├──────►│Node│
// └───────┘ └────┘
test('node before destination node has pinned data', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node = createNodeData({ name: 'node' });
const workflow = new DirectedGraph()
.addNodes(trigger, node)
.addConnections({ from: trigger, to: node });
const startNodes = [node];
const runData: IRunData = {};
const pinData: IPinData = {
[trigger.name]: [{ json: { value: 1 } }],
};
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(workflow, startNodes, node, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
expect(nodeExecutionStack).toEqual([
{
data: { main: [[{ json: { value: 1 } }]] },
node,
source: {
main: [
{
// TODO: not part of ISourceDate, but maybe it should be?
//currentNodeInput: 0,
previousNode: trigger.name,
previousNodeRun: 0,
previousNodeOutput: 0,
},
],
},
},
]);
expect(waitingExecution).toEqual({ node: { '0': { main: [null] } } });
expect(waitingExecutionSource).toEqual({ node: { '0': { main: [null] } } });
});
// XX ►►
// ┌───────┐1 ┌─────┐ ┌─────┐
// │Trigger├─────►│Node1├──────►│Node2│
// └───────┘ └─────┘ └─────┘
test('throws if a disabled node is found', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1', disabled: true });
const node2 = createNodeData({ name: 'node2' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2)
.addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 });
const startNodes = [node2];
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinData = {};
// ACT & ASSERT
expect(() =>
recreateNodeExecutionStack(graph, startNodes, node2, runData, pinData),
).toThrowError(AssertionError);
});
// ►►
// ┌───────┐1 ┌─────┐1 ┌─────┐
// │Trigger├──┬──►│Node1├──┬───►│Node3│
// └───────┘ │ └─────┘ │ └─────┘
// │ │
// │ ┌─────┐1 │
// └──►│Node2├──┘
// └─────┘
test('multiple incoming connections', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const node3 = createNodeData({ name: 'node3' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, node3)
.addConnections(
{ from: trigger, to: node1 },
{ from: trigger, to: node2 },
{ from: node1, to: node3 },
{ from: node2, to: node3 },
);
const startNodes = [node3];
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinData = {};
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toEqual([
{
data: { main: [[{ json: { value: 1 } }]] },
node: node3,
source: {
main: [
{
// TODO: not part of ISourceDate, but maybe it should be?
//currentNodeInput: 0,
previousNode: 'node1',
previousNodeOutput: 0,
previousNodeRun: 0,
},
],
},
},
{
data: { main: [[{ json: { value: 1 } }]] },
node: node3,
source: {
main: [
{
// TODO: not part of ISourceDate, but maybe it should be?
//currentNodeInput: 0,
previousNode: 'node2',
previousNodeOutput: 0,
previousNodeRun: 0,
},
],
},
},
]);
expect(waitingExecution).toEqual({
node3: { '0': { main: [[{ json: { value: 1 } }], [{ json: { value: 1 } }]] } },
});
expect(waitingExecutionSource).toEqual({
node3: {
'0': {
main: [
{ previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined },
{ previousNode: 'node2', previousNodeOutput: undefined, previousNodeRun: undefined },
],
},
},
});
});
// ┌─────┐1 ►►
// ┌─►│node1├───┐ ┌─────┐
// ┌───────┐1 │ └─────┘ └──►│ │
// │Trigger├──┤ │node3│
// └───────┘ │ ┌─────┐1 ┌──►│ │
// └─►│node2├───┘ └─────┘
// └─────┘
test('multiple inputs', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const node1 = createNodeData({ name: 'node1' });
const node2 = createNodeData({ name: 'node2' });
const node3 = createNodeData({ name: 'node3' });
const graph = new DirectedGraph()
.addNodes(trigger, node1, node2, node3)
.addConnections(
{ from: trigger, to: node1 },
{ from: trigger, to: node2 },
{ from: node1, to: node3, inputIndex: 0 },
{ from: node2, to: node3, inputIndex: 1 },
);
const startNodes = [node3];
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[node1.name]: [toITaskData([{ data: { value: 1 } }])],
[node2.name]: [toITaskData([{ data: { value: 1 } }])],
};
const pinData: IPinData = {
[trigger.name]: [{ json: { value: 1 } }],
};
// ACT
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(graph, startNodes, node3, runData, pinData);
// ASSERT
expect(nodeExecutionStack).toHaveLength(1);
expect(nodeExecutionStack[0]).toEqual({
data: { main: [[{ json: { value: 1 } }], [{ json: { value: 1 } }]] },
node: node3,
source: {
main: [
{ previousNode: 'node1', previousNodeOutput: 0, previousNodeRun: 0 },
{ previousNode: 'node2', previousNodeOutput: 0, previousNodeRun: 0 },
],
},
});
expect(waitingExecution).toEqual({
node3: {
'0': {
main: [[{ json: { value: 1 } }]],
},
},
});
expect(waitingExecutionSource).toEqual({
node3: {
'0': {
main: [
{ previousNode: 'node1', previousNodeOutput: undefined, previousNodeRun: undefined },
{ previousNode: 'node2', previousNodeOutput: 1, previousNodeRun: undefined },
],
},
},
});
});
});

View file

@ -0,0 +1,26 @@
import { NodeConnectionType } from 'n8n-workflow';
import { createNodeData, toIConnections } from './helpers';
test('toIConnections', () => {
const node1 = createNodeData({ name: 'Basic Node 1' });
const node2 = createNodeData({ name: 'Basic Node 2' });
expect(
toIConnections([{ from: node1, to: node2, type: NodeConnectionType.Main, outputIndex: 0 }]),
).toEqual({
[node1.name]: {
// output group
main: [
// first output
[
// first connection
{
node: node2.name,
type: NodeConnectionType.Main,
index: 0,
},
],
],
},
});
});

View file

@ -0,0 +1,64 @@
import { NodeConnectionType } from 'n8n-workflow';
import { toITaskData } from './helpers';
test('toITaskData', function () {
expect(toITaskData([{ data: { value: 1 } }])).toEqual({
executionStatus: 'success',
executionTime: 0,
source: [],
startTime: 0,
data: {
main: [[{ json: { value: 1 } }]],
},
});
expect(toITaskData([{ data: { value: 1 }, outputIndex: 1 }])).toEqual({
executionStatus: 'success',
executionTime: 0,
source: [],
startTime: 0,
data: {
main: [null, [{ json: { value: 1 } }]],
},
});
expect(
toITaskData([
{ data: { value: 1 }, outputIndex: 1, nodeConnectionType: NodeConnectionType.AiAgent },
]),
).toEqual({
executionStatus: 'success',
executionTime: 0,
source: [],
startTime: 0,
data: {
[NodeConnectionType.AiAgent]: [null, [{ json: { value: 1 } }]],
},
});
expect(
toITaskData([
{ data: { value: 1 }, outputIndex: 0 },
{ data: { value: 2 }, outputIndex: 1 },
]),
).toEqual({
executionStatus: 'success',
executionTime: 0,
startTime: 0,
source: [],
data: {
main: [
[
{
json: { value: 1 },
},
],
[
{
json: { value: 2 },
},
],
],
},
});
});

View file

@ -0,0 +1,6 @@
import type { Workflow } from 'n8n-workflow';
export function findCycles(_workflow: Workflow) {
// TODO: implement depth first search or Tarjan's Algorithm
return [];
}

View file

@ -0,0 +1,153 @@
import type { INode, IPinData, IRunData } from 'n8n-workflow';
import type { DirectedGraph } from './DirectedGraph';
import { getIncomingData } from './getIncomingData';
/**
* A node is dirty if either of the following is true:
* - it's properties or options changed since last execution (not implemented yet)
* - one of it's parents is disabled
* - it has an error (not implemented yet)
* - it neither has run data nor pinned data
*/
export function isDirty(node: INode, runData: IRunData = {}, pinData: IPinData = {}): boolean {
// TODO: implement
const propertiesOrOptionsChanged = false;
if (propertiesOrOptionsChanged) {
return true;
}
// TODO: implement
const parentNodeGotDisabled = false;
if (parentNodeGotDisabled) {
return true;
}
// TODO: implement
const hasAnError = false;
if (hasAnError) {
return true;
}
const hasPinnedData = pinData[node.name] !== undefined;
if (hasPinnedData) {
return false;
}
const hasRunData = runData?.[node.name];
if (hasRunData) {
return false;
}
return true;
}
function findStartNodesRecursive(
graph: DirectedGraph,
current: INode,
destination: INode,
runData: IRunData,
pinData: IPinData,
startNodes: Set<INode>,
seen: Set<INode>,
): Set<INode> {
const nodeIsDirty = isDirty(current, runData, pinData);
// If the current node is dirty stop following this branch, we found a start
// node.
if (nodeIsDirty) {
startNodes.add(current);
return startNodes;
}
// If the current node is the destination node stop following this branch, we
// found a start node.
if (current === destination) {
startNodes.add(current);
return startNodes;
}
// If we detect a cycle stop following the branch, there is no start node on
// this branch.
if (seen.has(current)) {
return startNodes;
}
// Recurse with every direct child that is part of the sub graph.
const outGoingConnections = graph.getDirectChildren(current);
for (const outGoingConnection of outGoingConnections) {
const nodeRunData = getIncomingData(
runData,
outGoingConnection.from.name,
// NOTE: It's always 0 until I fix the bug that removes the run data for
// old runs. The FE only sends data for one run for each node.
0,
outGoingConnection.type,
outGoingConnection.outputIndex,
);
// If the node has multiple outputs, only follow the outputs that have run data.
const hasNoRunData =
nodeRunData === null || nodeRunData === undefined || nodeRunData.length === 0;
if (hasNoRunData) {
continue;
}
findStartNodesRecursive(
graph,
outGoingConnection.to,
destination,
runData,
pinData,
startNodes,
new Set(seen).add(current),
);
}
return startNodes;
}
/**
* The start node is the node from which a partial execution starts. The start
* node will be executed or re-executed.
* The nodes are found by traversing the graph from the trigger to the
* destination and finding the earliest dirty nodes on every branch.
*
* The algorithm is:
* Starting from the trigger node.
*
* 1. if the current node is not a trigger and has no input data (on all
* connections) (not implemented yet, possibly not necessary)
* - stop following this branch, there is no start node on this branch
* 2. If the current node is dirty, or is the destination node
* - stop following this branch, we found a start node
* 3. If we detect a cycle
* - stop following the branch, there is no start node on this branch
* 4. Recurse with every direct child that is part of the sub graph
*/
export function findStartNodes(
graph: DirectedGraph,
trigger: INode,
destination: INode,
runData: IRunData = {},
pinData: IPinData = {},
): INode[] {
const startNodes = findStartNodesRecursive(
graph,
trigger,
destination,
runData,
pinData,
// start nodes found
new Set(),
// seen
new Set(),
);
return [...startNodes];
}

View file

@ -0,0 +1,113 @@
import type { INode } from 'n8n-workflow';
import type { GraphConnection } from './DirectedGraph';
import { DirectedGraph } from './DirectedGraph';
function findSubgraphRecursive(
graph: DirectedGraph,
destinationNode: INode,
current: INode,
trigger: INode,
newGraph: DirectedGraph,
currentBranch: GraphConnection[],
) {
// If the current node is the chosen trigger keep this branch.
if (current === trigger) {
for (const connection of currentBranch) {
newGraph.addNodes(connection.from, connection.to);
newGraph.addConnection(connection);
}
return;
}
let parentConnections = graph.getDirectParents(current);
// If the current node has no parents, dont keep this branch.
if (parentConnections.length === 0) {
return;
}
// If the current node is the destination node again, dont keep this branch.
const isCycleWithDestinationNode =
current === destinationNode && currentBranch.some((c) => c.to === destinationNode);
if (isCycleWithDestinationNode) {
return;
}
// If the current node was already visited, keep this branch.
const isCycleWithCurrentNode = currentBranch.some((c) => c.to === current);
if (isCycleWithCurrentNode) {
// TODO: write function that adds nodes when adding connections
for (const connection of currentBranch) {
newGraph.addNodes(connection.from, connection.to);
newGraph.addConnection(connection);
}
return;
}
// If the current node is disabled, dont keep this node, but keep the
// branch.
// Take every incoming connection and connect it to every node that is
// connected to the current nodes first output
if (current.disabled) {
const incomingConnections = graph.getDirectParents(current);
const outgoingConnections = graph
.getDirectChildren(current)
// NOTE: When a node is disabled only the first output gets data
.filter((connection) => connection.outputIndex === 0);
parentConnections = [];
for (const incomingConnection of incomingConnections) {
for (const outgoingConnection of outgoingConnections) {
const newConnection = {
...incomingConnection,
to: outgoingConnection.to,
inputIndex: outgoingConnection.inputIndex,
};
parentConnections.push(newConnection);
currentBranch.pop();
currentBranch.push(newConnection);
}
}
}
// Recurse on each parent.
for (const parentConnection of parentConnections) {
findSubgraphRecursive(graph, destinationNode, parentConnection.from, trigger, newGraph, [
...currentBranch,
parentConnection,
]);
}
}
/**
* Find all nodes that can lead from the trigger to the destination node,
* ignoring disabled nodes.
*
* The algorithm is:
* Start with Destination Node
*
* 1. if the current node is the chosen trigger keep this branch
* 2. if the current node has no parents, dont keep this branch
* 3. if the current node is the destination node again, dont keep this
* branch
* 4. if the current node was already visited, keep this branch
* 5. if the current node is disabled, dont keep this node, but keep the
* branch
* - take every incoming connection and connect it to every node that is
* connected to the current nodes first output
* 6. Recurse on each parent
*/
export function findSubgraph(
graph: DirectedGraph,
destinationNode: INode,
trigger: INode,
): DirectedGraph {
const newGraph = new DirectedGraph();
findSubgraphRecursive(graph, destinationNode, destinationNode, trigger, newGraph, []);
return newGraph;
}

View file

@ -0,0 +1,61 @@
import type { INode, Workflow } from 'n8n-workflow';
import * as assert from 'assert/strict';
function findAllParentTriggers(workflow: Workflow, destinationNodeName: string) {
const parentNodes = workflow
.getParentNodes(destinationNodeName)
.map((name) => {
const node = workflow.getNode(name);
// We got the node name from `workflow.getParentNodes`. The node must
// exist.
assert.ok(node);
return {
node,
nodeType: workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion),
};
})
.filter((value) => value !== null)
.filter(({ nodeType }) => nodeType.description.group.includes('trigger'))
.map(({ node }) => node);
return parentNodes;
}
// TODO: write unit tests for this
// TODO: rewrite this using DirectedGraph instead of workflow.
export function findTriggerForPartialExecution(
workflow: Workflow,
destinationNodeName: string,
): INode | undefined {
const parentTriggers = findAllParentTriggers(workflow, destinationNodeName).filter(
(trigger) => !trigger.disabled,
);
const pinnedTriggers = parentTriggers
// TODO: add the other filters here from `findAllPinnedActivators`, see
// copy below.
.filter((trigger) => workflow.pinData?.[trigger.name])
// TODO: Make this sorting more predictable
// Put nodes which names end with 'webhook' first, while also reversing the
// order they had in the original array.
.sort((n) => (n.type.endsWith('webhook') ? -1 : 1));
if (pinnedTriggers.length) {
return pinnedTriggers[0];
} else {
return parentTriggers[0];
}
}
//function findAllPinnedActivators(workflow: Workflow, pinData?: IPinData) {
// return Object.values(workflow.nodes)
// .filter(
// (node) =>
// !node.disabled &&
// pinData?.[node.name] &&
// ['trigger', 'webhook'].some((suffix) => node.type.toLowerCase().endsWith(suffix)) &&
// node.type !== 'n8n-nodes-base.respondToWebhook',
// )
// .sort((a) => (a.type.endsWith('webhook') ? -1 : 1));
//}

View file

@ -0,0 +1,22 @@
import * as a from 'assert';
import type { INodeExecutionData, IRunData, NodeConnectionType } from 'n8n-workflow';
export function getIncomingData(
runData: IRunData,
nodeName: string,
runIndex: number,
connectionType: NodeConnectionType,
outputIndex: number,
): INodeExecutionData[] | null | undefined {
a.ok(runData[nodeName], `Can't find node with name '${nodeName}' in runData.`);
a.ok(
runData[nodeName][runIndex],
`Can't find a run for index '${runIndex}' for node name '${nodeName}'`,
);
a.ok(
runData[nodeName][runIndex].data,
`Can't find data for index '${runIndex}' for node name '${nodeName}'`,
);
return runData[nodeName][runIndex].data[connectionType][outputIndex];
}

View file

@ -0,0 +1,115 @@
import { type INode, type IPinData, type IRunData } from 'n8n-workflow';
import type { GraphConnection, DirectedGraph } from './DirectedGraph';
function sortByInputIndexThenByName(
connection1: GraphConnection,
connection2: GraphConnection,
): number {
if (connection1.inputIndex === connection2.inputIndex) {
return connection1.from.name.localeCompare(connection2.from.name);
} else {
return connection1.inputIndex - connection2.inputIndex;
}
}
/**
* Groups incoming connections to the node. The groups contain one connection
* per input, if possible, with run data or pinned data.
*
* The purpose of this is to get as many complete sets of data for executing
* nodes with multiple inputs.
*
* # Example 1:
* 1
* source1
*
* 1
* source2 node
*
* 1
* source3
*
*
* Given this workflow, and assuming all sources have run data or pinned data,
* it's possible to run `node` with the data of `source1` and `source3` and
* then one more time with the data from `source2`.
*
* It would also be possible to run `node` with the data of `source2` and
* `source3` and then one more time with the data from `source1`.
*
* To improve the determinism of this the connections are sorted by input and
* then by from-node name.
*
* So this will return 2 groups:
* 1. source1 and source3
* 2. source2
*
* # Example 2:
* 0
* source1
*
* 1
* source2 node
*
* 1
* source3
*
*
* Since `source1` has no run data and no pinned data it's skipped in favor of
* `source2` for the for input.
*
* So this will return 1 group:
* 1. source2 and source3
*/
export function getSourceDataGroups(
graph: DirectedGraph,
node: INode,
runData: IRunData,
pinnedData: IPinData,
): GraphConnection[][] {
const connections = graph.getConnections({ to: node });
const sortedConnectionsWithData = [];
for (const connection of connections) {
const hasData = runData[connection.from.name] || pinnedData[connection.from.name];
if (hasData) {
sortedConnectionsWithData.push(connection);
}
}
sortedConnectionsWithData.sort(sortByInputIndexThenByName);
const groups: GraphConnection[][] = [];
let currentGroup: GraphConnection[] = [];
let currentInputIndex = -1;
while (sortedConnectionsWithData.length > 0) {
const connectionWithDataIndex = sortedConnectionsWithData.findIndex(
// eslint-disable-next-line @typescript-eslint/no-loop-func
(c) => c.inputIndex > currentInputIndex,
);
const connection: GraphConnection | undefined =
sortedConnectionsWithData[connectionWithDataIndex];
if (connection === undefined) {
groups.push(currentGroup);
currentGroup = [];
currentInputIndex = -1;
continue;
}
currentInputIndex = connection.inputIndex;
currentGroup.push(connection);
if (connectionWithDataIndex >= 0) {
sortedConnectionsWithData.splice(connectionWithDataIndex, 1);
}
}
groups.push(currentGroup);
return groups;
}

View file

@ -0,0 +1,6 @@
export { DirectedGraph } from './DirectedGraph';
export { findTriggerForPartialExecution } from './findTriggerForPartialExecution';
export { findStartNodes } from './findStartNodes';
export { findSubgraph } from './findSubgraph';
export { findCycles } from './findCycles';
export { recreateNodeExecutionStack } from './recreateNodeExecutionStack';

View file

@ -0,0 +1,180 @@
import {
NodeConnectionType,
type IExecuteData,
type INode,
type INodeExecutionData,
type IPinData,
type IRunData,
type ISourceData,
type ITaskDataConnectionsSource,
type IWaitingForExecution,
type IWaitingForExecutionSource,
} from 'n8n-workflow';
import * as a from 'assert/strict';
import type { DirectedGraph } from './DirectedGraph';
import { getIncomingData } from './getIncomingData';
import { getSourceDataGroups } from './getSourceDataGroups';
/**
* Recreates the node execution stack, waiting executions and waiting
* execution sources from a directed graph, start nodes, the destination node,
* run and pinned data.
*
* This function aims to be able to recreate the internal state of the
* WorkflowExecute class at any point of time during an execution based on the
* data that is already available. Specifically it will recreate the
* `WorkflowExecute.runExecutionData.executionData` properties.
*
* This allows "restarting" an execution and having it only execute what's
* necessary to be able to execute the destination node accurately, e.g. as
* close as possible to what would happen in a production execution.
*/
export function recreateNodeExecutionStack(
graph: DirectedGraph,
startNodes: INode[],
destinationNode: INode,
runData: IRunData,
pinData: IPinData,
): {
nodeExecutionStack: IExecuteData[];
waitingExecution: IWaitingForExecution;
waitingExecutionSource: IWaitingForExecutionSource;
} {
// Validate invariants.
// The graph needs to be free of disabled nodes. If it's not it hasn't been
// passed through findSubgraph2.
for (const node of graph.getNodes().values()) {
a.notEqual(
node.disabled,
true,
`Graph contains disabled nodes. This is not supported. Make sure to pass the graph through "findSubgraph2" before calling "recreateNodeExecutionStack". The node in question is "${node.name}"`,
);
}
// Initialize the nodeExecutionStack and waitingExecution with
// the data from runData
const nodeExecutionStack: IExecuteData[] = [];
const waitingExecution: IWaitingForExecution = {};
const waitingExecutionSource: IWaitingForExecutionSource = {};
// TODO: Don't hard code this!
const runIndex = 0;
for (const startNode of startNodes) {
const incomingStartNodeConnections = graph
.getDirectParents(startNode)
.filter((c) => c.type === NodeConnectionType.Main);
let incomingData: INodeExecutionData[][] = [];
let incomingSourceData: ITaskDataConnectionsSource | null = null;
if (incomingStartNodeConnections.length === 0) {
incomingData.push([{ json: {} }]);
const executeData: IExecuteData = {
node: startNode,
data: { main: incomingData },
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
} else {
const sourceDataSets = getSourceDataGroups(graph, startNode, runData, pinData);
for (const sourceData of sourceDataSets) {
incomingData = [];
incomingSourceData = { main: [] };
for (const incomingConnection of sourceData) {
const node = incomingConnection.from;
if (pinData[node.name]) {
incomingData.push(pinData[node.name]);
} else {
a.ok(
runData[node.name],
`Start node(${incomingConnection.to.name}) has an incoming connection with no run or pinned data. This is not supported. The connection in question is "${node.name}->${startNode.name}". Are you sure the start nodes come from the "findStartNodes" function?`,
);
const nodeIncomingData = getIncomingData(
runData,
node.name,
runIndex,
incomingConnection.type,
incomingConnection.outputIndex,
);
if (nodeIncomingData) {
incomingData.push(nodeIncomingData);
}
}
incomingSourceData.main.push({
previousNode: incomingConnection.from.name,
previousNodeOutput: incomingConnection.outputIndex,
previousNodeRun: 0,
});
}
const executeData: IExecuteData = {
node: startNode,
data: { main: incomingData },
source: incomingSourceData,
};
nodeExecutionStack.push(executeData);
}
}
// TODO: Do we need this?
if (destinationNode) {
const destinationNodeName = destinationNode.name;
// Check if the destinationNode has to be added as waiting
// because some input data is already fully available
const incomingDestinationNodeConnections = graph
.getDirectParents(destinationNode)
.filter((c) => c.type === NodeConnectionType.Main);
if (incomingDestinationNodeConnections !== undefined) {
for (const connection of incomingDestinationNodeConnections) {
if (waitingExecution[destinationNodeName] === undefined) {
waitingExecution[destinationNodeName] = {};
waitingExecutionSource[destinationNodeName] = {};
}
if (waitingExecution[destinationNodeName][runIndex] === undefined) {
waitingExecution[destinationNodeName][runIndex] = {};
waitingExecutionSource[destinationNodeName][runIndex] = {};
}
if (waitingExecution[destinationNodeName][runIndex][connection.type] === undefined) {
waitingExecution[destinationNodeName][runIndex][connection.type] = [];
waitingExecutionSource[destinationNodeName][runIndex][connection.type] = [];
}
if (runData[connection.from.name] !== undefined) {
// Input data exists so add as waiting
// incomingDataDestination.push(runData[connection.node!][runIndex].data![connection.type][connection.index]);
waitingExecution[destinationNodeName][runIndex][connection.type].push(
runData[connection.from.name][runIndex].data![connection.type][connection.inputIndex],
);
waitingExecutionSource[destinationNodeName][runIndex][connection.type].push({
previousNode: connection.from.name,
previousNodeOutput: connection.inputIndex || undefined,
previousNodeRun: runIndex || undefined,
} as ISourceData);
} else {
waitingExecution[destinationNodeName][runIndex][connection.type].push(null);
waitingExecutionSource[destinationNodeName][runIndex][connection.type].push(null);
}
}
}
}
}
return {
nodeExecutionStack,
waitingExecution,
waitingExecutionSource,
};
}

View file

@ -49,6 +49,16 @@ import {
import get from 'lodash/get'; import get from 'lodash/get';
import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import * as assert from 'assert/strict';
import { recreateNodeExecutionStack } from './PartialExecutionUtils/recreateNodeExecutionStack';
import {
DirectedGraph,
findCycles,
findStartNodes,
findSubgraph,
findTriggerForPartialExecution,
} from './PartialExecutionUtils';
export class WorkflowExecute { export class WorkflowExecute {
private status: ExecutionStatus = 'new'; private status: ExecutionStatus = 'new';
@ -305,6 +315,82 @@ export class WorkflowExecute {
return this.processRunExecutionData(workflow); return this.processRunExecutionData(workflow);
} }
// IMPORTANT: Do not add "async" to this function, it will then convert the
// PCancelable to a regular Promise and does so not allow canceling
// active executions anymore
// eslint-disable-next-line @typescript-eslint/promise-function-async
runPartialWorkflow2(
workflow: Workflow,
runData: IRunData,
destinationNodeName?: string,
pinData?: IPinData,
): PCancelable<IRun> {
// TODO: Refactor the call-site to make `destinationNodeName` a required
// after removing the old partial execution flow.
assert.ok(
destinationNodeName,
'a destinationNodeName is required for the new partial execution flow',
);
const destinationNode = workflow.getNode(destinationNodeName);
assert.ok(
destinationNode,
`Could not find a node with the name ${destinationNodeName} in the workflow.`,
);
// 1. Find the Trigger
const trigger = findTriggerForPartialExecution(workflow, destinationNodeName);
if (trigger === undefined) {
throw new ApplicationError(
'The destination node is not connected to any trigger. Partial executions need a trigger.',
);
}
// 2. Find the Subgraph
const subgraph = findSubgraph(DirectedGraph.fromWorkflow(workflow), destinationNode, trigger);
const filteredNodes = subgraph.getNodes();
// 3. Find the Start Nodes
const startNodes = findStartNodes(subgraph, trigger, destinationNode, runData);
// 4. Detect Cycles
const cycles = findCycles(workflow);
// 5. Handle Cycles
if (cycles.length) {
// TODO: handle
}
// 6. Clean Run Data
// TODO:
// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
recreateNodeExecutionStack(subgraph, startNodes, destinationNode, runData, pinData ?? {});
// 8. Execute
this.status = 'running';
this.runExecutionData = {
startData: {
destinationNode: destinationNodeName,
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
},
resultData: {
runData,
pinData,
},
executionData: {
contextData: {},
nodeExecutionStack,
metadata: {},
waitingExecution,
waitingExecutionSource,
},
};
return this.processRunExecutionData(subgraph.toWorkflow({ ...workflow }));
}
/** /**
* Executes the hook with the given name * Executes the hook with the given name
* *

View file

@ -34,6 +34,7 @@ import { useI18n } from '@/composables/useI18n';
import { get } from 'lodash-es'; import { get } from 'lodash-es';
import { useExecutionsStore } from '@/stores/executions.store'; import { useExecutionsStore } from '@/stores/executions.store';
import type { PushPayload } from '@n8n/api-types'; import type { PushPayload } from '@n8n/api-types';
import { useLocalStorage } from '@vueuse/core';
export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof useRouter> }) { export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof useRouter> }) {
const nodeHelpers = useNodeHelpers(); const nodeHelpers = useNodeHelpers();
@ -213,9 +214,15 @@ export function useRunWorkflow(useRunWorkflowOpts: { router: ReturnType<typeof u
}; };
}); });
// -1 means the backend chooses the default
// 0 is the old flow
// 1 is the new flow
const partialExecutionVersion = useLocalStorage('PartialExecution.version', -1);
const startRunData: IStartRunData = { const startRunData: IStartRunData = {
workflowData, workflowData,
runData: newRunData, // With the new partial execution version the backend decides what run
// data to use and what to ignore.
runData: partialExecutionVersion.value === 1 ? (runData ?? undefined) : newRunData,
startNodes, startNodes,
}; };
if ('destinationNode' in options) { if ('destinationNode' in options) {

View file

@ -72,6 +72,7 @@ import { computed, ref } from 'vue';
import { useProjectsStore } from '@/stores/projects.store'; import { useProjectsStore } from '@/stores/projects.store';
import type { ProjectSharingData } from '@/types/projects.types'; import type { ProjectSharingData } from '@/types/projects.types';
import type { PushPayload } from '@n8n/api-types'; import type { PushPayload } from '@n8n/api-types';
import { useLocalStorage } from '@vueuse/core';
const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = { const defaults: Omit<IWorkflowDb, 'id'> & { settings: NonNullable<IWorkflowDb['settings']> } = {
name: '', name: '',
@ -99,6 +100,10 @@ let cachedWorkflow: Workflow | null = null;
export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => { export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
const uiStore = useUIStore(); const uiStore = useUIStore();
// -1 means the backend chooses the default
// 0 is the old flow
// 1 is the new flow
const partialExecutionVersion = useLocalStorage('PartialExecution.version', -1);
const workflow = ref<IWorkflowDb>(createEmptyWorkflow()); const workflow = ref<IWorkflowDb>(createEmptyWorkflow());
const usedCredentials = ref<Record<string, IUsedCredential>>({}); const usedCredentials = ref<Record<string, IUsedCredential>>({});
@ -1390,7 +1395,7 @@ export const useWorkflowsStore = defineStore(STORES.WORKFLOWS, () => {
return await makeRestApiRequest( return await makeRestApiRequest(
rootStore.restApiContext, rootStore.restApiContext,
'POST', 'POST',
`/workflows/${startRunData.workflowData.id}/run`, `/workflows/${startRunData.workflowData.id}/run?partialExecutionVersion=${partialExecutionVersion.value}`,
startRunData as unknown as IDataObject, startRunData as unknown as IDataObject,
); );
} catch (error) { } catch (error) {

View file

@ -2150,6 +2150,10 @@ export interface IWorkflowExecutionDataProcess {
destinationNode?: string; destinationNode?: string;
restartExecutionId?: string; restartExecutionId?: string;
executionMode: WorkflowExecuteMode; executionMode: WorkflowExecuteMode;
/**
* The data that is sent in the body of the webhook that started this
* execution.
*/
executionData?: IRunExecutionData; executionData?: IRunExecutionData;
runData?: IRunData; runData?: IRunData;
pinData?: IPinData; pinData?: IPinData;
@ -2159,6 +2163,15 @@ export interface IWorkflowExecutionDataProcess {
workflowData: IWorkflowBase; workflowData: IWorkflowBase;
userId?: string; userId?: string;
projectId?: string; projectId?: string;
/**
* Defines which version of the partial execution flow is used.
* Possible values are:
* 0 - use the old flow
* 1 - use the new flow
* -1 - the backend chooses which flow based on the environment variable
* PARTIAL_EXECUTION_VERSION_DEFAULT
*/
partialExecutionVersion?: string;
} }
export interface ExecuteWorkflowOptions { export interface ExecuteWorkflowOptions {