fix(Call n8n Workflow Tool Node): Support concurrent invocations of the tool (#13526)

Co-authored-by: Oleg Ivaniv <me@olegivaniv.com>
This commit is contained in:
Charlie Kolb 2025-02-26 16:23:31 +01:00 committed by GitHub
parent 24a38b99a1
commit 5334661b76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 171 additions and 86 deletions

View file

@ -392,13 +392,14 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
const returnData: INodeExecutionData[] = [];
const items = this.getInputData();
const outputParsers = await getOptionalOutputParsers(this);
const outputParser = outputParsers?.[0];
const tools = await getTools(this, outputParser);
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
try {
const model = await getChatModel(this);
const memory = await getOptionalMemory(this);
const outputParsers = await getOptionalOutputParsers(this);
const outputParser = outputParsers?.[0];
const tools = await getTools(this, outputParser);
const input = getPromptInputByType({
ctx: this,

View file

@ -5,7 +5,7 @@ import { OutputParserException } from '@langchain/core/output_parsers';
import type { MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import { normalizeItems } from 'n8n-core';
import type { IExecuteFunctions, IWorkflowDataProxyData } from 'n8n-workflow';
import type { ISupplyDataFunctions, IWorkflowDataProxyData } from 'n8n-workflow';
import { ApplicationError, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type {
@ -18,13 +18,13 @@ import { NAIVE_FIX_PROMPT } from '../prompt';
describe('OutputParserAutofixing', () => {
let outputParser: OutputParserAutofixing;
let thisArg: MockProxy<IExecuteFunctions>;
let thisArg: MockProxy<ISupplyDataFunctions>;
let mockModel: MockProxy<BaseLanguageModel>;
let mockStructuredOutputParser: MockProxy<N8nStructuredOutputParser>;
beforeEach(() => {
outputParser = new OutputParserAutofixing();
thisArg = mock<IExecuteFunctions>({
thisArg = mock<ISupplyDataFunctions>({
helpers: { normalizeItems },
});
mockModel = mock<BaseLanguageModel>();

View file

@ -2,7 +2,7 @@ import { mock } from 'jest-mock-extended';
import { normalizeItems } from 'n8n-core';
import {
ApplicationError,
type IExecuteFunctions,
type ISupplyDataFunctions,
type IWorkflowDataProxyData,
} from 'n8n-workflow';
@ -12,7 +12,7 @@ import { OutputParserItemList } from '../OutputParserItemList.node';
describe('OutputParserItemList', () => {
let outputParser: OutputParserItemList;
const thisArg = mock<IExecuteFunctions>({
const thisArg = mock<ISupplyDataFunctions>({
helpers: { normalizeItems },
});
const workflowDataProxy = mock<IWorkflowDataProxyData>({ $input: mock() });

View file

@ -2,8 +2,8 @@ import { mock } from 'jest-mock-extended';
import { normalizeItems } from 'n8n-core';
import {
jsonParse,
type IExecuteFunctions,
type INode,
type ISupplyDataFunctions,
type IWorkflowDataProxyData,
} from 'n8n-workflow';
@ -13,7 +13,7 @@ import { OutputParserStructured } from '../OutputParserStructured.node';
describe('OutputParserStructured', () => {
let outputParser: OutputParserStructured;
const thisArg = mock<IExecuteFunctions>({
const thisArg = mock<ISupplyDataFunctions>({
helpers: { normalizeItems },
});
const workflowDataProxy = mock<IWorkflowDataProxyData>({ $input: mock() });

View file

@ -1,5 +1,5 @@
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, INode } from 'n8n-workflow';
import type { INode, ISupplyDataFunctions } from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
import type { N8nTool } from '@utils/N8nTool';
@ -8,8 +8,8 @@ import { ToolHttpRequest } from '../ToolHttpRequest.node';
describe('ToolHttpRequest', () => {
const httpTool = new ToolHttpRequest();
const helpers = mock<IExecuteFunctions['helpers']>();
const executeFunctions = mock<IExecuteFunctions>({ helpers });
const helpers = mock<ISupplyDataFunctions['helpers']>();
const executeFunctions = mock<ISupplyDataFunctions>({ helpers });
beforeEach(() => {
jest.resetAllMocks();

View file

@ -11,12 +11,8 @@ import type {
import { WorkflowToolService } from './utils/WorkflowToolService';
type ISupplyDataFunctionsWithRunIndex = ISupplyDataFunctions & { runIndex: number };
// Mock ISupplyDataFunctions interface
function createMockContext(
overrides?: Partial<ISupplyDataFunctions>,
): ISupplyDataFunctionsWithRunIndex {
function createMockContext(overrides?: Partial<ISupplyDataFunctions>): ISupplyDataFunctions {
return {
runIndex: 0,
getNodeParameter: jest.fn(),
@ -33,6 +29,7 @@ function createMockContext(
getTimezone: jest.fn(),
getWorkflow: jest.fn(),
getWorkflowStaticData: jest.fn(),
cloneWith: jest.fn(),
logger: {
debug: jest.fn(),
error: jest.fn(),
@ -40,11 +37,11 @@ function createMockContext(
warn: jest.fn(),
},
...overrides,
} as ISupplyDataFunctionsWithRunIndex;
} as ISupplyDataFunctions;
}
describe('WorkflowTool::WorkflowToolService', () => {
let context: ISupplyDataFunctionsWithRunIndex;
let context: ISupplyDataFunctions;
let service: WorkflowToolService;
beforeEach(() => {
@ -92,13 +89,25 @@ describe('WorkflowTool::WorkflowToolService', () => {
$execution: { id: 'exec-id' },
$workflow: { id: 'workflow-id' },
} as unknown as IWorkflowDataProxyData);
jest.spyOn(context, 'cloneWith').mockReturnValue(context);
const tool = await service.createTool(toolParams);
const result = await tool.func('test query');
expect(result).toBe(JSON.stringify(TEST_RESPONSE, null, 2));
expect(context.addOutputData).toHaveBeenCalled();
expect(context.runIndex).toBe(1);
// Here we validate that the runIndex is correctly updated
expect(context.cloneWith).toHaveBeenCalledWith({
runIndex: 0,
inputData: [[{ json: { query: 'test query' } }]],
});
await tool.func('another query');
expect(context.cloneWith).toHaveBeenCalledWith({
runIndex: 1,
inputData: [[{ json: { query: 'another query' } }]],
});
});
it('should handle errors during tool execution', async () => {
@ -113,6 +122,7 @@ describe('WorkflowTool::WorkflowToolService', () => {
.mockRejectedValueOnce(new Error('Workflow execution failed'));
jest.spyOn(context, 'addInputData').mockReturnValue({ index: 0 });
jest.spyOn(context, 'getNodeParameter').mockReturnValue('database');
jest.spyOn(context, 'cloneWith').mockReturnValue(context);
const tool = await service.createTool(toolParams);
const result = await tool.func('test query');
@ -166,7 +176,12 @@ describe('WorkflowTool::WorkflowToolService', () => {
jest.spyOn(context, 'executeWorkflow').mockResolvedValueOnce(mockResponse);
const result = await service['executeSubWorkflow'](workflowInfo, items, workflowProxyMock);
const result = await service['executeSubWorkflow'](
context,
workflowInfo,
items,
workflowProxyMock,
);
expect(result.response).toBe(TEST_RESPONSE);
expect(result.subExecutionId).toBe('test-execution');
@ -175,7 +190,7 @@ describe('WorkflowTool::WorkflowToolService', () => {
it('should throw error when workflow execution fails', async () => {
jest.spyOn(context, 'executeWorkflow').mockRejectedValueOnce(new Error('Execution failed'));
await expect(service['executeSubWorkflow']({}, [], {} as never)).rejects.toThrow(
await expect(service['executeSubWorkflow'](context, {}, [], {} as never)).rejects.toThrow(
NodeOperationError,
);
});
@ -188,7 +203,7 @@ describe('WorkflowTool::WorkflowToolService', () => {
jest.spyOn(context, 'executeWorkflow').mockResolvedValueOnce(mockResponse);
await expect(service['executeSubWorkflow']({}, [], {} as never)).rejects.toThrow();
await expect(service['executeSubWorkflow'](context, {}, [], {} as never)).rejects.toThrow();
});
});
@ -202,7 +217,12 @@ describe('WorkflowTool::WorkflowToolService', () => {
jest.spyOn(context, 'getNodeParameter').mockReturnValueOnce({ value: 'workflow-id' });
const result = await service['getSubWorkflowInfo'](source, itemIndex, workflowProxyMock);
const result = await service['getSubWorkflowInfo'](
context,
source,
itemIndex,
workflowProxyMock,
);
expect(result.workflowInfo).toHaveProperty('id', 'workflow-id');
expect(result.subWorkflowId).toBe('workflow-id');
@ -218,7 +238,12 @@ describe('WorkflowTool::WorkflowToolService', () => {
jest.spyOn(context, 'getNodeParameter').mockReturnValueOnce(JSON.stringify(mockWorkflow));
const result = await service['getSubWorkflowInfo'](source, itemIndex, workflowProxyMock);
const result = await service['getSubWorkflowInfo'](
context,
source,
itemIndex,
workflowProxyMock,
);
expect(result.workflowInfo.code).toEqual(mockWorkflow);
expect(result.subWorkflowId).toBe('proxy-id');
@ -234,7 +259,7 @@ describe('WorkflowTool::WorkflowToolService', () => {
jest.spyOn(context, 'getNodeParameter').mockReturnValueOnce('invalid json');
await expect(
service['getSubWorkflowInfo'](source, itemIndex, workflowProxyMock),
service['getSubWorkflowInfo'](context, source, itemIndex, workflowProxyMock),
).rejects.toThrow(NodeOperationError);
});
});

View file

@ -43,8 +43,8 @@ export class WorkflowToolService {
// Sub-workflow execution id, will be set after the sub-workflow is executed
private subExecutionId: string | undefined;
constructor(private context: ISupplyDataFunctions) {
const subWorkflowInputs = this.context.getNode().parameters
constructor(private baseContext: ISupplyDataFunctions) {
const subWorkflowInputs = this.baseContext.getNode().parameters
.workflowInputs as ResourceMapperValue;
this.useSchema = (subWorkflowInputs?.schema ?? []).length > 0;
}
@ -59,18 +59,23 @@ export class WorkflowToolService {
description: string;
itemIndex: number;
}): Promise<DynamicTool | DynamicStructuredTool> {
let runIndex = 0;
// Handler for the tool execution, will be called when the tool is executed
// This function will execute the sub-workflow and return the response
const toolHandler = async (
query: string | IDataObject,
runManager?: CallbackManagerForToolRun,
): Promise<string> => {
const { index } = this.context.addInputData(NodeConnectionType.AiTool, [
[{ json: { query } }],
]);
const localRunIndex = runIndex++;
// We need to clone the context here to handle runIndex correctly
// Otherwise the runIndex will be shared between different executions
// Causing incorrect data to be passed to the sub-workflow and via $fromAI
const context = this.baseContext.cloneWith({
runIndex: localRunIndex,
inputData: [[{ json: { query } }]],
});
try {
const response = await this.runFunction(query, itemIndex, runManager);
const response = await this.runFunction(context, query, itemIndex, runManager);
const processedResponse = this.handleToolResponse(response);
// Once the sub-workflow is executed, add the output data to the context
@ -87,7 +92,12 @@ export class WorkflowToolService {
const json = jsonParse<IDataObject>(processedResponse, {
fallbackValue: { response: processedResponse },
});
void this.context.addOutputData(NodeConnectionType.AiTool, index, [[{ json }]], metadata);
void context.addOutputData(
NodeConnectionType.AiTool,
localRunIndex,
[[{ json }]],
metadata,
);
return processedResponse;
} catch (error) {
@ -95,11 +105,13 @@ export class WorkflowToolService {
const errorResponse = `There was an error: "${executionError.message}"`;
const metadata = parseErrorMetadata(error);
void this.context.addOutputData(NodeConnectionType.AiTool, index, executionError, metadata);
void context.addOutputData(
NodeConnectionType.AiTool,
localRunIndex,
executionError,
metadata,
);
return errorResponse;
} finally {
// @ts-expect-error this accesses a private member on the actual implementation to fix https://linear.app/n8n/issue/ADO-3186/bug-workflowtool-v2-always-uses-first-row-of-input-data
this.context.runIndex++;
}
};
@ -119,7 +131,7 @@ export class WorkflowToolService {
}
if (typeof response !== 'string') {
throw new NodeOperationError(this.context.getNode(), 'Wrong output type returned', {
throw new NodeOperationError(this.baseContext.getNode(), 'Wrong output type returned', {
description: `The response property should be a string, but it is an ${typeof response}`,
});
}
@ -131,6 +143,7 @@ export class WorkflowToolService {
* Executes specified sub-workflow with provided inputs
*/
private async executeSubWorkflow(
context: ISupplyDataFunctions,
workflowInfo: IExecuteWorkflowInfo,
items: INodeExecutionData[],
workflowProxy: IWorkflowDataProxyData,
@ -138,27 +151,22 @@ export class WorkflowToolService {
): Promise<{ response: string; subExecutionId: string }> {
let receivedData: ExecuteWorkflowData;
try {
receivedData = await this.context.executeWorkflow(
workflowInfo,
items,
runManager?.getChild(),
{
parentExecution: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
receivedData = await context.executeWorkflow(workflowInfo, items, runManager?.getChild(), {
parentExecution: {
executionId: workflowProxy.$execution.id,
workflowId: workflowProxy.$workflow.id,
},
);
});
// Set sub-workflow execution id so it can be used in other places
this.subExecutionId = receivedData.executionId;
} catch (error) {
throw new NodeOperationError(this.context.getNode(), error as Error);
throw new NodeOperationError(context.getNode(), error as Error);
}
const response: string | undefined = get(receivedData, 'data[0][0].json') as string | undefined;
if (response === undefined) {
throw new NodeOperationError(
this.context.getNode(),
context.getNode(),
'There was an error: "The workflow did not return a response"',
);
}
@ -171,20 +179,27 @@ export class WorkflowToolService {
* This function will be called as part of the tool execution (from the toolHandler)
*/
private async runFunction(
context: ISupplyDataFunctions,
query: string | IDataObject,
itemIndex: number,
runManager?: CallbackManagerForToolRun,
): Promise<string> {
const source = this.context.getNodeParameter('source', itemIndex) as string;
const workflowProxy = this.context.getWorkflowDataProxy(0);
const source = context.getNodeParameter('source', itemIndex) as string;
const workflowProxy = context.getWorkflowDataProxy(0);
const { workflowInfo } = await this.getSubWorkflowInfo(source, itemIndex, workflowProxy);
const rawData = this.prepareRawData(query, itemIndex);
const items = await this.prepareWorkflowItems(query, itemIndex, rawData);
const { workflowInfo } = await this.getSubWorkflowInfo(
context,
source,
itemIndex,
workflowProxy,
);
const rawData = this.prepareRawData(context, query, itemIndex);
const items = await this.prepareWorkflowItems(context, query, itemIndex, rawData);
this.subWorkflowId = workflowInfo.id;
const { response } = await this.executeSubWorkflow(
context,
workflowInfo,
items,
workflowProxy,
@ -197,6 +212,7 @@ export class WorkflowToolService {
* Gets the sub-workflow info based on the source (database or parameter)
*/
private async getSubWorkflowInfo(
context: ISupplyDataFunctions,
source: string,
itemIndex: number,
workflowProxy: IWorkflowDataProxyData,
@ -208,7 +224,7 @@ export class WorkflowToolService {
let subWorkflowId: string;
if (source === 'database') {
const { value } = this.context.getNodeParameter(
const { value } = context.getNodeParameter(
'workflowId',
itemIndex,
{},
@ -216,14 +232,14 @@ export class WorkflowToolService {
workflowInfo.id = value as string;
subWorkflowId = workflowInfo.id;
} else if (source === 'parameter') {
const workflowJson = this.context.getNodeParameter('workflowJson', itemIndex) as string;
const workflowJson = context.getNodeParameter('workflowJson', itemIndex) as string;
try {
workflowInfo.code = JSON.parse(workflowJson) as IWorkflowBase;
// subworkflow is same as parent workflow
subWorkflowId = workflowProxy.$workflow.id;
} catch (error) {
throw new NodeOperationError(
this.context.getNode(),
context.getNode(),
`The provided workflow is not valid JSON: "${(error as Error).message}"`,
{ itemIndex },
);
@ -233,9 +249,13 @@ export class WorkflowToolService {
return { workflowInfo, subWorkflowId: subWorkflowId! };
}
private prepareRawData(query: string | IDataObject, itemIndex: number): IDataObject {
private prepareRawData(
context: ISupplyDataFunctions,
query: string | IDataObject,
itemIndex: number,
): IDataObject {
const rawData: IDataObject = { query };
const workflowFieldsJson = this.context.getNodeParameter('fields.values', itemIndex, [], {
const workflowFieldsJson = context.getNodeParameter('fields.values', itemIndex, [], {
rawExpressions: true,
}) as SetField[];
@ -253,6 +273,7 @@ export class WorkflowToolService {
* Prepares the sub-workflow items for execution
*/
private async prepareWorkflowItems(
context: ISupplyDataFunctions,
query: string | IDataObject,
itemIndex: number,
rawData: IDataObject,
@ -261,17 +282,17 @@ export class WorkflowToolService {
let jsonData = typeof query === 'object' ? query : { query };
if (this.useSchema) {
const currentWorkflowInputs = getCurrentWorkflowInputData.call(this.context);
const currentWorkflowInputs = getCurrentWorkflowInputData.call(context);
jsonData = currentWorkflowInputs[itemIndex].json;
}
const newItem = await manual.execute.call(
this.context,
context,
{ json: jsonData },
itemIndex,
options,
rawData,
this.context.getNode(),
context.getNode(),
);
return [newItem] as INodeExecutionData[];
@ -299,7 +320,7 @@ export class WorkflowToolService {
private async extractFromAIParameters(): Promise<FromAIArgument[]> {
const collectedArguments: FromAIArgument[] = [];
traverseNodeParameters(this.context.getNode().parameters, collectedArguments);
traverseNodeParameters(this.baseContext.getNode().parameters, collectedArguments);
const uniqueArgsMap = new Map<string, FromAIArgument>();
for (const arg of collectedArguments) {

View file

@ -67,13 +67,13 @@ export interface VectorStoreNodeConstructorArgs<T extends VectorStore = VectorSt
retrieveFields?: INodeProperties[];
updateFields?: INodeProperties[];
populateVectorStore: (
context: ISupplyDataFunctions,
context: IExecuteFunctions | ISupplyDataFunctions,
embeddings: Embeddings,
documents: Array<Document<Record<string, unknown>>>,
itemIndex: number,
) => Promise<void>;
getVectorStoreClient: (
context: ISupplyDataFunctions,
context: IExecuteFunctions | ISupplyDataFunctions,
filter: Record<string, never> | undefined,
embeddings: Embeddings,
itemIndex: number,

View file

@ -1,6 +1,6 @@
import { DynamicStructuredTool, DynamicTool } from '@langchain/core/tools';
import { createMockExecuteFunction } from 'n8n-nodes-base/test/nodes/Helpers';
import type { INode } from 'n8n-workflow';
import type { INode, ISupplyDataFunctions } from 'n8n-workflow';
import { z } from 'zod';
import { N8nTool } from './N8nTool';
@ -20,7 +20,7 @@ describe('Test N8nTool wrapper as DynamicStructuredTool', () => {
it('should wrap a tool', () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',
@ -39,7 +39,7 @@ describe('Test N8nTool wrapper - DynamicTool fallback', () => {
it('should convert the tool to a dynamic tool', () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',
@ -58,7 +58,7 @@ describe('Test N8nTool wrapper - DynamicTool fallback', () => {
it('should format fallback description correctly', () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',
@ -86,7 +86,7 @@ describe('Test N8nTool wrapper - DynamicTool fallback', () => {
it('should handle empty parameter list correctly', () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',
@ -103,7 +103,7 @@ describe('Test N8nTool wrapper - DynamicTool fallback', () => {
it('should parse correct parameters', async () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',
@ -127,7 +127,7 @@ describe('Test N8nTool wrapper - DynamicTool fallback', () => {
it('should recover when 1 parameter is passed directly', async () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',
@ -150,7 +150,7 @@ describe('Test N8nTool wrapper - DynamicTool fallback', () => {
it('should recover when JS object is passed instead of JSON', async () => {
const func = jest.fn();
const ctx = createMockExecuteFunction({}, mockNode);
const ctx = createMockExecuteFunction<ISupplyDataFunctions>({}, mockNode);
const tool = new N8nTool(ctx, {
name: 'Dummy Tool',

View file

@ -1,7 +1,7 @@
import { DynamicTool, type Tool } from '@langchain/core/tools';
import { createMockExecuteFunction } from 'n8n-nodes-base/test/nodes/Helpers';
import { NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INode } from 'n8n-workflow';
import type { ISupplyDataFunctions, IExecuteFunctions, INode } from 'n8n-workflow';
import { z } from 'zod';
import { escapeSingleCurlyBrackets, getConnectedTools } from '../helpers';
@ -171,7 +171,7 @@ describe('getConnectedTools', () => {
mockExecuteFunctions = createMockExecuteFunction({}, mockNode);
mockN8nTool = new N8nTool(mockExecuteFunctions, {
mockN8nTool = new N8nTool(mockExecuteFunctions as unknown as ISupplyDataFunctions, {
name: 'Dummy Tool',
description: 'A dummy tool for testing',
func: jest.fn(),

View file

@ -54,7 +54,9 @@ describe('SupplyDataContext', () => {
const credentialsHelper = mock<ICredentialsHelper>();
const additionalData = mock<IWorkflowExecuteAdditionalData>({ credentialsHelper });
const mode: WorkflowExecuteMode = 'manual';
const runExecutionData = mock<IRunExecutionData>();
const runExecutionData = mock<IRunExecutionData>({
resultData: { runData: {} },
});
const connectionInputData: INodeExecutionData[] = [];
const connectionType = NodeConnectionType.Main;
const inputData: ITaskDataConnections = { [connectionType]: [[{ json: { test: 'data' } }]] };
@ -175,4 +177,12 @@ describe('SupplyDataContext', () => {
]);
});
});
describe('cloneWith', () => {
it('should return a new copy', () => {
const clone = supplyDataContext.cloneWith({ runIndex: 12, inputData: [[{ json: {} }]] });
expect(clone.runIndex).toBe(12);
expect(clone).not.toBe(supplyDataContext);
});
});
});

View file

@ -13,11 +13,10 @@ import type {
ITaskDataConnections,
ITaskMetadata,
IWorkflowExecuteAdditionalData,
NodeConnectionType,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { createDeferredPromise } from 'n8n-workflow';
import { createDeferredPromise, NodeConnectionType } from 'n8n-workflow';
import { BaseExecuteContext } from './base-execute-context';
import {
@ -109,6 +108,28 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
)) as ISupplyDataFunctions['getNodeParameter'];
}
cloneWith(replacements: {
runIndex: number;
inputData: INodeExecutionData[][];
}): SupplyDataContext {
const context = new SupplyDataContext(
this.workflow,
this.node,
this.additionalData,
this.mode,
this.runExecutionData,
replacements.runIndex,
this.connectionInputData,
{},
this.connectionType,
this.executeData,
this.closeFunctions,
this.abortSignal,
);
context.addInputData(NodeConnectionType.AiTool, replacements.inputData);
return context;
}
async getInputConnectionData(
connectionType: AINodeConnectionType,
itemIndex: number,

View file

@ -386,7 +386,7 @@ export const getWorkflowFilenames = (dirname: string) => {
return workflows;
};
export const createMockExecuteFunction = (
export const createMockExecuteFunction = <T = IExecuteFunctions>(
nodeParameters: IDataObject,
nodeMock: INode,
continueBool = false,
@ -410,6 +410,6 @@ export const createMockExecuteFunction = (
helpers: {
constructExecutionMetaData,
},
} as unknown as IExecuteFunctions;
} as unknown as T;
return fakeExecuteFunction;
};

View file

@ -9,8 +9,9 @@ import type {
IDataObject,
ResourceMapperField,
ILocalLoadOptionsFunctions,
ISupplyDataFunctions,
WorkflowInputsData,
IExecuteFunctions,
ISupplyDataFunctions,
} from 'n8n-workflow';
import { jsonParse, NodeOperationError, EXECUTE_WORKFLOW_TRIGGER_NODE_TYPE } from 'n8n-workflow';
@ -100,7 +101,9 @@ export function getFieldEntries(context: IWorkflowNodeContext): {
throw new NodeOperationError(context.getNode(), result);
}
export function getWorkflowInputValues(this: ISupplyDataFunctions): INodeExecutionData[] {
export function getWorkflowInputValues(
this: IExecuteFunctions | ISupplyDataFunctions,
): INodeExecutionData[] {
const inputData = this.getInputData();
return inputData.map(({ json, binary }, itemIndex) => {
@ -124,7 +127,7 @@ export function getWorkflowInputValues(this: ISupplyDataFunctions): INodeExecuti
});
}
export function getCurrentWorkflowInputData(this: ISupplyDataFunctions) {
export function getCurrentWorkflowInputData(this: IExecuteFunctions | ISupplyDataFunctions) {
const inputData: INodeExecutionData[] = getWorkflowInputValues.call(this);
const schema = this.getNodeParameter('workflowInputs.schema', 0, []) as ResourceMapperField[];

View file

@ -985,6 +985,10 @@ export type ISupplyDataFunctions = ExecuteFunctions.GetNodeParameterFn &
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
logAiEvent(eventName: AiEvent, msg?: string | undefined): void;
cloneWith(replacements: {
runIndex: number;
inputData: INodeExecutionData[][];
}): ISupplyDataFunctions;
};
export interface IExecutePaginationFunctions extends IExecuteSingleFunctions {