From 11e8520b70fcccb5c3a13fe39a795bb7e28c3366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 30 Dec 2024 16:28:46 +0100 Subject: [PATCH] refactor(core): Add more workflow engine tests (no-changelog) (#12385) Co-authored-by: Danny Martini --- packages/core/src/WorkflowExecute.ts | 420 +++++++----- packages/core/test/WorkflowExecute.test.ts | 753 +++++++++++++++++++++ 2 files changed, 995 insertions(+), 178 deletions(-) diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index f340c8da67..ebfad1e204 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -417,6 +417,17 @@ export class WorkflowExecute { return await this.additionalData.hooks.executeHookFunctions(hookName, parameters); } + /** + * Merges temporary execution metadata into the final runData structure. + * During workflow execution, metadata is collected in a temporary location + * (executionData.metadata). This method moves that metadata to its final + * location in the resultData.runData for each node. + * + * @remarks + * - Metadata from multiple runs is preserved using run indices + * - Existing metadata in runData is preserved and merged with new metadata + * - If no metadata exists, the operation is a no-op + */ moveNodeMetadata(): void { const metadata = get(this.runExecutionData, 'executionData.metadata'); @@ -437,14 +448,27 @@ export class WorkflowExecute { } /** - * Checks the incoming connection does not receive any data + * Checks if all incoming connections to a node are empty (have no data). + * This is used to determine if a node should be executed or skipped. + * + * @param runData - The execution data from all nodes in the workflow + * @param inputConnections - Array of connections to check + * @param runIndex - Index of the current execution run (nodes can execute multiple times) + * + * @returns `true` if all connections are empty (no data), `false` if any connection has data + * + * @remarks + * A connection is considered empty when: + * - The source node doesn't exist in runData + * - The source node's data is undefined + * - The source node's output array is empty + * - The specified output index contains no items */ incomingConnectionIsEmpty( runData: IRunData, inputConnections: IConnection[], runIndex: number, ): boolean { - // for (const inputConnection of workflow.connectionsByDestinationNode[nodeToAdd].main[0]) { for (const inputConnection of inputConnections) { const nodeIncomingData = get(runData, [ inputConnection.node, @@ -460,24 +484,29 @@ export class WorkflowExecute { return true; } + /** + * Prepares the waiting execution data structure for a node that needs to wait for data before it can execute. + * This function initializes arrays to store data and metadata for each connection of the node. + * + * @param nodeName - The name of the node to prepare waiting execution for + * @param numberOfConnections - Number of input connections the node has + * @param runIndex - The index of the current run (for nodes that may run multiple times) + */ prepareWaitingToExecution(nodeName: string, numberOfConnections: number, runIndex: number) { - if (!this.runExecutionData.executionData!.waitingExecutionSource) { - this.runExecutionData.executionData!.waitingExecutionSource = {}; - } + const executionData = this.runExecutionData.executionData!; - this.runExecutionData.executionData!.waitingExecution[nodeName][runIndex] = { - main: [], - }; - this.runExecutionData.executionData!.waitingExecutionSource[nodeName][runIndex] = { - main: [], - }; + executionData.waitingExecution ??= {}; + executionData.waitingExecutionSource ??= {}; + + const nodeWaiting = (executionData.waitingExecution[nodeName] ??= []); + const nodeWaitingSource = (executionData.waitingExecutionSource[nodeName] ??= []); + + nodeWaiting[runIndex] = { main: [] }; + nodeWaitingSource[runIndex] = { main: [] }; for (let i = 0; i < numberOfConnections; i++) { - this.runExecutionData.executionData!.waitingExecution[nodeName][runIndex].main.push(null); - - this.runExecutionData.executionData!.waitingExecutionSource[nodeName][runIndex].main.push( - null, - ); + nodeWaiting[runIndex].main.push(null); + nodeWaitingSource[runIndex].main.push(null); } } @@ -1489,119 +1518,7 @@ export class WorkflowExecute { } if (nodeSuccessData && executionData.node.onError === 'continueErrorOutput') { - // If errorOutput is activated check all the output items for error data. - // If any is found, route them to the last output as that will be the - // error output. - - const nodeType = workflow.nodeTypes.getByNameAndVersion( - executionData.node.type, - executionData.node.typeVersion, - ); - const outputs = NodeHelpers.getNodeOutputs( - workflow, - executionData.node, - nodeType.description, - ); - const outputTypes = NodeHelpers.getConnectionTypes(outputs); - const mainOutputTypes = outputTypes.filter( - (output) => output === NodeConnectionType.Main, - ); - - const errorItems: INodeExecutionData[] = []; - const closeFunctions: CloseFunction[] = []; - // Create a WorkflowDataProxy instance that we can get the data of the - // item which did error - const executeFunctions = new ExecuteContext( - workflow, - executionData.node, - this.additionalData, - this.mode, - this.runExecutionData, - runIndex, - [], - executionData.data, - executionData, - closeFunctions, - this.abortController.signal, - ); - - const dataProxy = executeFunctions.getWorkflowDataProxy(0); - - // Loop over all outputs except the error output as it would not contain data by default - for ( - let outputIndex = 0; - outputIndex < mainOutputTypes.length - 1; - outputIndex++ - ) { - const successItems: INodeExecutionData[] = []; - const items = nodeSuccessData[outputIndex]?.length - ? nodeSuccessData[outputIndex] - : []; - - while (items.length) { - const item = items.shift(); - if (item === undefined) { - continue; - } - - let errorData: GenericValue | undefined; - if (item.error) { - errorData = item.error; - item.error = undefined; - } else if (item.json.error && Object.keys(item.json).length === 1) { - errorData = item.json.error; - } else if ( - item.json.error && - item.json.message && - Object.keys(item.json).length === 2 - ) { - errorData = item.json.error; - } - - if (errorData) { - const pairedItemData = - item.pairedItem && typeof item.pairedItem === 'object' - ? Array.isArray(item.pairedItem) - ? item.pairedItem[0] - : item.pairedItem - : undefined; - - if (executionData!.source === null || pairedItemData === undefined) { - // Source data is missing for some reason so we can not figure out the item - errorItems.push(item); - } else { - const pairedItemInputIndex = pairedItemData.input || 0; - - const sourceData = - executionData!.source[NodeConnectionType.Main][pairedItemInputIndex]; - - const constPairedItem = dataProxy.$getPairedItem( - sourceData!.previousNode, - sourceData, - pairedItemData, - ); - - if (constPairedItem === null) { - errorItems.push(item); - } else { - errorItems.push({ - ...item, - json: { - ...constPairedItem.json, - ...item.json, - }, - }); - } - } - } else { - successItems.push(item); - } - } - - nodeSuccessData[outputIndex] = successItems; - } - - nodeSuccessData[mainOutputTypes.length - 1] = errorItems; + this.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, runIndex); } if (runNodeData.closeFunction) { @@ -1616,53 +1533,7 @@ export class WorkflowExecute { workflowId: workflow.id, }); - if (nodeSuccessData?.length) { - // Check if the output data contains pairedItem data and if not try - // to automatically fix it - - const isSingleInputAndOutput = - executionData.data.main.length === 1 && executionData.data.main[0]?.length === 1; - - const isSameNumberOfItems = - nodeSuccessData.length === 1 && - executionData.data.main.length === 1 && - executionData.data.main[0]?.length === nodeSuccessData[0].length; - - checkOutputData: for (const outputData of nodeSuccessData) { - if (outputData === null) { - continue; - } - for (const [index, item] of outputData.entries()) { - if (item.pairedItem === undefined) { - // The pairedItem data is missing, so check if it can get automatically fixed - if (isSingleInputAndOutput) { - // The node has one input and one incoming item, so we know - // that all items must originate from that single - item.pairedItem = { - item: 0, - }; - } else if (isSameNumberOfItems) { - // The number of oncoming and outcoming items is identical so we can - // make the reasonable assumption that each of the input items - // is the origin of the corresponding output items - item.pairedItem = { - item: index, - }; - } else { - // In all other cases autofixing is not possible - break checkOutputData; - } - } - } - } - } - - if (nodeSuccessData === undefined) { - // Node did not get executed - nodeSuccessData = null; - } else { - this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name; - } + nodeSuccessData = this.assignPairedItems(nodeSuccessData, executionData); if (nodeSuccessData === null || nodeSuccessData[0][0] === undefined) { if (executionData.node.alwaysOutputData === true) { @@ -2175,6 +2046,27 @@ export class WorkflowExecute { }); } + /** + * Processes the final state of a workflow execution and prepares the execution result. + * This method handles different completion scenarios: success, waiting, error, and canceled states. + * It also manages cleanup tasks like static data updates and trigger deactivation. + * + * @param startedAt - The timestamp when the workflow execution started + * @param workflow - The workflow being executed + * @param executionError - Optional error that occurred during execution + * @param closeFunction - Optional promise that handles cleanup of triggers/webhooks + * + * @returns A promise that resolves to the complete workflow execution data (IRun) + * + * @remarks + * The function performs these tasks in order: + * 1. Generates full execution data + * 2. Sets appropriate status based on execution outcome + * 3. Handles any static data changes + * 4. Moves node metadata to its final location + * 5. Executes the 'workflowExecuteAfter' hook + * 6. Performs cleanup via closeFunction if provided + */ async processSuccessExecution( startedAt: Date, workflow: Workflow, @@ -2240,15 +2132,187 @@ export class WorkflowExecute { } getFullRunData(startedAt: Date): IRun { - const fullRunData: IRun = { + return { data: this.runExecutionData, mode: this.mode, startedAt, stoppedAt: new Date(), status: this.status, }; + } - return fullRunData; + handleNodeErrorOutput( + workflow: Workflow, + executionData: IExecuteData, + nodeSuccessData: INodeExecutionData[][], + runIndex: number, + ): void { + const nodeType = workflow.nodeTypes.getByNameAndVersion( + executionData.node.type, + executionData.node.typeVersion, + ); + const outputs = NodeHelpers.getNodeOutputs(workflow, executionData.node, nodeType.description); + const outputTypes = NodeHelpers.getConnectionTypes(outputs); + const mainOutputTypes = outputTypes.filter((output) => output === NodeConnectionType.Main); + + const errorItems: INodeExecutionData[] = []; + const closeFunctions: CloseFunction[] = []; + // Create a WorkflowDataProxy instance that we can get the data of the + // item which did error + const executeFunctions = new ExecuteContext( + workflow, + executionData.node, + this.additionalData, + this.mode, + this.runExecutionData, + runIndex, + [], + executionData.data, + executionData, + closeFunctions, + this.abortController.signal, + ); + + const dataProxy = executeFunctions.getWorkflowDataProxy(0); + + // Loop over all outputs except the error output as it would not contain data by default + for (let outputIndex = 0; outputIndex < mainOutputTypes.length - 1; outputIndex++) { + const successItems: INodeExecutionData[] = []; + const items = nodeSuccessData[outputIndex]?.length ? nodeSuccessData[outputIndex] : []; + + while (items.length) { + const item = items.shift(); + if (item === undefined) { + continue; + } + + let errorData: GenericValue | undefined; + if (item.error) { + errorData = item.error; + item.error = undefined; + } else if (item.json.error && Object.keys(item.json).length === 1) { + errorData = item.json.error; + } else if (item.json.error && item.json.message && Object.keys(item.json).length === 2) { + errorData = item.json.error; + } + + if (errorData) { + const pairedItemData = + item.pairedItem && typeof item.pairedItem === 'object' + ? Array.isArray(item.pairedItem) + ? item.pairedItem[0] + : item.pairedItem + : undefined; + + if (executionData.source === null || pairedItemData === undefined) { + // Source data is missing for some reason so we can not figure out the item + errorItems.push(item); + } else { + const pairedItemInputIndex = pairedItemData.input || 0; + + const sourceData = executionData.source[NodeConnectionType.Main][pairedItemInputIndex]; + + const constPairedItem = dataProxy.$getPairedItem( + sourceData!.previousNode, + sourceData, + pairedItemData, + ); + + if (constPairedItem === null) { + errorItems.push(item); + } else { + errorItems.push({ + ...item, + json: { + ...constPairedItem.json, + ...item.json, + }, + }); + } + } + } else { + successItems.push(item); + } + } + + nodeSuccessData[outputIndex] = successItems; + } + + nodeSuccessData[mainOutputTypes.length - 1] = errorItems; + } + + /** + * Assigns pairedItem information to node output items by matching them with input items. + * PairedItem data is used to track which output items were derived from which input items. + * + * @param nodeSuccessData - The output data from a node execution + * @param executionData - The execution data containing input information + * + * @returns The node output data with pairedItem information assigned where possible + * + * @remarks + * Auto-assignment of pairedItem happens in two scenarios: + * 1. Single input/output: When node has exactly one input item and produces output(s), + * all outputs are marked as derived from that single input (item: 0) + * 2. Matching items count: When number of input and output items match exactly, + * each output item is paired with the input item at the same index + * + * In all other cases, if pairedItem is missing, it remains undefined as automatic + * assignment cannot be done reliably. + */ + assignPairedItems( + nodeSuccessData: INodeExecutionData[][] | null | undefined, + executionData: IExecuteData, + ) { + if (nodeSuccessData?.length) { + // Check if the output data contains pairedItem data and if not try + // to automatically fix it + + const isSingleInputAndOutput = + executionData.data.main.length === 1 && executionData.data.main[0]?.length === 1; + + const isSameNumberOfItems = + nodeSuccessData.length === 1 && + executionData.data.main.length === 1 && + executionData.data.main[0]?.length === nodeSuccessData[0].length; + + checkOutputData: for (const outputData of nodeSuccessData) { + if (outputData === null) { + continue; + } + for (const [index, item] of outputData.entries()) { + if (item.pairedItem === undefined) { + // The pairedItem data is missing, so check if it can get automatically fixed + if (isSingleInputAndOutput) { + // The node has one input and one incoming item, so we know + // that all items must originate from that single + item.pairedItem = { + item: 0, + }; + } else if (isSameNumberOfItems) { + // The number of oncoming and outcoming items is identical so we can + // make the reasonable assumption that each of the input items + // is the origin of the corresponding output items + item.pairedItem = { + item: index, + }; + } else { + // In all other cases autofixing is not possible + break checkOutputData; + } + } + } + } + } + + if (nodeSuccessData === undefined) { + // Node did not get executed + nodeSuccessData = null; + } else { + this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name; + } + + return nodeSuccessData; } private get isCancelled() { diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index 6a826a8118..6ab3afdaeb 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -12,8 +12,11 @@ import { mock } from 'jest-mock-extended'; import { pick } from 'lodash'; import type { + ExecutionBaseError, + IConnection, IExecuteData, INode, + INodeExecutionData, INodeType, INodeTypes, IPinData, @@ -23,10 +26,12 @@ import type { ITriggerResponse, IWorkflowExecuteAdditionalData, WorkflowTestData, + RelatedExecution, } from 'n8n-workflow'; import { ApplicationError, createDeferredPromise, + NodeConnectionType, NodeExecutionOutput, NodeHelpers, Workflow, @@ -604,4 +609,752 @@ describe('WorkflowExecute', () => { expect(triggerResponse.closeFunction).toHaveBeenCalled(); }); }); + + describe('handleNodeErrorOutput', () => { + const testNode: INode = { + id: '1', + name: 'Node1', + type: 'test.set', + typeVersion: 1, + position: [0, 0], + parameters: {}, + }; + + const nodeType = mock({ + description: { + name: 'test', + displayName: 'test', + defaultVersion: 1, + properties: [], + inputs: [{ type: NodeConnectionType.Main }], + outputs: [ + { type: NodeConnectionType.Main }, + { type: NodeConnectionType.Main, category: 'error' }, + ], + }, + }); + + const nodeTypes = mock(); + + const workflow = new Workflow({ + id: 'test', + nodes: [testNode], + connections: {}, + active: false, + nodeTypes, + }); + + const executionData = { + node: workflow.nodes.Node1, + data: { + main: [ + [ + { + json: { data: 'test' }, + pairedItem: { item: 0, input: 0 }, + }, + ], + ], + }, + source: { + [NodeConnectionType.Main]: [ + { + previousNode: 'previousNode', + previousNodeOutput: 0, + previousNodeRun: 0, + }, + ], + }, + }; + + const runExecutionData: IRunExecutionData = { + resultData: { + runData: { + previousNode: [ + { + data: { + main: [[{ json: { someData: 'test' } }]], + }, + source: [], + startTime: 0, + executionTime: 0, + }, + ], + }, + }, + }; + + let workflowExecute: WorkflowExecute; + + beforeEach(() => { + jest.clearAllMocks(); + + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + + workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + }); + + test('should handle undefined error data input correctly', () => { + const nodeSuccessData: INodeExecutionData[][] = [ + [undefined as unknown as INodeExecutionData], + ]; + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + expect(nodeSuccessData[0]).toEqual([undefined]); + expect(nodeSuccessData[1]).toEqual([]); + }); + + test('should handle empty input', () => { + const nodeSuccessData: INodeExecutionData[][] = [[]]; + + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + + expect(nodeSuccessData[0]).toHaveLength(0); + expect(nodeSuccessData[1]).toHaveLength(0); + }); + + test('should route error items to last output', () => { + const nodeSuccessData: INodeExecutionData[][] = [ + [ + { + json: { error: 'Test error', additionalData: 'preserved' }, + pairedItem: { item: 0, input: 0 }, + }, + { + json: { regularData: 'success' }, + pairedItem: { item: 1, input: 0 }, + }, + ], + ]; + + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + + expect(nodeSuccessData[0]).toEqual([ + { + json: { additionalData: 'preserved', error: 'Test error' }, + pairedItem: { item: 0, input: 0 }, + }, + { json: { regularData: 'success' }, pairedItem: { item: 1, input: 0 } }, + ]); + expect(nodeSuccessData[1]).toEqual([]); + }); + + test('should handle error in json with message property', () => { + const nodeSuccessData: INodeExecutionData[][] = [ + [ + { + json: { + error: 'Error occurred', + message: 'Error details', + }, + pairedItem: { item: 0, input: 0 }, + }, + ], + ]; + + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + + expect(nodeSuccessData[0]).toEqual([]); + expect(nodeSuccessData[1]).toEqual([ + { + json: { + error: 'Error occurred', + message: 'Error details', + someData: 'test', + }, + pairedItem: { item: 0, input: 0 }, + }, + ]); + }); + + test('should preserve pairedItem data when routing errors', () => { + const nodeSuccessData: INodeExecutionData[][] = [ + [ + { + json: { error: 'Test error' }, + pairedItem: [ + { item: 0, input: 0 }, + { item: 1, input: 1 }, + ], + }, + ], + ]; + + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + + expect(nodeSuccessData[0]).toEqual([]); + expect(nodeSuccessData[1]).toEqual([ + { + json: { someData: 'test', error: 'Test error' }, + pairedItem: [ + { item: 0, input: 0 }, + { item: 1, input: 1 }, + ], + }, + ]); + }); + + test('should route multiple error items correctly', () => { + const nodeSuccessData: INodeExecutionData[][] = [ + [ + { + json: { error: 'Error 1', data: 'preserved1' }, + pairedItem: { item: 0, input: 0 }, + }, + { + json: { error: 'Error 2', data: 'preserved2' }, + pairedItem: { item: 1, input: 0 }, + }, + ], + ]; + + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + + expect(nodeSuccessData[1]).toEqual([]); + expect(nodeSuccessData[0]).toEqual([ + { + json: { error: 'Error 1', data: 'preserved1' }, + pairedItem: { item: 0, input: 0 }, + }, + { + json: { error: 'Error 2', data: 'preserved2' }, + pairedItem: { item: 1, input: 0 }, + }, + ]); + }); + + test('should handle complex pairedItem data correctly', () => { + const nodeSuccessData: INodeExecutionData[][] = [ + [ + { + json: { error: 'Test error' }, + pairedItem: [ + { item: 0, input: 0 }, + { item: 1, input: 1 }, + ], + }, + ], + ]; + + workflowExecute.handleNodeErrorOutput(workflow, executionData, nodeSuccessData, 0); + + expect(nodeSuccessData[0]).toEqual([]); + expect(nodeSuccessData[1]).toEqual([ + { + json: { someData: 'test', error: 'Test error' }, + pairedItem: [ + { item: 0, input: 0 }, + { item: 1, input: 1 }, + ], + }, + ]); + }); + }); + + describe('prepareWaitingToExecution', () => { + let runExecutionData: IRunExecutionData; + let workflowExecute: WorkflowExecute; + + beforeEach(() => { + runExecutionData = { + startData: {}, + resultData: { + runData: {}, + pinData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + }); + + test('should initialize waitingExecutionSource if undefined', () => { + runExecutionData.executionData!.waitingExecutionSource = null; + const nodeName = 'testNode'; + const numberOfConnections = 2; + const runIndex = 0; + + workflowExecute.prepareWaitingToExecution(nodeName, numberOfConnections, runIndex); + + expect(runExecutionData.executionData?.waitingExecutionSource).toBeDefined(); + }); + + test('should create arrays of correct length with null values', () => { + const nodeName = 'testNode'; + const numberOfConnections = 3; + const runIndex = 0; + runExecutionData.executionData!.waitingExecution[nodeName] = {}; + + workflowExecute.prepareWaitingToExecution(nodeName, numberOfConnections, runIndex); + + const nodeWaiting = runExecutionData.executionData!.waitingExecution[nodeName]; + const nodeWaitingSource = runExecutionData.executionData!.waitingExecutionSource![nodeName]; + + expect(nodeWaiting[runIndex].main).toHaveLength(3); + expect(nodeWaiting[runIndex].main).toEqual([null, null, null]); + expect(nodeWaitingSource[runIndex].main).toHaveLength(3); + expect(nodeWaitingSource[runIndex].main).toEqual([null, null, null]); + }); + + test('should work with zero connections', () => { + const nodeName = 'testNode'; + const numberOfConnections = 0; + const runIndex = 0; + runExecutionData.executionData!.waitingExecution[nodeName] = {}; + + workflowExecute.prepareWaitingToExecution(nodeName, numberOfConnections, runIndex); + + expect( + runExecutionData.executionData!.waitingExecution[nodeName][runIndex].main, + ).toHaveLength(0); + expect( + runExecutionData.executionData!.waitingExecutionSource![nodeName][runIndex].main, + ).toHaveLength(0); + }); + + test('should handle multiple run indices', () => { + const nodeName = 'testNode'; + const numberOfConnections = 2; + runExecutionData.executionData!.waitingExecution[nodeName] = {}; + + workflowExecute.prepareWaitingToExecution(nodeName, numberOfConnections, 0); + workflowExecute.prepareWaitingToExecution(nodeName, numberOfConnections, 1); + + const nodeWaiting = runExecutionData.executionData!.waitingExecution[nodeName]; + const nodeWaitingSource = runExecutionData.executionData!.waitingExecutionSource![nodeName]; + + expect(nodeWaiting[0].main).toHaveLength(2); + expect(nodeWaiting[1].main).toHaveLength(2); + expect(nodeWaitingSource[0].main).toHaveLength(2); + expect(nodeWaitingSource[1].main).toHaveLength(2); + }); + }); + + describe('incomingConnectionIsEmpty', () => { + let workflowExecute: WorkflowExecute; + + beforeEach(() => { + workflowExecute = new WorkflowExecute(mock(), 'manual'); + }); + + test('should return true when there are no input connections', () => { + const result = workflowExecute.incomingConnectionIsEmpty({}, [], 0); + expect(result).toBe(true); + }); + + test('should return true when all input connections have no data', () => { + const runData: IRunData = { + node1: [ + { + source: [], + data: { main: [[], []] }, + startTime: 0, + executionTime: 0, + }, + ], + }; + + const inputConnections: IConnection[] = [ + { node: 'node1', type: NodeConnectionType.Main, index: 0 }, + { node: 'node1', type: NodeConnectionType.Main, index: 1 }, + ]; + + const result = workflowExecute.incomingConnectionIsEmpty(runData, inputConnections, 0); + expect(result).toBe(true); + }); + + test('should return true when input connection node does not exist in runData', () => { + const runData: IRunData = {}; + const inputConnections: IConnection[] = [ + { node: 'nonexistentNode', type: NodeConnectionType.Main, index: 0 }, + ]; + + const result = workflowExecute.incomingConnectionIsEmpty(runData, inputConnections, 0); + expect(result).toBe(true); + }); + + test('should return false when any input connection has data', () => { + const runData: IRunData = { + node1: [ + { + source: [], + data: { + main: [[{ json: { data: 'test' } }], []], + }, + startTime: 0, + executionTime: 0, + }, + ], + }; + + const inputConnections: IConnection[] = [ + { node: 'node1', type: NodeConnectionType.Main, index: 0 }, + { node: 'node1', type: NodeConnectionType.Main, index: 1 }, + ]; + + const result = workflowExecute.incomingConnectionIsEmpty(runData, inputConnections, 0); + expect(result).toBe(false); + }); + + test('should check correct run index', () => { + const runData: IRunData = { + node1: [ + { + source: [], + data: { + main: [[]], + }, + startTime: 0, + executionTime: 0, + }, + { + source: [], + data: { + main: [[{ json: { data: 'test' } }]], + }, + startTime: 0, + executionTime: 0, + }, + ], + }; + + const inputConnections: IConnection[] = [ + { node: 'node1', type: NodeConnectionType.Main, index: 0 }, + ]; + + expect(workflowExecute.incomingConnectionIsEmpty(runData, inputConnections, 0)).toBe(true); + expect(workflowExecute.incomingConnectionIsEmpty(runData, inputConnections, 1)).toBe(false); + }); + + test('should handle undefined data in runData correctly', () => { + const runData: IRunData = { + node1: [ + { + source: [], + startTime: 0, + executionTime: 0, + }, + ], + }; + + const inputConnections: IConnection[] = [ + { node: 'node1', type: NodeConnectionType.Main, index: 0 }, + ]; + + const result = workflowExecute.incomingConnectionIsEmpty(runData, inputConnections, 0); + expect(result).toBe(true); + }); + }); + + describe('moveNodeMetadata', () => { + let runExecutionData: IRunExecutionData; + let workflowExecute: WorkflowExecute; + const parentExecution = mock(); + + beforeEach(() => { + runExecutionData = { + startData: {}, + resultData: { + runData: {}, + pinData: {}, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }; + workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + }); + + test('should do nothing when there is no metadata', () => { + runExecutionData.resultData.runData = { + node1: [{ startTime: 0, executionTime: 0, source: [] }], + }; + + workflowExecute.moveNodeMetadata(); + + expect(runExecutionData.resultData.runData.node1[0].metadata).toBeUndefined(); + }); + + test('should merge metadata into runData for single node', () => { + runExecutionData.resultData.runData = { + node1: [{ startTime: 0, executionTime: 0, source: [] }], + }; + runExecutionData.executionData!.metadata = { + node1: [{ parentExecution }], + }; + + workflowExecute.moveNodeMetadata(); + + expect(runExecutionData.resultData.runData.node1[0].metadata).toEqual({ parentExecution }); + }); + + test('should merge metadata into runData for multiple nodes', () => { + runExecutionData.resultData.runData = { + node1: [{ startTime: 0, executionTime: 0, source: [] }], + node2: [{ startTime: 0, executionTime: 0, source: [] }], + }; + runExecutionData.executionData!.metadata = { + node1: [{ parentExecution }], + node2: [{ subExecutionsCount: 4 }], + }; + + workflowExecute.moveNodeMetadata(); + + const { runData } = runExecutionData.resultData; + expect(runData.node1[0].metadata).toEqual({ parentExecution }); + expect(runData.node2[0].metadata).toEqual({ subExecutionsCount: 4 }); + }); + + test('should preserve existing metadata when merging', () => { + runExecutionData.resultData.runData = { + node1: [ + { + startTime: 0, + executionTime: 0, + source: [], + metadata: { subExecutionsCount: 4 }, + }, + ], + }; + runExecutionData.executionData!.metadata = { + node1: [{ parentExecution }], + }; + + workflowExecute.moveNodeMetadata(); + + expect(runExecutionData.resultData.runData.node1[0].metadata).toEqual({ + parentExecution, + subExecutionsCount: 4, + }); + }); + + test('should handle multiple run indices', () => { + runExecutionData.resultData.runData = { + node1: [ + { startTime: 0, executionTime: 0, source: [] }, + { startTime: 0, executionTime: 0, source: [] }, + ], + }; + runExecutionData.executionData!.metadata = { + node1: [{ parentExecution }, { subExecutionsCount: 4 }], + }; + + workflowExecute.moveNodeMetadata(); + + const { runData } = runExecutionData.resultData; + expect(runData.node1[0].metadata).toEqual({ parentExecution }); + expect(runData.node1[1].metadata).toEqual({ subExecutionsCount: 4 }); + }); + }); + + describe('getFullRunData', () => { + afterAll(() => { + jest.useRealTimers(); + }); + + test('should return complete IRun object with all properties correctly set', () => { + const runExecutionData = mock(); + + const workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + + const startedAt = new Date('2023-01-01T00:00:00.000Z'); + jest.useFakeTimers().setSystemTime(startedAt); + + const result1 = workflowExecute.getFullRunData(startedAt); + + expect(result1).toEqual({ + data: runExecutionData, + mode: 'manual', + startedAt, + stoppedAt: startedAt, + status: 'new', + }); + + const stoppedAt = new Date('2023-01-01T00:00:10.000Z'); + jest.setSystemTime(stoppedAt); + // @ts-expect-error read-only property + workflowExecute.status = 'running'; + + const result2 = workflowExecute.getFullRunData(startedAt); + + expect(result2).toEqual({ + data: runExecutionData, + mode: 'manual', + startedAt, + stoppedAt, + status: 'running', + }); + }); + }); + + describe('processSuccessExecution', () => { + const startedAt: Date = new Date('2023-01-01T00:00:00.000Z'); + const workflow = new Workflow({ + id: 'test', + nodes: [], + connections: {}, + active: false, + nodeTypes: mock(), + }); + + let runExecutionData: IRunExecutionData; + let workflowExecute: WorkflowExecute; + + beforeEach(() => { + runExecutionData = { + startData: {}, + resultData: { runData: {} }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: null, + }, + }; + workflowExecute = new WorkflowExecute(mock(), 'manual', runExecutionData); + + jest.spyOn(workflowExecute, 'executeHook').mockResolvedValue(undefined); + jest.spyOn(workflowExecute, 'moveNodeMetadata').mockImplementation(); + }); + + test('should handle different workflow completion scenarios', async () => { + // Test successful execution + const successResult = await workflowExecute.processSuccessExecution(startedAt, workflow); + expect(successResult.status).toBe('success'); + expect(successResult.finished).toBe(true); + + // Test execution with wait + runExecutionData.waitTill = new Date('2024-01-01'); + const waitResult = await workflowExecute.processSuccessExecution(startedAt, workflow); + expect(waitResult.status).toBe('waiting'); + expect(waitResult.waitTill).toEqual(runExecutionData.waitTill); + + // Test execution with error + const testError = new Error('Test error') as ExecutionBaseError; + + // Reset the status since it was changed by previous tests + // @ts-expect-error read-only property + workflowExecute.status = 'new'; + runExecutionData.waitTill = undefined; + + const errorResult = await workflowExecute.processSuccessExecution( + startedAt, + workflow, + testError, + ); + + expect(errorResult.data.resultData.error).toBeDefined(); + expect(errorResult.data.resultData.error?.message).toBe('Test error'); + + // Test canceled execution + const cancelError = new Error('Workflow execution canceled') as ExecutionBaseError; + const cancelResult = await workflowExecute.processSuccessExecution( + startedAt, + workflow, + cancelError, + ); + expect(cancelResult.data.resultData.error).toBeDefined(); + expect(cancelResult.data.resultData.error?.message).toBe('Workflow execution canceled'); + }); + + test('should handle static data, hooks, and cleanup correctly', async () => { + // Mock static data change + workflow.staticData.__dataChanged = true; + workflow.staticData.testData = 'changed'; + + // Mock cleanup function that's actually a promise + let cleanupCalled = false; + const mockCleanupPromise = new Promise((resolve) => { + setTimeout(() => { + cleanupCalled = true; + resolve(); + }, 0); + }); + + const result = await workflowExecute.processSuccessExecution( + startedAt, + workflow, + undefined, + mockCleanupPromise, + ); + + // Verify static data handling + expect(result).toBeDefined(); + expect(workflowExecute.moveNodeMetadata).toHaveBeenCalled(); + expect(workflowExecute.executeHook).toHaveBeenCalledWith('workflowExecuteAfter', [ + result, + workflow.staticData, + ]); + + // Verify cleanup was called + await mockCleanupPromise; + expect(cleanupCalled).toBe(true); + }); + }); + + describe('assignPairedItems', () => { + let workflowExecute: WorkflowExecute; + + beforeEach(() => { + workflowExecute = new WorkflowExecute(mock(), 'manual'); + }); + + test('should handle undefined node output', () => { + const result = workflowExecute.assignPairedItems( + undefined, + mock({ data: { main: [] } }), + ); + expect(result).toBeNull(); + }); + + test('should auto-fix pairedItem for single input/output scenario', () => { + const nodeOutput = [[{ json: { test: true } }]]; + const executionData = mock({ data: { main: [[{ json: { input: true } }]] } }); + + const result = workflowExecute.assignPairedItems(nodeOutput, executionData); + + expect(result?.[0][0].pairedItem).toEqual({ item: 0 }); + }); + + test('should auto-fix pairedItem when number of items match', () => { + const nodeOutput = [[{ json: { test: 1 } }, { json: { test: 2 } }]]; + const executionData = mock({ + data: { main: [[{ json: { input: 1 } }, { json: { input: 2 } }]] }, + }); + + const result = workflowExecute.assignPairedItems(nodeOutput, executionData); + + expect(result?.[0][0].pairedItem).toEqual({ item: 0 }); + expect(result?.[0][1].pairedItem).toEqual({ item: 1 }); + }); + + test('should not modify existing pairedItem data', () => { + const existingPairedItem = { item: 5, input: 2 }; + const nodeOutput = [[{ json: { test: true }, pairedItem: existingPairedItem }]]; + const executionData = mock({ data: { main: [[{ json: { input: true } }]] } }); + + const result = workflowExecute.assignPairedItems(nodeOutput, executionData); + + expect(result?.[0][0].pairedItem).toEqual(existingPairedItem); + }); + + test('should process multiple output branches correctly', () => { + const nodeOutput = [[{ json: { test: 1 } }], [{ json: { test: 2 } }]]; + const executionData = mock({ data: { main: [[{ json: { input: true } }]] } }); + + const result = workflowExecute.assignPairedItems(nodeOutput, executionData); + + expect(result?.[0][0].pairedItem).toEqual({ item: 0 }); + expect(result?.[1][0].pairedItem).toEqual({ item: 0 }); + }); + }); });