1
0
Fork 0
mirror of https://github.com/n8n-io/n8n.git synced 2025-02-02 07:01:30 -08:00

fix(core): Execute nodes after loops correctly with the new partial execution flow ()

This commit is contained in:
Danny Martini 2024-12-09 08:59:02 +01:00 committed by GitHub
parent efafeed334
commit 891dd7f995
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 224 additions and 23 deletions

View file

@ -442,4 +442,126 @@ describe('findStartNodes', () => {
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(node2);
});
describe('custom loop logic', () => {
test('if the last run of loop node has no data (null) on the done output, then the loop is the start node', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 1, to: inLoop },
{ from: inLoop, to: loop },
{ from: loop, to: afterLoop },
);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { name: 'trigger' } }])],
[loop.name]: [
// only output on the `loop` branch, but no output on the `done`
// branch
toITaskData([{ outputIndex: 1, data: { name: 'loop' } }]),
],
[inLoop.name]: [toITaskData([{ data: { name: 'inLoop' } }])],
};
// ACT
const startNodes = findStartNodes({
graph,
trigger,
destination: afterLoop,
runData,
pinData: {},
});
// ASSERT
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(loop);
});
test('if the last run of loop node has no data (empty array) on the done output, then the loop is the start node', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 1, to: inLoop },
{ from: inLoop, to: loop },
{ from: loop, to: afterLoop },
);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { name: 'trigger' } }])],
[loop.name]: [
// This is handcrafted because `toITaskData` does not allow inserting
// an empty array like the first element of `main` below. But the
// execution engine creates ITaskData like this.
{
executionStatus: 'success',
executionTime: 0,
startTime: 0,
source: [],
data: { main: [[], [{ json: { name: 'loop' } }]] },
},
],
[inLoop.name]: [toITaskData([{ data: { name: 'inLoop' } }])],
};
// ACT
const startNodes = findStartNodes({
graph,
trigger,
destination: afterLoop,
runData,
pinData: {},
});
// ASSERT
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(loop);
});
test('if the loop has data on the done output in the last run it does not become a start node', () => {
// ARRANGE
const trigger = createNodeData({ name: 'trigger' });
const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const graph = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, outputIndex: 1, to: inLoop },
{ from: inLoop, to: loop },
{ from: loop, to: afterLoop },
);
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { name: 'trigger' } }])],
[loop.name]: [
toITaskData([{ outputIndex: 1, data: { name: 'loop' } }]),
toITaskData([{ outputIndex: 0, data: { name: 'done' } }]),
],
[inLoop.name]: [toITaskData([{ data: { name: 'inLoop' } }])],
};
// ACT
const startNodes = findStartNodes({
graph,
trigger,
destination: afterLoop,
runData,
pinData: {},
});
// ASSERT
expect(startNodes.size).toBe(1);
expect(startNodes).toContainEqual(afterLoop);
});
});
});

View file

@ -5,7 +5,7 @@ interface StubNode {
name: string;
parameters?: INodeParameters;
disabled?: boolean;
type?: string;
type?: 'n8n-nodes-base.manualTrigger' | 'n8n-nodes-base.splitInBatches' | (string & {});
}
export function createNodeData(stubData: StubNode): INode {

View file

@ -1,7 +1,7 @@
import type { INode, IPinData, IRunData } from 'n8n-workflow';
import { NodeConnectionType, type INode, type IPinData, type IRunData } from 'n8n-workflow';
import type { DirectedGraph } from './DirectedGraph';
import { getIncomingData } from './getIncomingData';
import { getIncomingData, getIncomingDataFromAnyRun } from './getIncomingData';
/**
* A node is dirty if either of the following is true:
@ -73,6 +73,25 @@ function findStartNodesRecursive(
return startNodes;
}
// If the current node is a loop node, check if the `done` output has data on
// the last run. If it doesn't the loop wasn't fully executed and needs to be
// re-run from the start. Thus the loop node become the start node.
if (current.type === 'n8n-nodes-base.splitInBatches') {
const nodeRunData = getIncomingData(
runData,
current.name,
// last run
-1,
NodeConnectionType.Main,
0,
);
if (nodeRunData === null || nodeRunData.length === 0) {
startNodes.add(current);
return startNodes;
}
}
// If we detect a cycle stop following the branch, there is no start node on
// this branch.
if (seen.has(current)) {
@ -82,19 +101,16 @@ function findStartNodesRecursive(
// Recurse with every direct child that is part of the sub graph.
const outGoingConnections = graph.getDirectChildConnections(current);
for (const outGoingConnection of outGoingConnections) {
const nodeRunData = getIncomingData(
const nodeRunData = getIncomingDataFromAnyRun(
runData,
outGoingConnection.from.name,
// NOTE: It's always 0 until I fix the bug that removes the run data for
// old runs. The FE only sends data for one run for each node.
0,
outGoingConnection.type,
outGoingConnection.outputIndex,
);
// If the node has multiple outputs, only follow the outputs that have run data.
const hasNoRunData =
nodeRunData === null || nodeRunData === undefined || nodeRunData.length === 0;
nodeRunData === null || nodeRunData === undefined || nodeRunData.data.length === 0;
if (hasNoRunData) {
continue;
}

View file

@ -1,4 +1,3 @@
import * as a from 'assert';
import type { INodeExecutionData, IRunData, NodeConnectionType } from 'n8n-workflow';
export function getIncomingData(
@ -7,18 +6,8 @@ export function getIncomingData(
runIndex: number,
connectionType: NodeConnectionType,
outputIndex: number,
): INodeExecutionData[] | null | undefined {
a.ok(runData[nodeName], `Can't find node with name '${nodeName}' in runData.`);
a.ok(
runData[nodeName][runIndex],
`Can't find a run for index '${runIndex}' for node name '${nodeName}'`,
);
a.ok(
runData[nodeName][runIndex].data,
`Can't find data for index '${runIndex}' for node name '${nodeName}'`,
);
return runData[nodeName][runIndex].data[connectionType][outputIndex];
): INodeExecutionData[] | null {
return runData[nodeName]?.at(runIndex)?.data?.[connectionType].at(outputIndex) ?? null;
}
function getRunIndexLength(runData: IRunData, nodeName: string) {

View file

@ -367,7 +367,7 @@ export class WorkflowExecute {
startNodes = handleCycles(graph, startNodes, trigger);
// 6. Clean Run Data
const newRunData: IRunData = cleanRunData(runData, graph, startNodes);
runData = cleanRunData(runData, graph, startNodes);
// 7. Recreate Execution Stack
const { nodeExecutionStack, waitingExecution, waitingExecutionSource } =
@ -381,7 +381,7 @@ export class WorkflowExecute {
runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name),
},
resultData: {
runData: newRunData,
runData,
pinData,
},
executionData: {

View file

@ -9,6 +9,7 @@
// XX denotes that the node is disabled
// PD denotes that the node has pinned data
import { pick } from 'lodash';
import type { IPinData, IRun, IRunData, WorkflowTestData } from 'n8n-workflow';
import {
ApplicationError,
@ -18,6 +19,7 @@ import {
} from 'n8n-workflow';
import { DirectedGraph } from '@/PartialExecutionUtils';
import * as partialExecutionUtils from '@/PartialExecutionUtils';
import { createNodeData, toITaskData } from '@/PartialExecutionUtils/__tests__/helpers';
import { WorkflowExecute } from '@/WorkflowExecute';
@ -324,5 +326,72 @@ describe('WorkflowExecute', () => {
expect(nodes).toContain(node2.name);
expect(nodes).not.toContain(node1.name);
});
// ►►
// ┌────┐0 ┌─────────┐
//┌───────┐1 │ ├──────►afterLoop│
//│trigger├───┬──►loop│1 └─────────┘
//└───────┘ │ │ ├─┐
// │ └────┘ │
// │ │ ┌──────┐1
// │ └─►inLoop├─┐
// │ └──────┘ │
// └────────────────────┘
test('passes filtered run data to `recreateNodeExecutionStack`', async () => {
// ARRANGE
const waitPromise = createDeferredPromise<IRun>();
const nodeExecutionOrder: string[] = [];
const additionalData = Helpers.WorkflowExecuteAdditionalData(waitPromise, nodeExecutionOrder);
const workflowExecute = new WorkflowExecute(additionalData, 'manual');
const trigger = createNodeData({ name: 'trigger', type: 'n8n-nodes-base.manualTrigger' });
const loop = createNodeData({ name: 'loop', type: 'n8n-nodes-base.splitInBatches' });
const inLoop = createNodeData({ name: 'inLoop' });
const afterLoop = createNodeData({ name: 'afterLoop' });
const workflow = new DirectedGraph()
.addNodes(trigger, loop, inLoop, afterLoop)
.addConnections(
{ from: trigger, to: loop },
{ from: loop, to: afterLoop },
{ from: loop, to: inLoop, outputIndex: 1 },
{ from: inLoop, to: loop },
)
.toWorkflow({ name: '', active: false, nodeTypes });
const pinData: IPinData = {};
const runData: IRunData = {
[trigger.name]: [toITaskData([{ data: { value: 1 } }])],
[loop.name]: [toITaskData([{ data: { nodeName: loop.name }, outputIndex: 1 }])],
[inLoop.name]: [toITaskData([{ data: { nodeName: inLoop.name } }])],
};
const dirtyNodeNames: string[] = [];
jest.spyOn(workflowExecute, 'processRunExecutionData').mockImplementationOnce(jest.fn());
const recreateNodeExecutionStackSpy = jest.spyOn(
partialExecutionUtils,
'recreateNodeExecutionStack',
);
// ACT
await workflowExecute.runPartialWorkflow2(
workflow,
runData,
pinData,
dirtyNodeNames,
afterLoop.name,
);
// ASSERT
expect(recreateNodeExecutionStackSpy).toHaveBeenNthCalledWith(
1,
expect.any(DirectedGraph),
expect.any(Set),
// The run data should only contain the trigger node because the loop
// node has no data on the done branch. That means we have to rerun the
// whole loop, because we don't know how many iterations would be left.
pick(runData, trigger.name),
expect.any(Object),
);
});
});
});

View file

@ -11,6 +11,7 @@ import { ManualTrigger } from '../../../nodes-base/dist/nodes/ManualTrigger/Manu
import { Merge } from '../../../nodes-base/dist/nodes/Merge/Merge.node';
import { NoOp } from '../../../nodes-base/dist/nodes/NoOp/NoOp.node';
import { Set } from '../../../nodes-base/dist/nodes/Set/Set.node';
import { SplitInBatches } from '../../../nodes-base/dist/nodes/SplitInBatches/SplitInBatches.node';
import { Start } from '../../../nodes-base/dist/nodes/Start/Start.node';
export const predefinedNodesTypes: INodeTypeData = {
@ -38,6 +39,10 @@ export const predefinedNodesTypes: INodeTypeData = {
type: new ManualTrigger(),
sourcePath: '',
},
'n8n-nodes-base.splitInBatches': {
type: new SplitInBatches(),
sourcePath: '',
},
'n8n-nodes-base.versionTest': {
sourcePath: '',
type: {