fix(core): Fix pairedItem issue with partial manual executions (#8575)

Co-authored-by: Danny Martini <danny@n8n.io>
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
Jan Oberhauser 2024-02-23 11:43:08 +01:00 committed by GitHub
parent 0882dc0ce9
commit a29b41ec55
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 250 additions and 27 deletions

View file

@ -569,4 +569,27 @@ describe('Execution', () => {
expect(interception.request.body.runData).to.include.all.keys(expectedRunDataKeys); expect(interception.request.body.runData).to.include.all.keys(expectedRunDataKeys);
}); });
}); });
it('should successfully execute partial executions with nodes attached to the second output', () => {
cy.createFixtureWorkflow(
'Test_Workflow_pairedItem_incomplete_manual_bug.json',
'My test workflow',
);
cy.intercept('POST', '/rest/workflows/run').as('workflowRun');
workflowPage.getters.zoomToFitButton().click();
workflowPage.getters.executeWorkflowButton().click();
workflowPage.getters
.canvasNodeByName('Test Expression')
.findChildByTestId('execute-node-button')
.click({ force: true });
// Check toast (works because Cypress waits enough for the element to show after the http request node has finished)
// Wait for the execution to return.
cy.wait('@workflowRun');
// Wait again for the websocket message to arrive and the UI to update.
cy.wait(100);
workflowPage.getters.errorToast({ timeout: 1 }).should('not.exist');
});
}); });

View file

@ -0,0 +1,160 @@
{
"name": "Test Workflow pairedItem incomplete manual bug",
"nodes": [
{
"parameters": {},
"id": "f26332f3-c61a-4843-94bd-64a73ad161ff",
"name": "When clicking \"Test workflow\"",
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [
860,
340
]
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "bd522794-d056-48b8-9204-26f7d68288d9",
"name": "test",
"value": "a",
"type": "string"
}
]
},
"options": {}
},
"id": "fae0c907-e2bf-4ecf-82be-f9caa209f925",
"name": "Init Data",
"type": "n8n-nodes-base.set",
"typeVersion": 3.3,
"position": [
1080,
340
]
},
{
"parameters": {
"conditions": {
"options": {
"caseSensitive": true,
"leftValue": "",
"typeValidation": "strict"
},
"conditions": [
{
"id": "8db21b4b-1675-4e63-b092-7fcc45a86547",
"leftValue": "={{ $json.test }}",
"rightValue": "b",
"operator": {
"type": "string",
"operation": "equals",
"name": "filter.operator.equals"
}
}
],
"combinator": "and"
},
"options": {}
},
"id": "f7990edd-2c0f-42e6-b3ce-74c7df02b6a4",
"name": "If",
"type": "n8n-nodes-base.if",
"typeVersion": 2,
"position": [
1300,
340
]
},
{
"parameters": {},
"id": "850d48f5-0689-4cab-b30c-30e179577c82",
"name": "NoOp1",
"type": "n8n-nodes-base.noOp",
"typeVersion": 1,
"position": [
1540,
200
]
},
{
"parameters": {
"assignments": {
"assignments": [
{
"id": "bd522794-d056-48b8-9204-26f7d68288d9",
"name": "test2",
"value": "={{ $('Init Data').item.json.test }}",
"type": "string"
}
]
},
"options": {}
},
"id": "91d93c3a-a557-465e-812b-266d6277b279",
"name": "Test Expression",
"type": "n8n-nodes-base.set",
"typeVersion": 3.3,
"position": [
1540,
440
]
}
],
"pinData": {},
"connections": {
"When clicking \"Test workflow\"": {
"main": [
[
{
"node": "Init Data",
"type": "main",
"index": 0
}
]
]
},
"Init Data": {
"main": [
[
{
"node": "If",
"type": "main",
"index": 0
}
]
]
},
"If": {
"main": [
[
{
"node": "NoOp1",
"type": "main",
"index": 0
}
],
[
{
"node": "Test Expression",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "765a6d9b-d667-4a59-9bd7-b0bc2627b008",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "021d3c82ba2d3bc090cbf4fc81c9312668bcc34297e022bb3438c5c88a43a5ff"
},
"id": "qnGQYw8TD58xs214",
"tags": []
}

View file

@ -3,6 +3,8 @@ import { BasePage } from './base';
import { getVisibleSelect } from '../utils'; import { getVisibleSelect } from '../utils';
import { NodeCreator } from './features/node-creator'; import { NodeCreator } from './features/node-creator';
type CyGetOptions = Parameters<(typeof cy)['get']>[1];
const nodeCreator = new NodeCreator(); const nodeCreator = new NodeCreator();
export class WorkflowPage extends BasePage { export class WorkflowPage extends BasePage {
url = '/workflow/new'; url = '/workflow/new';
@ -48,7 +50,8 @@ export class WorkflowPage extends BasePage {
}, },
successToast: () => cy.get('.el-notification:has(.el-notification--success)'), successToast: () => cy.get('.el-notification:has(.el-notification--success)'),
warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'), warningToast: () => cy.get('.el-notification:has(.el-notification--warning)'),
errorToast: () => cy.get('.el-notification:has(.el-notification--error)'), errorToast: (options?: CyGetOptions) =>
cy.get('.el-notification:has(.el-notification--error)', options),
infoToast: () => cy.get('.el-notification:has(.el-notification--info)'), infoToast: () => cy.get('.el-notification:has(.el-notification--info)'),
activatorSwitch: () => cy.getByTestId('workflow-activate-switch'), activatorSwitch: () => cy.getByTestId('workflow-activate-switch'),
workflowMenu: () => cy.getByTestId('workflow-menu'), workflowMenu: () => cy.getByTestId('workflow-menu'),

View file

@ -23,6 +23,7 @@ import type {
INodeProperties, INodeProperties,
IUserSettings, IUserSettings,
IHttpRequestMethods, IHttpRequestMethods,
StartNodeData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
@ -532,7 +533,7 @@ export interface IWorkflowExecutionDataProcess {
pinData?: IPinData; pinData?: IPinData;
retryOf?: string; retryOf?: string;
sessionId?: string; sessionId?: string;
startNodes?: string[]; startNodes?: StartNodeData[];
workflowData: IWorkflowBase; workflowData: IWorkflowBase;
userId: string; userId: string;
} }

View file

@ -227,9 +227,9 @@ export function getExecutionStartNode(data: IWorkflowExecutionDataProcess, workf
let startNode; let startNode;
if ( if (
data.startNodes?.length === 1 && data.startNodes?.length === 1 &&
Object.keys(data.pinData ?? {}).includes(data.startNodes[0]) Object.keys(data.pinData ?? {}).includes(data.startNodes[0].name)
) { ) {
startNode = workflow.getNode(data.startNodes[0]) ?? undefined; startNode = workflow.getNode(data.startNodes[0].name) ?? undefined;
} }
return startNode; return startNode;

View file

@ -101,7 +101,7 @@ export class Execute extends BaseCommand {
const user = await Container.get(OwnershipService).getInstanceOwner(); const user = await Container.get(OwnershipService).getInstanceOwner();
const runData: IWorkflowExecutionDataProcess = { const runData: IWorkflowExecutionDataProcess = {
executionMode: 'cli', executionMode: 'cli',
startNodes: [startingNode.name], startNodes: [{ name: startingNode.name, sourceData: null }],
workflowData, workflowData,
userId: user.id, userId: user.id,
}; };

View file

@ -620,7 +620,7 @@ export class ExecuteBatch extends BaseCommand {
const runData: IWorkflowExecutionDataProcess = { const runData: IWorkflowExecutionDataProcess = {
executionMode: 'cli', executionMode: 'cli',
startNodes: [startingNode.name], startNodes: [{ name: startingNode.name, sourceData: null }],
workflowData, workflowData,
userId: ExecuteBatch.instanceOwner.id, userId: ExecuteBatch.instanceOwner.id,
}; };

View file

@ -1,6 +1,13 @@
import type { IWorkflowDb } from '@/Interfaces'; import type { IWorkflowDb } from '@/Interfaces';
import type { AuthenticatedRequest } from '@/requests'; import type { AuthenticatedRequest } from '@/requests';
import type { INode, IConnections, IWorkflowSettings, IRunData, IPinData } from 'n8n-workflow'; import type {
INode,
IConnections,
IWorkflowSettings,
IRunData,
IPinData,
StartNodeData,
} from 'n8n-workflow';
export declare namespace WorkflowRequest { export declare namespace WorkflowRequest {
type CreateUpdatePayload = Partial<{ type CreateUpdatePayload = Partial<{
@ -19,7 +26,7 @@ export declare namespace WorkflowRequest {
workflowData: IWorkflowDb; workflowData: IWorkflowDb;
runData: IRunData; runData: IRunData;
pinData: IPinData; pinData: IPinData;
startNodes?: string[]; startNodes?: StartNodeData[];
destinationNode?: string; destinationNode?: string;
}; };

View file

@ -101,7 +101,11 @@ export class WorkflowExecutionService {
user: User, user: User,
sessionId?: string, sessionId?: string,
) { ) {
const pinnedTrigger = this.selectPinnedActivatorStarter(workflowData, startNodes, pinData); const pinnedTrigger = this.selectPinnedActivatorStarter(
workflowData,
startNodes?.map((nodeData) => nodeData.name),
pinData,
);
// If webhooks nodes exist and are active we have to wait for till we receive a call // If webhooks nodes exist and are active we have to wait for till we receive a call
if ( if (
@ -143,7 +147,7 @@ export class WorkflowExecutionService {
const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name];
if (pinnedTrigger && !hasRunData(pinnedTrigger)) { if (pinnedTrigger && !hasRunData(pinnedTrigger)) {
data.startNodes = [pinnedTrigger.name]; data.startNodes = [{ name: pinnedTrigger.name, sourceData: null }];
} }
const executionId = await this.workflowRunner.run(data); const executionId = await this.workflowRunner.run(data);

View file

@ -25,7 +25,7 @@ describe('WorkflowHelpers', () => {
node1: {}, node1: {},
node2: {}, node2: {},
}, },
startNodes: ['node2'], startNodes: [{ name: 'node2' }],
} as unknown as IWorkflowExecutionDataProcess; } as unknown as IWorkflowExecutionDataProcess;
const workflow = { const workflow = {
getNode(nodeName: string) { getNode(nodeName: string) {

View file

@ -33,6 +33,7 @@ import type {
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
WorkflowExecuteMode, WorkflowExecuteMode,
CloseFunction, CloseFunction,
StartNodeData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
LoggerProxy as Logger, LoggerProxy as Logger,
@ -157,7 +158,7 @@ export class WorkflowExecute {
runPartialWorkflow( runPartialWorkflow(
workflow: Workflow, workflow: Workflow,
runData: IRunData, runData: IRunData,
startNodes: string[], startNodes: StartNodeData[],
destinationNode?: string, destinationNode?: string,
pinData?: IPinData, pinData?: IPinData,
): PCancelable<IRun> { ): PCancelable<IRun> {
@ -175,7 +176,7 @@ export class WorkflowExecute {
const waitingExecution: IWaitingForExecution = {}; const waitingExecution: IWaitingForExecution = {};
const waitingExecutionSource: IWaitingForExecutionSource = {}; const waitingExecutionSource: IWaitingForExecutionSource = {};
for (const startNode of startNodes) { for (const startNode of startNodes) {
incomingNodeConnections = workflow.connectionsByDestinationNode[startNode]; incomingNodeConnections = workflow.connectionsByDestinationNode[startNode.name];
const incomingData: INodeExecutionData[][] = []; const incomingData: INodeExecutionData[][] = [];
let incomingSourceData: ITaskDataConnectionsSource | null = null; let incomingSourceData: ITaskDataConnectionsSource | null = null;
@ -210,15 +211,13 @@ export class WorkflowExecute {
} }
} }
incomingSourceData.main.push({ incomingSourceData.main.push(startNode.sourceData);
previousNode: connection.node,
});
} }
} }
} }
const executeData: IExecuteData = { const executeData: IExecuteData = {
node: workflow.getNode(startNode) as INode, node: workflow.getNode(startNode.name) as INode,
data: { data: {
main: incomingData, main: incomingData,
}, },

View file

@ -47,6 +47,7 @@ import {
type INodeProperties, type INodeProperties,
type NodeConnectionType, type NodeConnectionType,
type INodeCredentialsDetails, type INodeCredentialsDetails,
type StartNodeData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import type { BulkCommand, Undoable } from '@/models/history'; import type { BulkCommand, Undoable } from '@/models/history';
import type { PartialBy, TupleToUnion } from '@/utils/typeHelpers'; import type { PartialBy, TupleToUnion } from '@/utils/typeHelpers';
@ -188,7 +189,7 @@ export interface IAiData {
export interface IStartRunData { export interface IStartRunData {
workflowData: IWorkflowData; workflowData: IWorkflowData;
startNodes?: string[]; startNodes?: StartNodeData[];
destinationNode?: string; destinationNode?: string;
runData?: IRunData; runData?: IRunData;
pinData?: IPinData; pinData?: IPinData;

View file

@ -1,6 +1,7 @@
import type { IExecutionPushResponse, IExecutionResponse, IStartRunData } from '@/Interface'; import type { IExecutionPushResponse, IExecutionResponse, IStartRunData } from '@/Interface';
import { mapStores } from 'pinia'; import { mapStores } from 'pinia';
import { defineComponent } from 'vue'; import { defineComponent } from 'vue';
import { get } from 'lodash-es';
import type { import type {
IDataObject, IDataObject,
@ -10,6 +11,7 @@ import type {
IPinData, IPinData,
IWorkflowBase, IWorkflowBase,
Workflow, Workflow,
StartNodeData,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { import {
NodeHelpers, NodeHelpers,
@ -37,8 +39,8 @@ export const consolidateRunDataAndStartNodes = (
runData: IRunData | null, runData: IRunData | null,
pinData: IPinData | undefined, pinData: IPinData | undefined,
workflow: Workflow, workflow: Workflow,
): { runData: IRunData | undefined; startNodes: string[] } => { ): { runData: IRunData | undefined; startNodeNames: string[] } => {
const startNodes: string[] = []; const startNodeNames: string[] = [];
let newRunData: IRunData | undefined; let newRunData: IRunData | undefined;
if (runData !== null && Object.keys(runData).length !== 0) { if (runData !== null && Object.keys(runData).length !== 0) {
@ -59,7 +61,7 @@ export const consolidateRunDataAndStartNodes = (
// When we hit a node which has no data we stop and set it // When we hit a node which has no data we stop and set it
// as a start node the execution from and then go on with other // as a start node the execution from and then go on with other
// direct input nodes // direct input nodes
startNodes.push(parentNode); startNodeNames.push(parentNode);
break; break;
} }
if (runData[parentNode]) { if (runData[parentNode]) {
@ -75,7 +77,7 @@ export const consolidateRunDataAndStartNodes = (
} }
} }
return { runData: newRunData, startNodes }; return { runData: newRunData, startNodeNames };
}; };
export const workflowRun = defineComponent({ export const workflowRun = defineComponent({
@ -243,18 +245,18 @@ export const workflowRun = defineComponent({
workflow, workflow,
); );
const { startNodes } = consolidatedData; const { startNodeNames } = consolidatedData;
let { runData: newRunData } = consolidatedData; let { runData: newRunData } = consolidatedData;
let executedNode: string | undefined; let executedNode: string | undefined;
if ( if (
startNodes.length === 0 && startNodeNames.length === 0 &&
'destinationNode' in options && 'destinationNode' in options &&
options.destinationNode !== undefined options.destinationNode !== undefined
) { ) {
executedNode = options.destinationNode; executedNode = options.destinationNode;
startNodes.push(options.destinationNode); startNodeNames.push(options.destinationNode);
} else if ('triggerNode' in options && 'nodeData' in options) { } else if ('triggerNode' in options && 'nodeData' in options) {
startNodes.push( startNodeNames.push(
...workflow.getChildNodes(options.triggerNode, NodeConnectionType.Main, 1), ...workflow.getChildNodes(options.triggerNode, NodeConnectionType.Main, 1),
); );
newRunData = { newRunData = {
@ -263,6 +265,25 @@ export const workflowRun = defineComponent({
executedNode = options.triggerNode; executedNode = options.triggerNode;
} }
const startNodes: StartNodeData[] = startNodeNames.map((name) => {
// Find for each start node the source data
let sourceData = get(runData, [name, 0, 'source', 0], null);
if (sourceData === null) {
const parentNodes = workflow.getParentNodes(name, NodeConnectionType.Main, 1);
const executeData = this.workflowHelpers.executeData(
parentNodes,
name,
NodeConnectionType.Main,
0,
);
sourceData = get(executeData, ['source', NodeConnectionType.Main, 0], null);
}
return {
name,
sourceData,
};
});
const startRunData: IStartRunData = { const startRunData: IStartRunData = {
workflowData, workflowData,
runData: newRunData, runData: newRunData,
@ -288,7 +309,6 @@ export const workflowRun = defineComponent({
resultData: { resultData: {
runData: newRunData || {}, runData: newRunData || {},
pinData: workflowData.pinData, pinData: workflowData.pinData,
startNodes,
workflowData, workflowData,
}, },
} as IRunExecutionData, } as IRunExecutionData,

View file

@ -1943,6 +1943,11 @@ export interface ISourceData {
previousNodeRun?: number; // If undefined "0" gets used previousNodeRun?: number; // If undefined "0" gets used
} }
export interface StartNodeData {
name: string;
sourceData: ISourceData | null;
}
// The data for all the different kind of connections (like main) and all the indexes // The data for all the different kind of connections (like main) and all the indexes
export interface ITaskDataConnections { export interface ITaskDataConnections {
// Key for each input type and because there can be multiple inputs of the same type it is an array // Key for each input type and because there can be multiple inputs of the same type it is an array