refactor(core): Refactor execution contexts to reduce code duplication, and improve type-safety (no-changelog) (#12138)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-12-11 11:26:27 +01:00 committed by GitHub
parent e6985f79db
commit ec54333f78
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 329 additions and 449 deletions

View file

@ -33,7 +33,7 @@ function getOutputParserSchema(outputParser: N8nOutputParser): ZodObject<any, an
}
async function extractBinaryMessages(ctx: IExecuteFunctions) {
const binaryData = ctx.getInputData(0, 'main')?.[0]?.binary ?? {};
const binaryData = ctx.getInputData()?.[0]?.binary ?? {};
const binaryMessages = await Promise.all(
Object.values(binaryData)
.filter((data) => data.mimeType.startsWith('image/'))
@ -260,7 +260,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
['human', '{input}'],
];
const hasBinaryData = this.getInputData(0, 'main')?.[0]?.binary !== undefined;
const hasBinaryData = this.getInputData()?.[0]?.binary !== undefined;
if (hasBinaryData && passthroughBinaryImages) {
const binaryMessage = await extractBinaryMessages(this);
messages.push(binaryMessage);

View file

@ -1036,9 +1036,6 @@ export async function getBase(
mode: WorkflowExecuteMode,
envProviderState: EnvProviderState,
executeData?: IExecuteData,
defaultReturnRunIndex?: number,
selfData?: IDataObject,
contextNodeName?: string,
) {
return await Container.get(TaskManager).startTask(
additionalData,
@ -1057,9 +1054,6 @@ export async function getBase(
mode,
envProviderState,
executeData,
defaultReturnRunIndex,
selfData,
contextNodeName,
);
},
logAiEvent: (eventName: keyof AiEventMap, payload: AiEventPayload) =>

View file

@ -1,61 +0,0 @@
import type {
IExecuteFunctions,
Workflow,
IRunExecutionData,
INodeExecutionData,
ITaskDataConnections,
INode,
IWorkflowExecuteAdditionalData,
WorkflowExecuteMode,
INodeParameters,
IExecuteData,
IDataObject,
Result,
} from 'n8n-workflow';
import { createEnvProviderState } from 'n8n-workflow';
export const createAgentStartJob = (
additionalData: IWorkflowExecuteAdditionalData,
inputData: ITaskDataConnections,
node: INode,
workflow: Workflow,
runExecutionData: IRunExecutionData,
runIndex: number,
activeNodeName: string,
connectionInputData: INodeExecutionData[],
siblingParameters: INodeParameters,
mode: WorkflowExecuteMode,
executeData?: IExecuteData,
defaultReturnRunIndex?: number,
selfData?: IDataObject,
contextNodeName?: string,
): IExecuteFunctions['startJob'] => {
return async function startJob<T = unknown, E = unknown>(
this: IExecuteFunctions,
jobType: string,
settings: unknown,
itemIndex: number,
): Promise<Result<T, E>> {
return await additionalData.startAgentJob<T, E>(
additionalData,
jobType,
settings,
this,
inputData,
node,
workflow,
runExecutionData,
runIndex,
itemIndex,
activeNodeName,
connectionInputData,
siblingParameters,
mode,
createEnvProviderState(),
executeData,
defaultReturnRunIndex,
selfData,
contextNodeName,
);
};
};

View file

@ -54,9 +54,7 @@ import type {
IPollFunctions,
IRequestOptions,
IRunExecutionData,
ITaskData,
ITaskDataConnections,
ITaskMetadata,
ITriggerFunctions,
IWebhookData,
IWebhookDescription,
@ -2021,113 +2019,6 @@ export function getWebhookDescription(
return undefined;
}
// TODO: Change options to an object
export const addExecutionDataFunctions = async (
type: 'input' | 'output',
nodeName: string,
data: INodeExecutionData[][] | ExecutionBaseError,
runExecutionData: IRunExecutionData,
connectionType: NodeConnectionType,
additionalData: IWorkflowExecuteAdditionalData,
sourceNodeName: string,
sourceNodeRunIndex: number,
currentNodeRunIndex: number,
metadata?: ITaskMetadata,
): Promise<void> => {
if (connectionType === NodeConnectionType.Main) {
throw new ApplicationError('Setting type is not supported for main connection', {
extra: { type },
});
}
let taskData: ITaskData | undefined;
if (type === 'input') {
taskData = {
startTime: new Date().getTime(),
executionTime: 0,
executionStatus: 'running',
source: [null],
};
} else {
// At the moment we expect that there is always an input sent before the output
taskData = get(
runExecutionData,
['resultData', 'runData', nodeName, currentNodeRunIndex],
undefined,
);
if (taskData === undefined) {
return;
}
taskData.metadata = metadata;
}
taskData = taskData!;
if (data instanceof Error) {
taskData.executionStatus = 'error';
taskData.error = data;
} else {
if (type === 'output') {
taskData.executionStatus = 'success';
}
taskData.data = {
[connectionType]: data,
} as ITaskDataConnections;
}
if (type === 'input') {
if (!(data instanceof Error)) {
taskData.inputOverride = {
[connectionType]: data,
} as ITaskDataConnections;
}
if (!runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
runExecutionData.resultData.runData[nodeName] = [];
}
runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteBefore', {
executionId: additionalData.executionId,
nodeName,
});
}
} else {
// Outputs
taskData.executionTime = new Date().getTime() - taskData.startTime;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteAfter', {
executionId: additionalData.executionId,
nodeName,
data: taskData,
});
}
if (get(runExecutionData, 'executionData.metadata', undefined) === undefined) {
runExecutionData.executionData!.metadata = {};
}
let sourceTaskData = get(runExecutionData, ['executionData', 'metadata', sourceNodeName]);
if (!sourceTaskData) {
runExecutionData.executionData!.metadata[sourceNodeName] = [];
sourceTaskData = runExecutionData.executionData!.metadata[sourceNodeName];
}
if (!sourceTaskData[sourceNodeRunIndex]) {
sourceTaskData[sourceNodeRunIndex] = {
subRun: [],
};
}
sourceTaskData[sourceNodeRunIndex]!.subRun!.push({
node: nodeName,
runIndex: currentNodeRunIndex,
});
}
};
export async function getInputConnectionData(
this: IAllExecuteFunctions,
workflow: Workflow,
@ -2139,7 +2030,7 @@ export async function getInputConnectionData(
executeData: IExecuteData,
mode: WorkflowExecuteMode,
closeFunctions: CloseFunction[],
inputName: NodeConnectionType,
connectionType: NodeConnectionType,
itemIndex: number,
abortSignal?: AbortSignal,
): Promise<unknown> {
@ -2150,14 +2041,14 @@ export async function getInputConnectionData(
let inputConfiguration = inputs.find((input) => {
if (typeof input === 'string') {
return input === inputName;
return input === connectionType;
}
return input.type === inputName;
return input.type === connectionType;
});
if (inputConfiguration === undefined) {
throw new ApplicationError('Node does not have input of type', {
extra: { nodeName: node.name, inputName },
extra: { nodeName: node.name, connectionType },
});
}
@ -2167,114 +2058,103 @@ export async function getInputConnectionData(
} as INodeInputConfiguration;
}
const parentNodes = workflow.getParentNodes(node.name, inputName, 1);
if (parentNodes.length === 0) {
const connectedNodes = workflow
.getParentNodes(node.name, connectionType, 1)
.map((nodeName) => workflow.getNode(nodeName) as INode)
.filter((connectedNode) => connectedNode.disabled !== true);
if (connectedNodes.length === 0) {
if (inputConfiguration.required) {
throw new NodeOperationError(
node,
`A ${inputConfiguration?.displayName ?? inputName} sub-node must be connected`,
`A ${inputConfiguration?.displayName ?? connectionType} sub-node must be connected and enabled`,
);
}
return inputConfiguration.maxConnections === 1 ? undefined : [];
}
const constParentNodes = parentNodes
.map((nodeName) => {
return workflow.getNode(nodeName) as INode;
})
.filter((connectedNode) => connectedNode.disabled !== true)
.map(async (connectedNode) => {
const nodeType = workflow.nodeTypes.getByNameAndVersion(
connectedNode.type,
connectedNode.typeVersion,
);
const context = new SupplyDataContext(
workflow,
connectedNode,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
closeFunctions,
abortSignal,
);
if (!nodeType.supplyData) {
if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
nodeType.supplyData = async function (this: ISupplyDataFunctions) {
return createNodeAsTool(this, nodeType, this.getNode().parameters);
};
} else {
throw new ApplicationError('Node does not have a `supplyData` method defined', {
extra: { nodeName: connectedNode.name },
});
}
}
try {
const response = await nodeType.supplyData.call(context, itemIndex);
if (response.closeFunction) {
closeFunctions.push(response.closeFunction);
}
return response;
} catch (error) {
// Propagate errors from sub-nodes
if (error.functionality === 'configuration-node') throw error;
if (!(error instanceof ExecutionBaseError)) {
error = new NodeOperationError(connectedNode, error, {
itemIndex,
});
}
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
// Display the error on the node which is causing it
await addExecutionDataFunctions(
'input',
connectedNode.name,
error,
runExecutionData,
inputName,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
);
// Display on the calling node which node has the error
throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, {
itemIndex,
functionality: 'configuration-node',
description: error.message,
});
}
});
// Validate the inputs
const nodes = await Promise.all(constParentNodes);
if (inputConfiguration.required && nodes.length === 0) {
throw new NodeOperationError(
node,
`A ${inputConfiguration?.displayName ?? inputName} sub-node must be connected`,
);
}
if (
inputConfiguration.maxConnections !== undefined &&
nodes.length > inputConfiguration.maxConnections
connectedNodes.length > inputConfiguration.maxConnections
) {
throw new NodeOperationError(
node,
`Only ${inputConfiguration.maxConnections} ${inputName} sub-nodes are/is allowed to be connected`,
`Only ${inputConfiguration.maxConnections} ${connectionType} sub-nodes are/is allowed to be connected`,
);
}
const constParentNodes = connectedNodes.map(async (connectedNode) => {
const nodeType = workflow.nodeTypes.getByNameAndVersion(
connectedNode.type,
connectedNode.typeVersion,
);
const context = new SupplyDataContext(
workflow,
connectedNode,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
closeFunctions,
abortSignal,
);
if (!nodeType.supplyData) {
if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
nodeType.supplyData = async function (this: ISupplyDataFunctions) {
return createNodeAsTool(this, nodeType, this.getNode().parameters);
};
} else {
throw new ApplicationError('Node does not have a `supplyData` method defined', {
extra: { nodeName: connectedNode.name },
});
}
}
try {
const response = await nodeType.supplyData.call(context, itemIndex);
if (response.closeFunction) {
closeFunctions.push(response.closeFunction);
}
return response;
} catch (error) {
// Propagate errors from sub-nodes
if (error.functionality === 'configuration-node') throw error;
if (!(error instanceof ExecutionBaseError)) {
error = new NodeOperationError(connectedNode, error, {
itemIndex,
});
}
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(node.name)) {
currentNodeRunIndex = runExecutionData.resultData.runData[node.name].length;
}
// Display the error on the node which is causing it
await context.addExecutionDataFunctions(
'input',
error,
connectionType,
node.name,
currentNodeRunIndex,
);
// Display on the calling node which node has the error
throw new NodeOperationError(connectedNode, `Error in sub-node ${connectedNode.name}`, {
itemIndex,
functionality: 'configuration-node',
description: error.message,
});
}
});
// Validate the inputs
const nodes = await Promise.all(constParentNodes);
return inputConfiguration.maxConnections === 1
? (nodes || [])[0]?.response
: nodes.map((node) => node.response);

View file

@ -1018,8 +1018,8 @@ export class WorkflowExecute {
// Update the pairedItem information on items
const newTaskDataConnections: ITaskDataConnections = {};
for (const inputName of Object.keys(executionData.data)) {
newTaskDataConnections[inputName] = executionData.data[inputName].map(
for (const connectionType of Object.keys(executionData.data)) {
newTaskDataConnections[connectionType] = executionData.data[connectionType].map(
(input, inputIndex) => {
if (input === null) {
return input;

View file

@ -14,7 +14,7 @@ import type {
INodeTypes,
ICredentialDataDecryptedObject,
} from 'n8n-workflow';
import { ApplicationError, ExpressionError } from 'n8n-workflow';
import { ApplicationError, ExpressionError, NodeConnectionType } from 'n8n-workflow';
import { describeCommonTests } from './shared-tests';
import { ExecuteContext } from '../execute-context';
@ -92,33 +92,39 @@ describe('ExecuteContext', () => {
describe('getInputData', () => {
const inputIndex = 0;
const inputName = 'main';
const connectionType = NodeConnectionType.Main;
afterEach(() => {
inputData[inputName] = [[{ json: { test: 'data' } }]];
inputData[connectionType] = [[{ json: { test: 'data' } }]];
});
it('should return the input data correctly', () => {
const expectedData = [{ json: { test: 'data' } }];
expect(executeContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
expect(executeContext.getInputData(inputIndex, connectionType)).toEqual(expectedData);
});
it('should return an empty array if the input name does not exist', () => {
const inputName = 'nonExistent';
expect(executeContext.getInputData(inputIndex, inputName)).toEqual([]);
const connectionType = 'nonExistent';
expect(executeContext.getInputData(inputIndex, connectionType as NodeConnectionType)).toEqual(
[],
);
});
it('should throw an error if the input index is out of range', () => {
const inputIndex = 2;
expect(() => executeContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError);
expect(() => executeContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});
it('should throw an error if the input index was not set', () => {
inputData.main[inputIndex] = null;
expect(() => executeContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError);
expect(() => executeContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});
});

View file

@ -14,7 +14,7 @@ import type {
INodeTypes,
ICredentialDataDecryptedObject,
} from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';
import { ApplicationError, NodeConnectionType } from 'n8n-workflow';
import { describeCommonTests } from './shared-tests';
import { ExecuteSingleContext } from '../execute-single-context';
@ -91,29 +91,31 @@ describe('ExecuteSingleContext', () => {
describe('getInputData', () => {
const inputIndex = 0;
const inputName = 'main';
const connectionType = NodeConnectionType.Main;
afterEach(() => {
inputData[inputName] = [[{ json: { test: 'data' } }]];
inputData[connectionType] = [[{ json: { test: 'data' } }]];
});
it('should return the input data correctly', () => {
const expectedData = { json: { test: 'data' } };
expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
expect(executeSingleContext.getInputData(inputIndex, connectionType)).toEqual(expectedData);
});
it('should return an empty object if the input name does not exist', () => {
const inputName = 'nonExistent';
const connectionType = 'nonExistent';
const expectedData = { json: {} };
expect(executeSingleContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
expect(
executeSingleContext.getInputData(inputIndex, connectionType as NodeConnectionType),
).toEqual(expectedData);
});
it('should throw an error if the input index is out of range', () => {
const inputIndex = 1;
expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow(
expect(() => executeSingleContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});
@ -121,7 +123,7 @@ describe('ExecuteSingleContext', () => {
it('should throw an error if the input index was not set', () => {
inputData.main[inputIndex] = null;
expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow(
expect(() => executeSingleContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});
@ -129,7 +131,7 @@ describe('ExecuteSingleContext', () => {
it('should throw an error if the value of input with given index was not set', () => {
delete inputData.main[inputIndex]![itemIndex];
expect(() => executeSingleContext.getInputData(inputIndex, inputName)).toThrow(
expect(() => executeSingleContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});

View file

@ -14,7 +14,7 @@ import type {
INodeTypes,
ICredentialDataDecryptedObject,
} from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';
import { ApplicationError, NodeConnectionType } from 'n8n-workflow';
import { describeCommonTests } from './shared-tests';
import { SupplyDataContext } from '../supply-data-context';
@ -56,7 +56,8 @@ describe('SupplyDataContext', () => {
const mode: WorkflowExecuteMode = 'manual';
const runExecutionData = mock<IRunExecutionData>();
const connectionInputData: INodeExecutionData[] = [];
const inputData: ITaskDataConnections = { main: [[{ json: { test: 'data' } }]] };
const connectionType = NodeConnectionType.Main;
const inputData: ITaskDataConnections = { [connectionType]: [[{ json: { test: 'data' } }]] };
const executeData = mock<IExecuteData>();
const runIndex = 0;
const closeFn = jest.fn();
@ -91,33 +92,38 @@ describe('SupplyDataContext', () => {
describe('getInputData', () => {
const inputIndex = 0;
const inputName = 'main';
afterEach(() => {
inputData[inputName] = [[{ json: { test: 'data' } }]];
inputData[connectionType] = [[{ json: { test: 'data' } }]];
});
it('should return the input data correctly', () => {
const expectedData = [{ json: { test: 'data' } }];
expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
expect(supplyDataContext.getInputData(inputIndex, connectionType)).toEqual(expectedData);
});
it('should return an empty array if the input name does not exist', () => {
const inputName = 'nonExistent';
expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual([]);
const connectionType = 'nonExistent';
expect(
supplyDataContext.getInputData(inputIndex, connectionType as NodeConnectionType),
).toEqual([]);
});
it('should throw an error if the input index is out of range', () => {
const inputIndex = 2;
expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError);
expect(() => supplyDataContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});
it('should throw an error if the input index was not set', () => {
inputData.main[inputIndex] = null;
expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError);
expect(() => supplyDataContext.getInputData(inputIndex, connectionType)).toThrow(
ApplicationError,
);
});
});

View file

@ -21,6 +21,7 @@ import type {
IWorkflowDataProxyData,
ISourceData,
AiEvent,
NodeConnectionType,
} from 'n8n-workflow';
import { ApplicationError, NodeHelpers, WAIT_INDEFINITELY, WorkflowDataProxy } from 'n8n-workflow';
import { Container } from 'typedi';
@ -137,6 +138,24 @@ export class BaseExecuteContext extends NodeExecutionContext {
return { ...result, data };
}
protected getInputItems(inputIndex: number, connectionType: NodeConnectionType) {
const inputData = this.inputData[connectionType];
if (inputData.length < inputIndex) {
throw new ApplicationError('Could not get input with given index', {
extra: { inputIndex, connectionType },
});
}
const allItems = inputData[inputIndex] as INodeExecutionData[] | null | undefined;
if (allItems === null) {
throw new ApplicationError('Input index was not set', {
extra: { inputIndex, connectionType },
});
}
return allItems;
}
getNodeInputs(): INodeInputConfiguration[] {
const nodeType = this.workflow.nodeTypes.getByNameAndVersion(
this.node.type,
@ -157,12 +176,12 @@ export class BaseExecuteContext extends NodeExecutionContext {
);
}
getInputSourceData(inputIndex = 0, inputName = 'main'): ISourceData {
getInputSourceData(inputIndex = 0, connectionType = 'main'): ISourceData {
if (this.executeData?.source === null) {
// Should never happen as n8n sets it automatically
throw new ApplicationError('Source data is missing');
}
return this.executeData.source[inputName][inputIndex]!;
return this.executeData.source[connectionType][inputIndex]!;
}
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData {

View file

@ -1,7 +1,6 @@
import type {
CallbackManager,
CloseFunction,
ExecutionBaseError,
IExecuteData,
IExecuteFunctions,
IExecuteResponsePromiseData,
@ -10,15 +9,18 @@ import type {
INodeExecutionData,
IRunExecutionData,
ITaskDataConnections,
ITaskMetadata,
IWorkflowExecuteAdditionalData,
NodeConnectionType,
Result,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
createEnvProviderState,
NodeConnectionType,
} from 'n8n-workflow';
import { createAgentStartJob } from '@/Agent';
// eslint-disable-next-line import/no-cycle
import {
returnJsonArray,
@ -26,7 +28,6 @@ import {
normalizeItems,
constructExecutionMetaData,
getInputConnectionData,
addExecutionDataFunctions,
assertBinaryData,
getBinaryDataBuffer,
copyBinaryFile,
@ -46,8 +47,6 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
readonly getNodeParameter: IExecuteFunctions['getNodeParameter'];
readonly startJob: IExecuteFunctions['startJob'];
constructor(
workflow: Workflow,
node: INode,
@ -122,23 +121,37 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
fallbackValue,
options,
)) as IExecuteFunctions['getNodeParameter'];
}
this.startJob = createAgentStartJob(
async startJob<T = unknown, E = unknown>(
jobType: string,
settings: unknown,
itemIndex: number,
): Promise<Result<T, E>> {
return await this.additionalData.startAgentJob<T, E>(
this.additionalData,
jobType,
settings,
this,
this.inputData,
this.node,
this.workflow,
this.runExecutionData,
this.runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
{},
this.mode,
createEnvProviderState(),
this.executeData,
);
}
async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise<unknown> {
async getInputConnectionData(
connectionType: NodeConnectionType,
itemIndex: number,
): Promise<unknown> {
return await getInputConnectionData.call(
this,
this.workflow,
@ -150,33 +163,18 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
this.executeData,
this.mode,
this.closeFunctions,
inputName,
connectionType,
itemIndex,
this.abortSignal,
);
}
getInputData(inputIndex = 0, inputName = 'main') {
if (!this.inputData.hasOwnProperty(inputName)) {
getInputData(inputIndex = 0, connectionType = NodeConnectionType.Main) {
if (!this.inputData.hasOwnProperty(connectionType)) {
// Return empty array because else it would throw error when nothing is connected to input
return [];
}
const inputData = this.inputData[inputName];
// TODO: Check if nodeType has input with that index defined
if (inputData.length < inputIndex) {
throw new ApplicationError('Could not get input with given index', {
extra: { inputIndex, inputName },
});
}
if (inputData[inputIndex] === null) {
throw new ApplicationError('Value of input was not set', {
extra: { inputIndex, inputName },
});
}
return inputData[inputIndex];
return super.getInputItems(inputIndex, connectionType) ?? [];
}
logNodeOutput(...args: unknown[]): void {
@ -194,60 +192,14 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]);
}
addInputData(
connectionType: NodeConnectionType,
data: INodeExecutionData[][] | ExecutionBaseError,
): { index: number } {
const nodeName = this.node.name;
let currentNodeRunIndex = 0;
if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length;
}
void addExecutionDataFunctions(
'input',
nodeName,
data,
this.runExecutionData,
connectionType,
this.additionalData,
nodeName,
this.runIndex,
currentNodeRunIndex,
).catch((error) => {
this.logger.warn(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem logging input data of node "${nodeName}": ${error.message}`,
);
});
return { index: currentNodeRunIndex };
/** @deprecated use ISupplyDataFunctions.addInputData */
addInputData(): { index: number } {
throw new ApplicationError('addInputData should not be called on IExecuteFunctions');
}
addOutputData(
connectionType: NodeConnectionType,
currentNodeRunIndex: number,
data: INodeExecutionData[][] | ExecutionBaseError,
metadata?: ITaskMetadata,
): void {
const nodeName = this.node.name;
addExecutionDataFunctions(
'output',
nodeName,
data,
this.runExecutionData,
connectionType,
this.additionalData,
nodeName,
this.runIndex,
currentNodeRunIndex,
metadata,
).catch((error) => {
this.logger.warn(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
`There was a problem logging output data of node "${nodeName}": ${error.message}`,
);
});
/** @deprecated use ISupplyDataFunctions.addOutputData */
addOutputData(): void {
throw new ApplicationError('addOutputData should not be called on IExecuteFunctions');
}
getParentCallbackManager(): CallbackManager | undefined {

View file

@ -11,7 +11,7 @@ import type {
ITaskDataConnections,
IExecuteData,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import { ApplicationError, createDeferredPromise, NodeConnectionType } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
@ -76,31 +76,18 @@ export class ExecuteSingleContext extends BaseExecuteContext implements IExecute
return super.evaluateExpression(expression, itemIndex);
}
getInputData(inputIndex = 0, inputName = 'main') {
if (!this.inputData.hasOwnProperty(inputName)) {
getInputData(inputIndex = 0, connectionType = NodeConnectionType.Main) {
if (!this.inputData.hasOwnProperty(connectionType)) {
// Return empty array because else it would throw error when nothing is connected to input
return { json: {} };
}
// TODO: Check if nodeType has input with that index defined
if (this.inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input index', {
extra: { inputIndex, inputName },
});
}
const allItems = super.getInputItems(inputIndex, connectionType);
const allItems = this.inputData[inputName][inputIndex];
if (allItems === null || allItems === undefined) {
throw new ApplicationError('Input index was not set', {
extra: { inputIndex, inputName },
});
}
const data = allItems[this.itemIndex];
if (data === null || data === undefined) {
const data = allItems?.[this.itemIndex];
if (data === undefined) {
throw new ApplicationError('Value of input with given index was not set', {
extra: { inputIndex, inputName, itemIndex: this.itemIndex },
extra: { inputIndex, connectionType, itemIndex: this.itemIndex },
});
}

View file

@ -1,19 +1,21 @@
import get from 'lodash/get';
import type {
CloseFunction,
ExecutionBaseError,
IExecuteData,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
IRunExecutionData,
ISupplyDataFunctions,
ITaskData,
ITaskDataConnections,
ITaskMetadata,
IWorkflowExecuteAdditionalData,
NodeConnectionType,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import { ApplicationError, NodeConnectionType, createDeferredPromise } from 'n8n-workflow';
// eslint-disable-next-line import/no-cycle
import {
@ -29,7 +31,6 @@ import {
normalizeItems,
returnJsonArray,
getInputConnectionData,
addExecutionDataFunctions,
} from '@/NodeExecuteFunctions';
import { BaseExecuteContext } from './base-execute-context';
@ -104,7 +105,10 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
)) as ISupplyDataFunctions['getNodeParameter'];
}
async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise<unknown> {
async getInputConnectionData(
connectionType: NodeConnectionType,
itemIndex: number,
): Promise<unknown> {
return await getInputConnectionData.call(
this,
this.workflow,
@ -116,34 +120,21 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
this.executeData,
this.mode,
this.closeFunctions,
inputName,
connectionType,
itemIndex,
this.abortSignal,
);
}
getInputData(inputIndex = 0, inputName = 'main') {
if (!this.inputData.hasOwnProperty(inputName)) {
getInputData(inputIndex = 0, connectionType = NodeConnectionType.Main) {
if (!this.inputData.hasOwnProperty(connectionType)) {
// Return empty array because else it would throw error when nothing is connected to input
return [];
}
// TODO: Check if nodeType has input with that index defined
if (this.inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input with given index', {
extra: { inputIndex, inputName },
});
}
if (this.inputData[inputName][inputIndex] === null) {
throw new ApplicationError('Value of input was not set', {
extra: { inputIndex, inputName },
});
}
return this.inputData[inputName][inputIndex];
return super.getInputItems(inputIndex, connectionType) ?? [];
}
/** @deprecated create a context object with inputData for every runIndex */
addInputData(
connectionType: NodeConnectionType,
data: INodeExecutionData[][],
@ -154,15 +145,11 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length;
}
addExecutionDataFunctions(
this.addExecutionDataFunctions(
'input',
nodeName,
data,
this.runExecutionData,
connectionType,
this.additionalData,
nodeName,
this.runIndex,
currentNodeRunIndex,
).catch((error) => {
this.logger.warn(
@ -176,6 +163,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
return { index: currentNodeRunIndex };
}
/** @deprecated Switch to WorkflowExecute to store output on runExecutionData.resultData.runData */
addOutputData(
connectionType: NodeConnectionType,
currentNodeRunIndex: number,
@ -183,15 +171,11 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
metadata?: ITaskMetadata,
): void {
const nodeName = this.node.name;
addExecutionDataFunctions(
this.addExecutionDataFunctions(
'output',
nodeName,
data,
this.runExecutionData,
connectionType,
this.additionalData,
nodeName,
this.runIndex,
currentNodeRunIndex,
metadata,
).catch((error) => {
@ -203,4 +187,115 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
);
});
}
async addExecutionDataFunctions(
type: 'input' | 'output',
data: INodeExecutionData[][] | ExecutionBaseError,
connectionType: NodeConnectionType,
sourceNodeName: string,
currentNodeRunIndex: number,
metadata?: ITaskMetadata,
): Promise<void> {
if (connectionType === NodeConnectionType.Main) {
throw new ApplicationError('Setting type is not supported for main connection', {
extra: { type },
});
}
const {
additionalData,
runExecutionData,
runIndex: sourceNodeRunIndex,
node: { name: nodeName },
} = this;
let taskData: ITaskData | undefined;
if (type === 'input') {
taskData = {
startTime: new Date().getTime(),
executionTime: 0,
executionStatus: 'running',
source: [null],
};
} else {
// At the moment we expect that there is always an input sent before the output
taskData = get(
runExecutionData,
['resultData', 'runData', nodeName, currentNodeRunIndex],
undefined,
);
if (taskData === undefined) {
return;
}
taskData.metadata = metadata;
}
taskData = taskData!;
if (data instanceof Error) {
taskData.executionStatus = 'error';
taskData.error = data;
} else {
if (type === 'output') {
taskData.executionStatus = 'success';
}
taskData.data = {
[connectionType]: data,
} as ITaskDataConnections;
}
if (type === 'input') {
if (!(data instanceof Error)) {
this.inputData[connectionType] = data;
// TODO: remove inputOverride
taskData.inputOverride = {
[connectionType]: data,
} as ITaskDataConnections;
}
if (!runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
runExecutionData.resultData.runData[nodeName] = [];
}
runExecutionData.resultData.runData[nodeName][currentNodeRunIndex] = taskData;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteBefore', {
executionId: additionalData.executionId,
nodeName,
});
}
} else {
// Outputs
taskData.executionTime = new Date().getTime() - taskData.startTime;
if (additionalData.sendDataToUI) {
additionalData.sendDataToUI('nodeExecuteAfter', {
executionId: additionalData.executionId,
nodeName,
data: taskData,
});
}
if (get(runExecutionData, 'executionData.metadata', undefined) === undefined) {
runExecutionData.executionData!.metadata = {};
}
let sourceTaskData = runExecutionData.executionData?.metadata?.[sourceNodeName];
if (!sourceTaskData) {
runExecutionData.executionData!.metadata[sourceNodeName] = [];
sourceTaskData = runExecutionData.executionData!.metadata[sourceNodeName];
}
if (!sourceTaskData[sourceNodeRunIndex]) {
sourceTaskData[sourceNodeRunIndex] = {
subRun: [],
};
}
sourceTaskData[sourceNodeRunIndex].subRun!.push({
node: nodeName,
runIndex: currentNodeRunIndex,
});
}
}
}

View file

@ -138,7 +138,10 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc
return this.webhookData.webhookDescription.name;
}
async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise<unknown> {
async getInputConnectionData(
connectionType: NodeConnectionType,
itemIndex: number,
): Promise<unknown> {
// To be able to use expressions like "$json.sessionId" set the
// body data the webhook received to what is normally used for
// incoming node data.
@ -170,7 +173,7 @@ export class WebhookContext extends NodeExecutionContext implements IWebhookFunc
executeData,
this.mode,
this.closeFunctions,
inputName,
connectionType,
itemIndex,
);
}

View file

@ -943,7 +943,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getContext(type: ContextType): IContextObject;
getExecuteData(): IExecuteData;
getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData;
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getInputSourceData(inputIndex?: number, connectionType?: NodeConnectionType): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
logAiEvent(eventName: AiEvent, msg?: string | undefined): void;
@ -962,11 +962,11 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
},
): Promise<ExecuteWorkflowData>;
getInputConnectionData(
inputName: NodeConnectionType,
connectionType: NodeConnectionType,
itemIndex: number,
inputIndex?: number,
): Promise<unknown>;
getInputData(inputIndex?: number, inputName?: string): INodeExecutionData[];
getInputData(inputIndex?: number, connectionType?: NodeConnectionType): INodeExecutionData[];
getNodeInputs(): INodeInputConfiguration[];
getNodeOutputs(): INodeOutputConfiguration[];
putExecutionToWait(waitTill: Date): Promise<void>;
@ -1013,7 +1013,7 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
};
export interface IExecuteSingleFunctions extends BaseExecutionFunctions {
getInputData(inputIndex?: number, inputName?: string): INodeExecutionData;
getInputData(inputIndex?: number, connectionType?: NodeConnectionType): INodeExecutionData;
getItemIndex(): number;
getNodeParameter(
parameterName: string,
@ -1127,7 +1127,7 @@ export interface IWebhookFunctions extends FunctionsBaseWithRequiredKeys<'getMod
getBodyData(): IDataObject;
getHeaderData(): IncomingHttpHeaders;
getInputConnectionData(
inputName: NodeConnectionType,
connectionType: NodeConnectionType,
itemIndex: number,
inputIndex?: number,
): Promise<unknown>;
@ -2372,9 +2372,6 @@ export interface IWorkflowExecuteAdditionalData {
mode: WorkflowExecuteMode,
envProviderState: EnvProviderState,
executeData?: IExecuteData,
defaultReturnRunIndex?: number,
selfData?: IDataObject,
contextNodeName?: string,
): Promise<Result<T, E>>;
}

View file

@ -1357,8 +1357,8 @@ export class Workflow {
if (node.executeOnce === true) {
// If node should be executed only once so use only the first input item
const newInputData: ITaskDataConnections = {};
for (const inputName of Object.keys(inputData)) {
newInputData[inputName] = inputData[inputName].map((input) => {
for (const connectionType of Object.keys(inputData)) {
newInputData[connectionType] = inputData[connectionType].map((input) => {
// eslint-disable-next-line @typescript-eslint/prefer-optional-chain
return input && input.slice(0, 1);
});