diff --git a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts index 74d6819961..9c92a63361 100644 --- a/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts +++ b/packages/@n8n/nodes-langchain/nodes/agents/Agent/agents/ToolsAgent/execute.ts @@ -33,7 +33,7 @@ function getOutputParserSchema(outputParser: N8nOutputParser): ZodObject data.mimeType.startsWith('image/')) @@ -260,7 +260,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise diff --git a/packages/core/src/Agent/index.ts b/packages/core/src/Agent/index.ts deleted file mode 100644 index ed842d99ee..0000000000 --- a/packages/core/src/Agent/index.ts +++ /dev/null @@ -1,61 +0,0 @@ -import type { - IExecuteFunctions, - Workflow, - IRunExecutionData, - INodeExecutionData, - ITaskDataConnections, - INode, - IWorkflowExecuteAdditionalData, - WorkflowExecuteMode, - INodeParameters, - IExecuteData, - IDataObject, - Result, -} from 'n8n-workflow'; -import { createEnvProviderState } from 'n8n-workflow'; - -export const createAgentStartJob = ( - additionalData: IWorkflowExecuteAdditionalData, - inputData: ITaskDataConnections, - node: INode, - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - activeNodeName: string, - connectionInputData: INodeExecutionData[], - siblingParameters: INodeParameters, - mode: WorkflowExecuteMode, - executeData?: IExecuteData, - defaultReturnRunIndex?: number, - selfData?: IDataObject, - contextNodeName?: string, -): IExecuteFunctions['startJob'] => { - return async function startJob( - this: IExecuteFunctions, - jobType: string, - settings: unknown, - itemIndex: number, - ): Promise> { - return await additionalData.startAgentJob( - additionalData, - jobType, - settings, - this, - inputData, - node, - workflow, - runExecutionData, - runIndex, - itemIndex, - activeNodeName, - connectionInputData, - siblingParameters, - mode, - createEnvProviderState(), - executeData, - defaultReturnRunIndex, - selfData, - contextNodeName, - ); - }; -}; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 4400609fd8..a07674eecd 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -54,9 +54,7 @@ import type { IPollFunctions, IRequestOptions, IRunExecutionData, - ITaskData, ITaskDataConnections, - ITaskMetadata, ITriggerFunctions, IWebhookData, IWebhookDescription, @@ -2021,113 +2019,6 @@ export function getWebhookDescription( return undefined; } -// TODO: Change options to an object -export const addExecutionDataFunctions = async ( - type: 'input' | 'output', - nodeName: string, - data: INodeExecutionData[][] | ExecutionBaseError, - runExecutionData: IRunExecutionData, - connectionType: NodeConnectionType, - additionalData: IWorkflowExecuteAdditionalData, - sourceNodeName: string, - sourceNodeRunIndex: number, - currentNodeRunIndex: number, - metadata?: ITaskMetadata, -): Promise => { - if (connectionType === NodeConnectionType.Main) { - throw new ApplicationError('Setting type is not supported for main connection', { - extra: { type }, - }); - } - - let taskData: ITaskData | undefined; - if (type === 'input') { - taskData = { - startTime: new Date().getTime(), - executionTime: 0, - executionStatus: 'running', - source: [null], - }; - } else { - // At the moment we expect that there is always an input sent before the output - taskData = get( - runExecutionData, - ['resultData', 'runData', nodeName, currentNodeRunIndex], - undefined, - ); - if (taskData === undefined) { - return; - } - taskData.metadata = metadata; - } - taskData = taskData!; - - if (data instanceof Error) { - taskData.executionStatus = 'error'; - taskData.error = data; - } else { - if (type === 'output') { - taskData.executionStatus = 'success'; - } - taskData.data = { - [connectionType]: data, - } as ITaskDataConnections; - } - - if (type === 'input') { - if (!(data instanceof Error)) { - taskData.inputOverride = { - [connectionType]: data, - } as ITaskDataConnections; - } - - if (!runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - runExecutionData.resultData.runData[nodeName] = []; - } - - runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData; - if (additionalData.sendDataToUI) { - additionalData.sendDataToUI('nodeExecuteBefore', { - executionId: additionalData.executionId, - nodeName, - }); - } - } else { - // Outputs - taskData.executionTime = new Date().getTime() - taskData.startTime; - - if (additionalData.sendDataToUI) { - additionalData.sendDataToUI('nodeExecuteAfter', { - executionId: additionalData.executionId, - nodeName, - data: taskData, - }); - } - - if (get(runExecutionData, 'executionData.metadata', undefined) === undefined) { - runExecutionData.executionData!.metadata = {}; - } - - let sourceTaskData = get(runExecutionData, ['executionData', 'metadata', sourceNodeName]); - - if (!sourceTaskData) { - runExecutionData.executionData!.metadata[sourceNodeName] = []; - sourceTaskData = runExecutionData.executionData!.metadata[sourceNodeName]; - } - - if (!sourceTaskData[sourceNodeRunIndex]) { - sourceTaskData[sourceNodeRunIndex] = { - subRun: [], - }; - } - - sourceTaskData[sourceNodeRunIndex]!.subRun!.push({ - node: nodeName, - runIndex: currentNodeRunIndex, - }); - } -}; - export async function getInputConnectionData( this: IAllExecuteFunctions, workflow: Workflow, @@ -2139,7 +2030,7 @@ export async function getInputConnectionData( executeData: IExecuteData, mode: WorkflowExecuteMode, closeFunctions: CloseFunction[], - inputName: NodeConnectionType, + connectionType: NodeConnectionType, itemIndex: number, abortSignal?: AbortSignal, ): Promise { @@ -2150,14 +2041,14 @@ export async function getInputConnectionData( let inputConfiguration = inputs.find((input) => { if (typeof input === 'string') { - return input === inputName; + return input === connectionType; } - return input.type === inputName; + return input.type === connectionType; }); if (inputConfiguration === undefined) { throw new ApplicationError('Node does not have input of type', { - extra: { nodeName: node.name, inputName }, + extra: { nodeName: node.name, connectionType }, }); } @@ -2167,114 +2058,103 @@ export async function getInputConnectionData( } as INodeInputConfiguration; } - const parentNodes = workflow.getParentNodes(node.name, inputName, 1); - if (parentNodes.length === 0) { + const connectedNodes = workflow + .getParentNodes(node.name, connectionType, 1) + .map((nodeName) => workflow.getNode(nodeName) as INode) + .filter((connectedNode) => connectedNode.disabled !== true); + + if (connectedNodes.length === 0) { if (inputConfiguration.required) { throw new NodeOperationError( node, - `A ${inputConfiguration?.displayName ?? inputName} sub-node must be connected`, + `A ${inputConfiguration?.displayName ?? connectionType} sub-node must be connected and enabled`, ); } return inputConfiguration.maxConnections === 1 ? undefined : []; } - const constParentNodes = parentNodes - .map((nodeName) => { - return workflow.getNode(nodeName) as INode; - }) - .filter((connectedNode) => connectedNode.disabled !== true) - .map(async (connectedNode) => { - const nodeType = workflow.nodeTypes.getByNameAndVersion( - connectedNode.type, - connectedNode.typeVersion, - ); - const context = new SupplyDataContext( - workflow, - connectedNode, - additionalData, - mode, - runExecutionData, - runIndex, - connectionInputData, - inputData, - executeData, - closeFunctions, - abortSignal, - ); - - if (!nodeType.supplyData) { - if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) { - nodeType.supplyData = async function (this: ISupplyDataFunctions) { - return createNodeAsTool(this, nodeType, this.getNode().parameters); - }; - } else { - throw new ApplicationError('Node does not have a `supplyData` method defined', { - extra: { nodeName: connectedNode.name }, - }); - } - } - - try { - const response = await nodeType.supplyData.call(context, itemIndex); - if (response.closeFunction) { - closeFunctions.push(response.closeFunction); - } - return response; - } catch (error) { - // Propagate errors from sub-nodes - if (error.functionality === 'configuration-node') throw error; - if (!(error instanceof ExecutionBaseError)) { - error = new NodeOperationError(connectedNode, error, { - itemIndex, - }); - } - - let currentNodeRunIndex = 0; - if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) { - currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length; - } - - // Display the error on the node which is causing it - await addExecutionDataFunctions( - 'input', - connectedNode.name, - error, - runExecutionData, - inputName, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ); - - // Display on the calling node which node has the error - throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, { - itemIndex, - functionality: 'configuration-node', - description: error.message, - }); - } - }); - - // Validate the inputs - const nodes = await Promise.all(constParentNodes); - - if (inputConfiguration.required && nodes.length === 0) { - throw new NodeOperationError( - node, - `A ${inputConfiguration?.displayName ?? inputName} sub-node must be connected`, - ); - } if ( inputConfiguration.maxConnections !== undefined && - nodes.length > inputConfiguration.maxConnections + connectedNodes.length > inputConfiguration.maxConnections ) { throw new NodeOperationError( node, - `Only ${inputConfiguration.maxConnections} ${inputName} sub-nodes are/is allowed to be connected`, + `Only ${inputConfiguration.maxConnections} ${connectionType} sub-nodes are/is allowed to be connected`, ); } + const constParentNodes = connectedNodes.map(async (connectedNode) => { + const nodeType = workflow.nodeTypes.getByNameAndVersion( + connectedNode.type, + connectedNode.typeVersion, + ); + const context = new SupplyDataContext( + workflow, + connectedNode, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executeData, + closeFunctions, + abortSignal, + ); + + if (!nodeType.supplyData) { + if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) { + nodeType.supplyData = async function (this: ISupplyDataFunctions) { + return createNodeAsTool(this, nodeType, this.getNode().parameters); + }; + } else { + throw new ApplicationError('Node does not have a `supplyData` method defined', { + extra: { nodeName: connectedNode.name }, + }); + } + } + + try { + const response = await nodeType.supplyData.call(context, itemIndex); + if (response.closeFunction) { + closeFunctions.push(response.closeFunction); + } + return response; + } catch (error) { + // Propagate errors from sub-nodes + if (error.functionality === 'configuration-node') throw error; + if (!(error instanceof ExecutionBaseError)) { + error = new NodeOperationError(connectedNode, error, { + itemIndex, + }); + } + + let currentNodeRunIndex = 0; + if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) { + currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length; + } + + // Display the error on the node which is causing it + await context.addExecutionDataFunctions( + 'input', + error, + connectionType, + node.name, + currentNodeRunIndex, + ); + + // Display on the calling node which node has the error + throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, { + itemIndex, + functionality: 'configuration-node', + description: error.message, + }); + } + }); + + // Validate the inputs + const nodes = await Promise.all(constParentNodes); + return inputConfiguration.maxConnections === 1 ? (nodes || [])[0]?.response : nodes.map((node) => node.response); diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 2990d61cf8..27244fafc3 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -1018,8 +1018,8 @@ export class WorkflowExecute { // Update the pairedItem information on items const newTaskDataConnections: ITaskDataConnections = {}; - for (const inputName of Object.keys(executionData.data)) { - newTaskDataConnections[inputName] = executionData.data[inputName].map( + for (const connectionType of Object.keys(executionData.data)) { + newTaskDataConnections[connectionType] = executionData.data[connectionType].map( (input, inputIndex) => { if (input === null) { return input; diff --git a/packages/core/src/node-execution-context/__tests__/execute-context.test.ts b/packages/core/src/node-execution-context/__tests__/execute-context.test.ts index 723bab24f3..a888a5a7ff 100644 --- a/packages/core/src/node-execution-context/__tests__/execute-context.test.ts +++ b/packages/core/src/node-execution-context/__tests__/execute-context.test.ts @@ -14,7 +14,7 @@ import type { INodeTypes, ICredentialDataDecryptedObject, } from 'n8n-workflow'; -import { ApplicationError, ExpressionError } from 'n8n-workflow'; +import { ApplicationError, ExpressionError, NodeConnectionType } from 'n8n-workflow'; import { describeCommonTests } from './shared-tests'; import { ExecuteContext } from '../execute-context'; @@ -92,33 +92,39 @@ describe('ExecuteContext', () => { describe('getInputData', () => { const inputIndex = 0; - const inputName = 'main'; + const connectionType = NodeConnectionType.Main; afterEach(() => { - inputData[inputName] = [[{ json: { test: 'data' } }]]; + inputData[connectionType] = [[{ json: { test: 'data' } }]]; }); it('should return the input data correctly', () => { const expectedData = [{ json: { test: 'data' } }]; - expect(executeContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + expect(executeContext.getInputData(inputIndex, connectionType)).toEqual(expectedData); }); it('should return an empty array if the input name does not exist', () => { - const inputName = 'nonExistent'; - expect(executeContext.getInputData(inputIndex, inputName)).toEqual([]); + const connectionType = 'nonExistent'; + expect(executeContext.getInputData(inputIndex, connectionType as NodeConnectionType)).toEqual( + [], + ); }); it('should throw an error if the input index is out of range', () => { const inputIndex = 2; - expect(() => executeContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError); + expect(() => executeContext.getInputData(inputIndex, connectionType)).toThrow( + ApplicationError, + ); }); it('should throw an error if the input index was not set', () => { inputData.main[inputIndex] = null; - expect(() => executeContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError); + expect(() => executeContext.getInputData(inputIndex, connectionType)).toThrow( + ApplicationError, + ); }); }); diff --git a/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts b/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts index e62c2b0f46..6c1b9f1089 100644 --- a/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts +++ b/packages/core/src/node-execution-context/__tests__/execute-single-context.test.ts @@ -14,7 +14,7 @@ import type { INodeTypes, ICredentialDataDecryptedObject, } from 'n8n-workflow'; -import { ApplicationError } from 'n8n-workflow'; +import { ApplicationError, NodeConnectionType } from 'n8n-workflow'; import { describeCommonTests } from './shared-tests'; import { ExecuteSingleContext } from '../execute-single-context'; @@ -91,29 +91,31 @@ describe('ExecuteSingleContext', () => { describe('getInputData', () => { const inputIndex = 0; - const inputName = 'main'; + const connectionType = NodeConnectionType.Main; afterEach(() => { - inputData[inputName] = [[{ json: { test: 'data' } }]]; + inputData[connectionType] = [[{ json: { test: 'data' } }]]; }); it('should return the input data correctly', () => { const expectedData = { json: { test: 'data' } }; - expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + expect(executeSingleContext.getInputData(inputIndex, connectionType)).toEqual(expectedData); }); it('should return an empty object if the input name does not exist', () => { - const inputName = 'nonExistent'; + const connectionType = 'nonExistent'; const expectedData = { json: {} }; - expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + expect( + executeSingleContext.getInputData(inputIndex, connectionType as NodeConnectionType), + ).toEqual(expectedData); }); it('should throw an error if the input index is out of range', () => { const inputIndex = 1; - expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow( + expect(() => executeSingleContext.getInputData(inputIndex, connectionType)).toThrow( ApplicationError, ); }); @@ -121,7 +123,7 @@ describe('ExecuteSingleContext', () => { it('should throw an error if the input index was not set', () => { inputData.main[inputIndex] = null; - expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow( + expect(() => executeSingleContext.getInputData(inputIndex, connectionType)).toThrow( ApplicationError, ); }); @@ -129,7 +131,7 @@ describe('ExecuteSingleContext', () => { it('should throw an error if the value of input with given index was not set', () => { delete inputData.main[inputIndex]![itemIndex]; - expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow( + expect(() => executeSingleContext.getInputData(inputIndex, connectionType)).toThrow( ApplicationError, ); }); diff --git a/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts b/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts index 6c5a3849dd..d3ebddc75c 100644 --- a/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts +++ b/packages/core/src/node-execution-context/__tests__/supply-data-context.test.ts @@ -14,7 +14,7 @@ import type { INodeTypes, ICredentialDataDecryptedObject, } from 'n8n-workflow'; -import { ApplicationError } from 'n8n-workflow'; +import { ApplicationError, NodeConnectionType } from 'n8n-workflow'; import { describeCommonTests } from './shared-tests'; import { SupplyDataContext } from '../supply-data-context'; @@ -56,7 +56,8 @@ describe('SupplyDataContext', () => { const mode: WorkflowExecuteMode = 'manual'; const runExecutionData = mock(); const connectionInputData: INodeExecutionData[] = []; - const inputData: ITaskDataConnections = { main: [[{ json: { test: 'data' } }]] }; + const connectionType = NodeConnectionType.Main; + const inputData: ITaskDataConnections = { [connectionType]: [[{ json: { test: 'data' } }]] }; const executeData = mock(); const runIndex = 0; const closeFn = jest.fn(); @@ -91,33 +92,38 @@ describe('SupplyDataContext', () => { describe('getInputData', () => { const inputIndex = 0; - const inputName = 'main'; afterEach(() => { - inputData[inputName] = [[{ json: { test: 'data' } }]]; + inputData[connectionType] = [[{ json: { test: 'data' } }]]; }); it('should return the input data correctly', () => { const expectedData = [{ json: { test: 'data' } }]; - expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual(expectedData); + expect(supplyDataContext.getInputData(inputIndex, connectionType)).toEqual(expectedData); }); it('should return an empty array if the input name does not exist', () => { - const inputName = 'nonExistent'; - expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual([]); + const connectionType = 'nonExistent'; + expect( + supplyDataContext.getInputData(inputIndex, connectionType as NodeConnectionType), + ).toEqual([]); }); it('should throw an error if the input index is out of range', () => { const inputIndex = 2; - expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError); + expect(() => supplyDataContext.getInputData(inputIndex, connectionType)).toThrow( + ApplicationError, + ); }); it('should throw an error if the input index was not set', () => { inputData.main[inputIndex] = null; - expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError); + expect(() => supplyDataContext.getInputData(inputIndex, connectionType)).toThrow( + ApplicationError, + ); }); }); diff --git a/packages/core/src/node-execution-context/base-execute-context.ts b/packages/core/src/node-execution-context/base-execute-context.ts index 0794a263b0..4f186e5597 100644 --- a/packages/core/src/node-execution-context/base-execute-context.ts +++ b/packages/core/src/node-execution-context/base-execute-context.ts @@ -21,6 +21,7 @@ import type { IWorkflowDataProxyData, ISourceData, AiEvent, + NodeConnectionType, } from 'n8n-workflow'; import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY, WorkflowDataProxy } from 'n8n-workflow'; import { Container } from 'typedi'; @@ -137,6 +138,24 @@ export class BaseExecuteContext extends NodeExecutionContext { return { ...result, data }; } + protected getInputItems(inputIndex: number, connectionType: NodeConnectionType) { + const inputData = this.inputData[connectionType]; + if (inputData.length < inputIndex) { + throw new ApplicationError('Could not get input with given index', { + extra: { inputIndex, connectionType }, + }); + } + + const allItems = inputData[inputIndex] as INodeExecutionData[] | null | undefined; + if (allItems === null) { + throw new ApplicationError('Input index was not set', { + extra: { inputIndex, connectionType }, + }); + } + + return allItems; + } + getNodeInputs(): INodeInputConfiguration[] { const nodeType = this.workflow.nodeTypes.getByNameAndVersion( this.node.type, @@ -157,12 +176,12 @@ export class BaseExecuteContext extends NodeExecutionContext { ); } - getInputSourceData(inputIndex = 0, inputName = 'main'): ISourceData { + getInputSourceData(inputIndex = 0, connectionType = 'main'): ISourceData { if (this.executeData?.source === null) { // Should never happen as n8n sets it automatically throw new ApplicationError('Source data is missing'); } - return this.executeData.source[inputName][inputIndex]!; + return this.executeData.source[connectionType][inputIndex]!; } getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData { diff --git a/packages/core/src/node-execution-context/execute-context.ts b/packages/core/src/node-execution-context/execute-context.ts index c587fb2168..2424697c6f 100644 --- a/packages/core/src/node-execution-context/execute-context.ts +++ b/packages/core/src/node-execution-context/execute-context.ts @@ -1,7 +1,6 @@ import type { CallbackManager, CloseFunction, - ExecutionBaseError, IExecuteData, IExecuteFunctions, IExecuteResponsePromiseData, @@ -10,15 +9,18 @@ import type { INodeExecutionData, IRunExecutionData, ITaskDataConnections, - ITaskMetadata, IWorkflowExecuteAdditionalData, - NodeConnectionType, + Result, Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; -import { ApplicationError, createDeferredPromise } from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + createEnvProviderState, + NodeConnectionType, +} from 'n8n-workflow'; -import { createAgentStartJob } from '@/Agent'; // eslint-disable-next-line import/no-cycle import { returnJsonArray, @@ -26,7 +28,6 @@ import { normalizeItems, constructExecutionMetaData, getInputConnectionData, - addExecutionDataFunctions, assertBinaryData, getBinaryDataBuffer, copyBinaryFile, @@ -46,8 +47,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti readonly getNodeParameter: IExecuteFunctions['getNodeParameter']; - readonly startJob: IExecuteFunctions['startJob']; - constructor( workflow: Workflow, node: INode, @@ -122,23 +121,37 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti fallbackValue, options, )) as IExecuteFunctions['getNodeParameter']; + } - this.startJob = createAgentStartJob( + async startJob( + jobType: string, + settings: unknown, + itemIndex: number, + ): Promise> { + return await this.additionalData.startAgentJob( this.additionalData, + jobType, + settings, + this, this.inputData, this.node, this.workflow, this.runExecutionData, this.runIndex, + itemIndex, this.node.name, this.connectionInputData, {}, this.mode, + createEnvProviderState(), this.executeData, ); } - async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + async getInputConnectionData( + connectionType: NodeConnectionType, + itemIndex: number, + ): Promise { return await getInputConnectionData.call( this, this.workflow, @@ -150,33 +163,18 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti this.executeData, this.mode, this.closeFunctions, - inputName, + connectionType, itemIndex, this.abortSignal, ); } - getInputData(inputIndex = 0, inputName = 'main') { - if (!this.inputData.hasOwnProperty(inputName)) { + getInputData(inputIndex = 0, connectionType = NodeConnectionType.Main) { + if (!this.inputData.hasOwnProperty(connectionType)) { // Return empty array because else it would throw error when nothing is connected to input return []; } - - const inputData = this.inputData[inputName]; - // TODO: Check if nodeType has input with that index defined - if (inputData.length < inputIndex) { - throw new ApplicationError('Could not get input with given index', { - extra: { inputIndex, inputName }, - }); - } - - if (inputData[inputIndex] === null) { - throw new ApplicationError('Value of input was not set', { - extra: { inputIndex, inputName }, - }); - } - - return inputData[inputIndex]; + return super.getInputItems(inputIndex, connectionType) ?? []; } logNodeOutput(...args: unknown[]): void { @@ -194,60 +192,14 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]); } - addInputData( - connectionType: NodeConnectionType, - data: INodeExecutionData[][] | ExecutionBaseError, - ): { index: number } { - const nodeName = this.node.name; - let currentNodeRunIndex = 0; - if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; - } - - void addExecutionDataFunctions( - 'input', - nodeName, - data, - this.runExecutionData, - connectionType, - this.additionalData, - nodeName, - this.runIndex, - currentNodeRunIndex, - ).catch((error) => { - this.logger.warn( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem logging input data of node "${nodeName}": ${error.message}`, - ); - }); - - return { index: currentNodeRunIndex }; + /** @deprecated use ISupplyDataFunctions.addInputData */ + addInputData(): { index: number } { + throw new ApplicationError('addInputData should not be called on IExecuteFunctions'); } - addOutputData( - connectionType: NodeConnectionType, - currentNodeRunIndex: number, - data: INodeExecutionData[][] | ExecutionBaseError, - metadata?: ITaskMetadata, - ): void { - const nodeName = this.node.name; - addExecutionDataFunctions( - 'output', - nodeName, - data, - this.runExecutionData, - connectionType, - this.additionalData, - nodeName, - this.runIndex, - currentNodeRunIndex, - metadata, - ).catch((error) => { - this.logger.warn( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - `There was a problem logging output data of node "${nodeName}": ${error.message}`, - ); - }); + /** @deprecated use ISupplyDataFunctions.addOutputData */ + addOutputData(): void { + throw new ApplicationError('addOutputData should not be called on IExecuteFunctions'); } getParentCallbackManager(): CallbackManager | undefined { diff --git a/packages/core/src/node-execution-context/execute-single-context.ts b/packages/core/src/node-execution-context/execute-single-context.ts index 91c7fcf683..cb46ea9c91 100644 --- a/packages/core/src/node-execution-context/execute-single-context.ts +++ b/packages/core/src/node-execution-context/execute-single-context.ts @@ -11,7 +11,7 @@ import type { ITaskDataConnections, IExecuteData, } from 'n8n-workflow'; -import { ApplicationError, createDeferredPromise } from 'n8n-workflow'; +import { ApplicationError, createDeferredPromise, NodeConnectionType } from 'n8n-workflow'; // eslint-disable-next-line import/no-cycle import { @@ -76,31 +76,18 @@ export class ExecuteSingleContext extends BaseExecuteContext implements IExecute return super.evaluateExpression(expression, itemIndex); } - getInputData(inputIndex = 0, inputName = 'main') { - if (!this.inputData.hasOwnProperty(inputName)) { + getInputData(inputIndex = 0, connectionType = NodeConnectionType.Main) { + if (!this.inputData.hasOwnProperty(connectionType)) { // Return empty array because else it would throw error when nothing is connected to input return { json: {} }; } - // TODO: Check if nodeType has input with that index defined - if (this.inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input index', { - extra: { inputIndex, inputName }, - }); - } + const allItems = super.getInputItems(inputIndex, connectionType); - const allItems = this.inputData[inputName][inputIndex]; - - if (allItems === null || allItems === undefined) { - throw new ApplicationError('Input index was not set', { - extra: { inputIndex, inputName }, - }); - } - - const data = allItems[this.itemIndex]; - if (data === null || data === undefined) { + const data = allItems?.[this.itemIndex]; + if (data === undefined) { throw new ApplicationError('Value of input with given index was not set', { - extra: { inputIndex, inputName, itemIndex: this.itemIndex }, + extra: { inputIndex, connectionType, itemIndex: this.itemIndex }, }); } diff --git a/packages/core/src/node-execution-context/supply-data-context.ts b/packages/core/src/node-execution-context/supply-data-context.ts index bab9d11108..0155b9d85e 100644 --- a/packages/core/src/node-execution-context/supply-data-context.ts +++ b/packages/core/src/node-execution-context/supply-data-context.ts @@ -1,19 +1,21 @@ +import get from 'lodash/get'; import type { CloseFunction, + ExecutionBaseError, IExecuteData, IGetNodeParameterOptions, INode, INodeExecutionData, IRunExecutionData, ISupplyDataFunctions, + ITaskData, ITaskDataConnections, ITaskMetadata, IWorkflowExecuteAdditionalData, - NodeConnectionType, Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; -import { ApplicationError, createDeferredPromise } from 'n8n-workflow'; +import { ApplicationError, NodeConnectionType, createDeferredPromise } from 'n8n-workflow'; // eslint-disable-next-line import/no-cycle import { @@ -29,7 +31,6 @@ import { normalizeItems, returnJsonArray, getInputConnectionData, - addExecutionDataFunctions, } from '@/NodeExecuteFunctions'; import { BaseExecuteContext } from './base-execute-context'; @@ -104,7 +105,10 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData )) as ISupplyDataFunctions['getNodeParameter']; } - async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + async getInputConnectionData( + connectionType: NodeConnectionType, + itemIndex: number, + ): Promise { return await getInputConnectionData.call( this, this.workflow, @@ -116,34 +120,21 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData this.executeData, this.mode, this.closeFunctions, - inputName, + connectionType, itemIndex, this.abortSignal, ); } - getInputData(inputIndex = 0, inputName = 'main') { - if (!this.inputData.hasOwnProperty(inputName)) { + getInputData(inputIndex = 0, connectionType = NodeConnectionType.Main) { + if (!this.inputData.hasOwnProperty(connectionType)) { // Return empty array because else it would throw error when nothing is connected to input return []; } - - // TODO: Check if nodeType has input with that index defined - if (this.inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input with given index', { - extra: { inputIndex, inputName }, - }); - } - - if (this.inputData[inputName][inputIndex] === null) { - throw new ApplicationError('Value of input was not set', { - extra: { inputIndex, inputName }, - }); - } - - return this.inputData[inputName][inputIndex]; + return super.getInputItems(inputIndex, connectionType) ?? []; } + /** @deprecated create a context object with inputData for every runIndex */ addInputData( connectionType: NodeConnectionType, data: INodeExecutionData[][], @@ -154,15 +145,11 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; } - addExecutionDataFunctions( + this.addExecutionDataFunctions( 'input', - nodeName, data, - this.runExecutionData, connectionType, - this.additionalData, nodeName, - this.runIndex, currentNodeRunIndex, ).catch((error) => { this.logger.warn( @@ -176,6 +163,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData return { index: currentNodeRunIndex }; } + /** @deprecated Switch to WorkflowExecute to store output on runExecutionData.resultData.runData */ addOutputData( connectionType: NodeConnectionType, currentNodeRunIndex: number, @@ -183,15 +171,11 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData metadata?: ITaskMetadata, ): void { const nodeName = this.node.name; - addExecutionDataFunctions( + this.addExecutionDataFunctions( 'output', - nodeName, data, - this.runExecutionData, connectionType, - this.additionalData, nodeName, - this.runIndex, currentNodeRunIndex, metadata, ).catch((error) => { @@ -203,4 +187,115 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData ); }); } + + async addExecutionDataFunctions( + type: 'input' | 'output', + data: INodeExecutionData[][] | ExecutionBaseError, + connectionType: NodeConnectionType, + sourceNodeName: string, + currentNodeRunIndex: number, + metadata?: ITaskMetadata, + ): Promise { + if (connectionType === NodeConnectionType.Main) { + throw new ApplicationError('Setting type is not supported for main connection', { + extra: { type }, + }); + } + + const { + additionalData, + runExecutionData, + runIndex: sourceNodeRunIndex, + node: { name: nodeName }, + } = this; + + let taskData: ITaskData | undefined; + if (type === 'input') { + taskData = { + startTime: new Date().getTime(), + executionTime: 0, + executionStatus: 'running', + source: [null], + }; + } else { + // At the moment we expect that there is always an input sent before the output + taskData = get( + runExecutionData, + ['resultData', 'runData', nodeName, currentNodeRunIndex], + undefined, + ); + if (taskData === undefined) { + return; + } + taskData.metadata = metadata; + } + taskData = taskData!; + + if (data instanceof Error) { + taskData.executionStatus = 'error'; + taskData.error = data; + } else { + if (type === 'output') { + taskData.executionStatus = 'success'; + } + taskData.data = { + [connectionType]: data, + } as ITaskDataConnections; + } + + if (type === 'input') { + if (!(data instanceof Error)) { + this.inputData[connectionType] = data; + // TODO: remove inputOverride + taskData.inputOverride = { + [connectionType]: data, + } as ITaskDataConnections; + } + + if (!runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { + runExecutionData.resultData.runData[nodeName] = []; + } + + runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData; + if (additionalData.sendDataToUI) { + additionalData.sendDataToUI('nodeExecuteBefore', { + executionId: additionalData.executionId, + nodeName, + }); + } + } else { + // Outputs + taskData.executionTime = new Date().getTime() - taskData.startTime; + + if (additionalData.sendDataToUI) { + additionalData.sendDataToUI('nodeExecuteAfter', { + executionId: additionalData.executionId, + nodeName, + data: taskData, + }); + } + + if (get(runExecutionData, 'executionData.metadata', undefined) === undefined) { + runExecutionData.executionData!.metadata = {}; + } + + let sourceTaskData = runExecutionData.executionData?.metadata?.[sourceNodeName]; + + if (!sourceTaskData) { + runExecutionData.executionData!.metadata[sourceNodeName] = []; + sourceTaskData = runExecutionData.executionData!.metadata[sourceNodeName]; + } + + if (!sourceTaskData[sourceNodeRunIndex]) { + sourceTaskData[sourceNodeRunIndex] = { + subRun: [], + }; + } + + sourceTaskData[sourceNodeRunIndex].subRun!.push({ + node: nodeName, + runIndex: currentNodeRunIndex, + }); + } + } } diff --git a/packages/core/src/node-execution-context/webhook-context.ts b/packages/core/src/node-execution-context/webhook-context.ts index e1dae9c1de..8f55b26097 100644 --- a/packages/core/src/node-execution-context/webhook-context.ts +++ b/packages/core/src/node-execution-context/webhook-context.ts @@ -138,7 +138,10 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc return this.webhookData.webhookDescription.name; } - async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + async getInputConnectionData( + connectionType: NodeConnectionType, + itemIndex: number, + ): Promise { // To be able to use expressions like "$json.sessionId" set the // body data the webhook received to what is normally used for // incoming node data. @@ -170,7 +173,7 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc executeData, this.mode, this.closeFunctions, - inputName, + connectionType, itemIndex, ); } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index b5bca9c5c3..f88db21ae7 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -943,7 +943,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & { getContext(type: ContextType): IContextObject; getExecuteData(): IExecuteData; getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData; - getInputSourceData(inputIndex?: number, inputName?: string): ISourceData; + getInputSourceData(inputIndex?: number, connectionType?: NodeConnectionType): ISourceData; getExecutionCancelSignal(): AbortSignal | undefined; onExecutionCancellation(handler: () => unknown): void; logAiEvent(eventName: AiEvent, msg?: string | undefined): void; @@ -962,11 +962,11 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & }, ): Promise; getInputConnectionData( - inputName: NodeConnectionType, + connectionType: NodeConnectionType, itemIndex: number, inputIndex?: number, ): Promise; - getInputData(inputIndex?: number, inputName?: string): INodeExecutionData[]; + getInputData(inputIndex?: number, connectionType?: NodeConnectionType): INodeExecutionData[]; getNodeInputs(): INodeInputConfiguration[]; getNodeOutputs(): INodeOutputConfiguration[]; putExecutionToWait(waitTill: Date): Promise; @@ -1013,7 +1013,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & }; export interface IExecuteSingleFunctions extends BaseExecutionFunctions { - getInputData(inputIndex?: number, inputName?: string): INodeExecutionData; + getInputData(inputIndex?: number, connectionType?: NodeConnectionType): INodeExecutionData; getItemIndex(): number; getNodeParameter( parameterName: string, @@ -1127,7 +1127,7 @@ export interface IWebhookFunctions extends FunctionsBaseWithRequiredKeys<'getMod getBodyData(): IDataObject; getHeaderData(): IncomingHttpHeaders; getInputConnectionData( - inputName: NodeConnectionType, + connectionType: NodeConnectionType, itemIndex: number, inputIndex?: number, ): Promise; @@ -2372,9 +2372,6 @@ export interface IWorkflowExecuteAdditionalData { mode: WorkflowExecuteMode, envProviderState: EnvProviderState, executeData?: IExecuteData, - defaultReturnRunIndex?: number, - selfData?: IDataObject, - contextNodeName?: string, ): Promise>; } diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index b2eb597e2e..19f0e14e7f 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -1357,8 +1357,8 @@ export class Workflow { if (node.executeOnce === true) { // If node should be executed only once so use only the first input item const newInputData: ITaskDataConnections = {}; - for (const inputName of Object.keys(inputData)) { - newInputData[inputName] = inputData[inputName].map((input) => { + for (const connectionType of Object.keys(inputData)) { + newInputData[connectionType] = inputData[connectionType].map((input) => { // eslint-disable-next-line @typescript-eslint/prefer-optional-chain return input && input.slice(0, 1); });