mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
refactor(core): Extract SupplyDataContext out of NodeExecutionFunctions (no-changelog) (#11834)
This commit is contained in:
parent
bce2366947
commit
fbaa17951f
|
@ -1,5 +1,10 @@
|
||||||
import { DynamicStructuredTool } from '@langchain/core/tools';
|
import { DynamicStructuredTool } from '@langchain/core/tools';
|
||||||
import type { IExecuteFunctions, INodeParameters, INodeType } from 'n8n-workflow';
|
import type {
|
||||||
|
IExecuteFunctions,
|
||||||
|
INodeParameters,
|
||||||
|
INodeType,
|
||||||
|
ISupplyDataFunctions,
|
||||||
|
} from 'n8n-workflow';
|
||||||
import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
|
import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
|
||||||
import { z } from 'zod';
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
@ -18,13 +23,13 @@ interface FromAIArgument {
|
||||||
* generating Zod schemas, and creating LangChain tools.
|
* generating Zod schemas, and creating LangChain tools.
|
||||||
*/
|
*/
|
||||||
class AIParametersParser {
|
class AIParametersParser {
|
||||||
private ctx: IExecuteFunctions;
|
private ctx: ISupplyDataFunctions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs an instance of AIParametersParser.
|
* Constructs an instance of AIParametersParser.
|
||||||
* @param ctx The execution context.
|
* @param ctx The execution context.
|
||||||
*/
|
*/
|
||||||
constructor(ctx: IExecuteFunctions) {
|
constructor(ctx: ISupplyDataFunctions) {
|
||||||
this.ctx = ctx;
|
this.ctx = ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -388,7 +393,7 @@ class AIParametersParser {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Execute the node with the proxied context
|
// Execute the node with the proxied context
|
||||||
const result = await node.execute?.bind(this.ctx)();
|
const result = await node.execute?.bind(this.ctx as IExecuteFunctions)();
|
||||||
|
|
||||||
// Process and map the results
|
// Process and map the results
|
||||||
const mappedResults = result?.[0]?.flatMap((item) => item.json);
|
const mappedResults = result?.[0]?.flatMap((item) => item.json);
|
||||||
|
@ -423,7 +428,7 @@ class AIParametersParser {
|
||||||
* @returns An object containing the DynamicStructuredTool instance.
|
* @returns An object containing the DynamicStructuredTool instance.
|
||||||
*/
|
*/
|
||||||
export function createNodeAsTool(
|
export function createNodeAsTool(
|
||||||
ctx: IExecuteFunctions,
|
ctx: ISupplyDataFunctions,
|
||||||
node: INodeType,
|
node: INodeType,
|
||||||
nodeParameters: INodeParameters,
|
nodeParameters: INodeParameters,
|
||||||
) {
|
) {
|
||||||
|
|
|
@ -173,6 +173,7 @@ import {
|
||||||
ExecuteSingleContext,
|
ExecuteSingleContext,
|
||||||
HookContext,
|
HookContext,
|
||||||
PollContext,
|
PollContext,
|
||||||
|
SupplyDataContext,
|
||||||
TriggerContext,
|
TriggerContext,
|
||||||
WebhookContext,
|
WebhookContext,
|
||||||
} from './node-execution-context';
|
} from './node-execution-context';
|
||||||
|
@ -2714,7 +2715,7 @@ export function getWebhookDescription(
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Change options to an object
|
// TODO: Change options to an object
|
||||||
const addExecutionDataFunctions = async (
|
export const addExecutionDataFunctions = async (
|
||||||
type: 'input' | 'output',
|
type: 'input' | 'output',
|
||||||
nodeName: string,
|
nodeName: string,
|
||||||
data: INodeExecutionData[][] | ExecutionBaseError,
|
data: INodeExecutionData[][] | ExecutionBaseError,
|
||||||
|
@ -2880,25 +2881,23 @@ export async function getInputConnectionData(
|
||||||
connectedNode.type,
|
connectedNode.type,
|
||||||
connectedNode.typeVersion,
|
connectedNode.typeVersion,
|
||||||
);
|
);
|
||||||
|
const context = new SupplyDataContext(
|
||||||
// eslint-disable-next-line @typescript-eslint/no-use-before-define
|
|
||||||
const context = getSupplyDataFunctions(
|
|
||||||
workflow,
|
workflow,
|
||||||
|
connectedNode,
|
||||||
|
additionalData,
|
||||||
|
mode,
|
||||||
runExecutionData,
|
runExecutionData,
|
||||||
runIndex,
|
runIndex,
|
||||||
connectionInputData,
|
connectionInputData,
|
||||||
inputData,
|
inputData,
|
||||||
connectedNode,
|
|
||||||
additionalData,
|
|
||||||
executeData,
|
executeData,
|
||||||
mode,
|
|
||||||
closeFunctions,
|
closeFunctions,
|
||||||
abortSignal,
|
abortSignal,
|
||||||
);
|
);
|
||||||
|
|
||||||
if (!nodeType.supplyData) {
|
if (!nodeType.supplyData) {
|
||||||
if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
|
if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) {
|
||||||
nodeType.supplyData = async function (this: IExecuteFunctions) {
|
nodeType.supplyData = async function (this: ISupplyDataFunctions) {
|
||||||
return createNodeAsTool(this, nodeType, this.getNode().parameters);
|
return createNodeAsTool(this, nodeType, this.getNode().parameters);
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
|
@ -3942,277 +3941,6 @@ export function getExecuteFunctions(
|
||||||
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
|
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getSupplyDataFunctions(
|
|
||||||
workflow: Workflow,
|
|
||||||
runExecutionData: IRunExecutionData,
|
|
||||||
runIndex: number,
|
|
||||||
connectionInputData: INodeExecutionData[],
|
|
||||||
inputData: ITaskDataConnections,
|
|
||||||
node: INode,
|
|
||||||
additionalData: IWorkflowExecuteAdditionalData,
|
|
||||||
executeData: IExecuteData,
|
|
||||||
mode: WorkflowExecuteMode,
|
|
||||||
closeFunctions: CloseFunction[],
|
|
||||||
abortSignal?: AbortSignal,
|
|
||||||
): ISupplyDataFunctions {
|
|
||||||
return {
|
|
||||||
...getCommonWorkflowFunctions(workflow, node, additionalData),
|
|
||||||
...executionCancellationFunctions(abortSignal),
|
|
||||||
getMode: () => mode,
|
|
||||||
getCredentials: async (type, itemIndex) =>
|
|
||||||
await getCredentials(
|
|
||||||
workflow,
|
|
||||||
node,
|
|
||||||
type,
|
|
||||||
additionalData,
|
|
||||||
mode,
|
|
||||||
executeData,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
connectionInputData,
|
|
||||||
itemIndex,
|
|
||||||
),
|
|
||||||
continueOnFail: () => continueOnFail(node),
|
|
||||||
evaluateExpression: (expression: string, itemIndex: number) =>
|
|
||||||
workflow.expression.resolveSimpleParameterValue(
|
|
||||||
`=${expression}`,
|
|
||||||
{},
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
itemIndex,
|
|
||||||
node.name,
|
|
||||||
connectionInputData,
|
|
||||||
mode,
|
|
||||||
getAdditionalKeys(additionalData, mode, runExecutionData),
|
|
||||||
executeData,
|
|
||||||
),
|
|
||||||
executeWorkflow: async (
|
|
||||||
workflowInfo: IExecuteWorkflowInfo,
|
|
||||||
inputData?: INodeExecutionData[],
|
|
||||||
parentCallbackManager?: CallbackManager,
|
|
||||||
options?: {
|
|
||||||
doNotWaitToFinish?: boolean;
|
|
||||||
parentExecution?: RelatedExecution;
|
|
||||||
},
|
|
||||||
): Promise<ExecuteWorkflowData> =>
|
|
||||||
await additionalData
|
|
||||||
.executeWorkflow(workflowInfo, additionalData, {
|
|
||||||
parentWorkflowId: workflow.id?.toString(),
|
|
||||||
inputData,
|
|
||||||
parentWorkflowSettings: workflow.settings,
|
|
||||||
node,
|
|
||||||
parentCallbackManager,
|
|
||||||
...options,
|
|
||||||
})
|
|
||||||
.then(async (result) => {
|
|
||||||
const data = await Container.get(BinaryDataService).duplicateBinaryData(
|
|
||||||
workflow.id,
|
|
||||||
additionalData.executionId!,
|
|
||||||
result.data,
|
|
||||||
);
|
|
||||||
return { ...result, data };
|
|
||||||
}),
|
|
||||||
getNodeOutputs() {
|
|
||||||
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
|
|
||||||
return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => {
|
|
||||||
if (typeof output === 'string') {
|
|
||||||
return {
|
|
||||||
type: output,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
return output;
|
|
||||||
});
|
|
||||||
},
|
|
||||||
async getInputConnectionData(
|
|
||||||
inputName: NodeConnectionType,
|
|
||||||
itemIndex: number,
|
|
||||||
): Promise<unknown> {
|
|
||||||
return await getInputConnectionData.call(
|
|
||||||
this,
|
|
||||||
workflow,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
connectionInputData,
|
|
||||||
inputData,
|
|
||||||
additionalData,
|
|
||||||
executeData,
|
|
||||||
mode,
|
|
||||||
closeFunctions,
|
|
||||||
inputName,
|
|
||||||
itemIndex,
|
|
||||||
abortSignal,
|
|
||||||
);
|
|
||||||
},
|
|
||||||
getInputData: (inputIndex = 0, inputName = 'main') => {
|
|
||||||
if (!inputData.hasOwnProperty(inputName)) {
|
|
||||||
// Return empty array because else it would throw error when nothing is connected to input
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Check if nodeType has input with that index defined
|
|
||||||
if (inputData[inputName].length < inputIndex) {
|
|
||||||
throw new ApplicationError('Could not get input with given index', {
|
|
||||||
extra: { inputIndex, inputName },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (inputData[inputName][inputIndex] === null) {
|
|
||||||
throw new ApplicationError('Value of input was not set', {
|
|
||||||
extra: { inputIndex, inputName },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return inputData[inputName][inputIndex];
|
|
||||||
},
|
|
||||||
getNodeParameter: ((
|
|
||||||
parameterName: string,
|
|
||||||
itemIndex: number,
|
|
||||||
fallbackValue?: any,
|
|
||||||
options?: IGetNodeParameterOptions,
|
|
||||||
) =>
|
|
||||||
getNodeParameter(
|
|
||||||
workflow,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
connectionInputData,
|
|
||||||
node,
|
|
||||||
parameterName,
|
|
||||||
itemIndex,
|
|
||||||
mode,
|
|
||||||
getAdditionalKeys(additionalData, mode, runExecutionData),
|
|
||||||
executeData,
|
|
||||||
fallbackValue,
|
|
||||||
options,
|
|
||||||
)) as ISupplyDataFunctions['getNodeParameter'],
|
|
||||||
getWorkflowDataProxy: (itemIndex: number) =>
|
|
||||||
new WorkflowDataProxy(
|
|
||||||
workflow,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
itemIndex,
|
|
||||||
node.name,
|
|
||||||
connectionInputData,
|
|
||||||
{},
|
|
||||||
mode,
|
|
||||||
getAdditionalKeys(additionalData, mode, runExecutionData),
|
|
||||||
executeData,
|
|
||||||
).getDataProxy(),
|
|
||||||
sendMessageToUI(...args: any[]): void {
|
|
||||||
if (mode !== 'manual') {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if (additionalData.sendDataToUI) {
|
|
||||||
args = args.map((arg) => {
|
|
||||||
// prevent invalid dates from being logged as null
|
|
||||||
if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg };
|
|
||||||
|
|
||||||
// log valid dates in human readable format, as in browser
|
|
||||||
if (arg.isLuxonDateTime) return new Date(arg.ts).toString();
|
|
||||||
if (arg instanceof Date) return arg.toString();
|
|
||||||
|
|
||||||
return arg;
|
|
||||||
});
|
|
||||||
|
|
||||||
additionalData.sendDataToUI('sendConsoleMessage', {
|
|
||||||
source: `[Node: "${node.name}"]`,
|
|
||||||
messages: args,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
Logger.warn(`There was a problem sending message to UI: ${error.message}`);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
logAiEvent: (eventName: AiEvent, msg: string) =>
|
|
||||||
additionalData.logAiEvent(eventName, {
|
|
||||||
executionId: additionalData.executionId ?? 'unsaved-execution',
|
|
||||||
nodeName: node.name,
|
|
||||||
workflowName: workflow.name ?? 'Unnamed workflow',
|
|
||||||
nodeType: node.type,
|
|
||||||
workflowId: workflow.id ?? 'unsaved-workflow',
|
|
||||||
msg,
|
|
||||||
}),
|
|
||||||
addInputData(
|
|
||||||
connectionType: NodeConnectionType,
|
|
||||||
data: INodeExecutionData[][],
|
|
||||||
): { index: number } {
|
|
||||||
const nodeName = this.getNode().name;
|
|
||||||
let currentNodeRunIndex = 0;
|
|
||||||
if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
|
|
||||||
currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length;
|
|
||||||
}
|
|
||||||
|
|
||||||
addExecutionDataFunctions(
|
|
||||||
'input',
|
|
||||||
this.getNode().name,
|
|
||||||
data,
|
|
||||||
runExecutionData,
|
|
||||||
connectionType,
|
|
||||||
additionalData,
|
|
||||||
node.name,
|
|
||||||
runIndex,
|
|
||||||
currentNodeRunIndex,
|
|
||||||
).catch((error) => {
|
|
||||||
Logger.warn(
|
|
||||||
`There was a problem logging input data of node "${this.getNode().name}": ${
|
|
||||||
error.message
|
|
||||||
}`,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
return { index: currentNodeRunIndex };
|
|
||||||
},
|
|
||||||
addOutputData(
|
|
||||||
connectionType: NodeConnectionType,
|
|
||||||
currentNodeRunIndex: number,
|
|
||||||
data: INodeExecutionData[][],
|
|
||||||
metadata?: ITaskMetadata,
|
|
||||||
): void {
|
|
||||||
addExecutionDataFunctions(
|
|
||||||
'output',
|
|
||||||
this.getNode().name,
|
|
||||||
data,
|
|
||||||
runExecutionData,
|
|
||||||
connectionType,
|
|
||||||
additionalData,
|
|
||||||
node.name,
|
|
||||||
runIndex,
|
|
||||||
currentNodeRunIndex,
|
|
||||||
metadata,
|
|
||||||
).catch((error) => {
|
|
||||||
Logger.warn(
|
|
||||||
`There was a problem logging output data of node "${this.getNode().name}": ${
|
|
||||||
error.message
|
|
||||||
}`,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
helpers: {
|
|
||||||
createDeferredPromise,
|
|
||||||
copyInputItems,
|
|
||||||
...getRequestHelperFunctions(
|
|
||||||
workflow,
|
|
||||||
node,
|
|
||||||
additionalData,
|
|
||||||
runExecutionData,
|
|
||||||
connectionInputData,
|
|
||||||
),
|
|
||||||
...getSSHTunnelFunctions(),
|
|
||||||
...getFileSystemHelperFunctions(node),
|
|
||||||
...getBinaryHelperFunctions(additionalData, workflow.id),
|
|
||||||
...getCheckProcessedHelperFunctions(workflow, node),
|
|
||||||
assertBinaryData: (itemIndex, propertyName) =>
|
|
||||||
assertBinaryData(inputData, node, itemIndex, propertyName, 0),
|
|
||||||
getBinaryDataBuffer: async (itemIndex, propertyName) =>
|
|
||||||
await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0),
|
|
||||||
|
|
||||||
returnJsonArray,
|
|
||||||
normalizeItems,
|
|
||||||
constructExecutionMetaData,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the execute functions regular nodes have access to when single-function is defined.
|
* Returns the execute functions regular nodes have access to when single-function is defined.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -302,20 +302,6 @@ describe('ExecuteSingleContext', () => {
|
||||||
|
|
||||||
describe('setMetadata', () => {
|
describe('setMetadata', () => {
|
||||||
it('sets metadata on execution data', () => {
|
it('sets metadata on execution data', () => {
|
||||||
const context = new ExecuteSingleContext(
|
|
||||||
workflow,
|
|
||||||
node,
|
|
||||||
additionalData,
|
|
||||||
mode,
|
|
||||||
runExecutionData,
|
|
||||||
runIndex,
|
|
||||||
connectionInputData,
|
|
||||||
inputData,
|
|
||||||
itemIndex,
|
|
||||||
executeData,
|
|
||||||
abortSignal,
|
|
||||||
);
|
|
||||||
|
|
||||||
const metadata: ITaskMetadata = {
|
const metadata: ITaskMetadata = {
|
||||||
subExecution: {
|
subExecution: {
|
||||||
workflowId: '123',
|
workflowId: '123',
|
||||||
|
@ -323,9 +309,11 @@ describe('ExecuteSingleContext', () => {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
expect(context.getExecuteData().metadata?.subExecution).toEqual(undefined);
|
expect(executeSingleContext.getExecuteData().metadata?.subExecution).toEqual(undefined);
|
||||||
context.setMetadata(metadata);
|
executeSingleContext.setMetadata(metadata);
|
||||||
expect(context.getExecuteData().metadata?.subExecution).toEqual(metadata.subExecution);
|
expect(executeSingleContext.getExecuteData().metadata?.subExecution).toEqual(
|
||||||
|
metadata.subExecution,
|
||||||
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -0,0 +1,240 @@
|
||||||
|
import { mock } from 'jest-mock-extended';
|
||||||
|
import type {
|
||||||
|
INode,
|
||||||
|
IWorkflowExecuteAdditionalData,
|
||||||
|
IRunExecutionData,
|
||||||
|
INodeExecutionData,
|
||||||
|
ITaskDataConnections,
|
||||||
|
IExecuteData,
|
||||||
|
Workflow,
|
||||||
|
WorkflowExecuteMode,
|
||||||
|
ICredentialsHelper,
|
||||||
|
Expression,
|
||||||
|
INodeType,
|
||||||
|
INodeTypes,
|
||||||
|
OnError,
|
||||||
|
ICredentialDataDecryptedObject,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
import { ApplicationError } from 'n8n-workflow';
|
||||||
|
|
||||||
|
import { SupplyDataContext } from '../supply-data-context';
|
||||||
|
|
||||||
|
describe('SupplyDataContext', () => {
|
||||||
|
const testCredentialType = 'testCredential';
|
||||||
|
const nodeType = mock<INodeType>({
|
||||||
|
description: {
|
||||||
|
credentials: [
|
||||||
|
{
|
||||||
|
name: testCredentialType,
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
properties: [
|
||||||
|
{
|
||||||
|
name: 'testParameter',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const nodeTypes = mock<INodeTypes>();
|
||||||
|
const expression = mock<Expression>();
|
||||||
|
const workflow = mock<Workflow>({ expression, nodeTypes });
|
||||||
|
const node = mock<INode>({
|
||||||
|
credentials: {
|
||||||
|
[testCredentialType]: {
|
||||||
|
id: 'testCredentialId',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
node.parameters = {
|
||||||
|
testParameter: 'testValue',
|
||||||
|
};
|
||||||
|
const credentialsHelper = mock<ICredentialsHelper>();
|
||||||
|
const additionalData = mock<IWorkflowExecuteAdditionalData>({ credentialsHelper });
|
||||||
|
const mode: WorkflowExecuteMode = 'manual';
|
||||||
|
const runExecutionData = mock<IRunExecutionData>();
|
||||||
|
const connectionInputData = mock<INodeExecutionData[]>();
|
||||||
|
const inputData: ITaskDataConnections = { main: [[{ json: { test: 'data' } }]] };
|
||||||
|
const executeData = mock<IExecuteData>();
|
||||||
|
const runIndex = 0;
|
||||||
|
const closeFn = jest.fn();
|
||||||
|
const abortSignal = mock<AbortSignal>();
|
||||||
|
|
||||||
|
const supplyDataContext = new SupplyDataContext(
|
||||||
|
workflow,
|
||||||
|
node,
|
||||||
|
additionalData,
|
||||||
|
mode,
|
||||||
|
runExecutionData,
|
||||||
|
runIndex,
|
||||||
|
connectionInputData,
|
||||||
|
inputData,
|
||||||
|
executeData,
|
||||||
|
[closeFn],
|
||||||
|
abortSignal,
|
||||||
|
);
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||||
|
expression.getParameterValue.mockImplementation((value) => value);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getExecutionCancelSignal', () => {
|
||||||
|
it('should return the abort signal', () => {
|
||||||
|
expect(supplyDataContext.getExecutionCancelSignal()).toBe(abortSignal);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('continueOnFail', () => {
|
||||||
|
afterEach(() => {
|
||||||
|
node.onError = undefined;
|
||||||
|
node.continueOnFail = false;
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return false for nodes by default', () => {
|
||||||
|
expect(supplyDataContext.continueOnFail()).toEqual(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return true if node has continueOnFail set to true', () => {
|
||||||
|
node.continueOnFail = true;
|
||||||
|
expect(supplyDataContext.continueOnFail()).toEqual(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
test.each([
|
||||||
|
['continueRegularOutput', true],
|
||||||
|
['continueErrorOutput', true],
|
||||||
|
['stopWorkflow', false],
|
||||||
|
])('if node has onError set to %s, it should return %s', (onError, expected) => {
|
||||||
|
node.onError = onError as OnError;
|
||||||
|
expect(supplyDataContext.continueOnFail()).toEqual(expected);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('evaluateExpression', () => {
|
||||||
|
it('should evaluate the expression correctly', () => {
|
||||||
|
const expression = '$json.test';
|
||||||
|
const expectedResult = 'data';
|
||||||
|
const resolveSimpleParameterValueSpy = jest.spyOn(
|
||||||
|
workflow.expression,
|
||||||
|
'resolveSimpleParameterValue',
|
||||||
|
);
|
||||||
|
resolveSimpleParameterValueSpy.mockReturnValue(expectedResult);
|
||||||
|
|
||||||
|
expect(supplyDataContext.evaluateExpression(expression, 0)).toEqual(expectedResult);
|
||||||
|
|
||||||
|
expect(resolveSimpleParameterValueSpy).toHaveBeenCalledWith(
|
||||||
|
`=${expression}`,
|
||||||
|
{},
|
||||||
|
runExecutionData,
|
||||||
|
runIndex,
|
||||||
|
0,
|
||||||
|
node.name,
|
||||||
|
connectionInputData,
|
||||||
|
mode,
|
||||||
|
expect.objectContaining({}),
|
||||||
|
executeData,
|
||||||
|
);
|
||||||
|
|
||||||
|
resolveSimpleParameterValueSpy.mockRestore();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getInputData', () => {
|
||||||
|
const inputIndex = 0;
|
||||||
|
const inputName = 'main';
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
inputData[inputName] = [[{ json: { test: 'data' } }]];
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return the input data correctly', () => {
|
||||||
|
const expectedData = [{ json: { test: 'data' } }];
|
||||||
|
|
||||||
|
expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual(expectedData);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return an empty array if the input name does not exist', () => {
|
||||||
|
const inputName = 'nonExistent';
|
||||||
|
expect(supplyDataContext.getInputData(inputIndex, inputName)).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error if the input index is out of range', () => {
|
||||||
|
const inputIndex = 2;
|
||||||
|
|
||||||
|
expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw an error if the input index was not set', () => {
|
||||||
|
inputData.main[inputIndex] = null;
|
||||||
|
|
||||||
|
expect(() => supplyDataContext.getInputData(inputIndex, inputName)).toThrow(ApplicationError);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getNodeParameter', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||||
|
expression.getParameterValue.mockImplementation((value) => value);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return parameter value when it exists', () => {
|
||||||
|
const parameter = supplyDataContext.getNodeParameter('testParameter', 0);
|
||||||
|
|
||||||
|
expect(parameter).toBe('testValue');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return the fallback value when the parameter does not exist', () => {
|
||||||
|
const parameter = supplyDataContext.getNodeParameter('otherParameter', 0, 'fallback');
|
||||||
|
|
||||||
|
expect(parameter).toBe('fallback');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getCredentials', () => {
|
||||||
|
it('should get decrypted credentials', async () => {
|
||||||
|
nodeTypes.getByNameAndVersion.mockReturnValue(nodeType);
|
||||||
|
credentialsHelper.getDecrypted.mockResolvedValue({ secret: 'token' });
|
||||||
|
|
||||||
|
const credentials = await supplyDataContext.getCredentials<ICredentialDataDecryptedObject>(
|
||||||
|
testCredentialType,
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(credentials).toEqual({ secret: 'token' });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getWorkflowDataProxy', () => {
|
||||||
|
it('should return the workflow data proxy correctly', () => {
|
||||||
|
const workflowDataProxy = supplyDataContext.getWorkflowDataProxy(0);
|
||||||
|
expect(workflowDataProxy.isProxy).toBe(true);
|
||||||
|
expect(Object.keys(workflowDataProxy.$input)).toEqual([
|
||||||
|
'all',
|
||||||
|
'context',
|
||||||
|
'first',
|
||||||
|
'item',
|
||||||
|
'last',
|
||||||
|
'params',
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('logAiEvent', () => {
|
||||||
|
it('should log the AI event correctly', () => {
|
||||||
|
const eventName = 'ai-tool-called';
|
||||||
|
const msg = 'test message';
|
||||||
|
|
||||||
|
supplyDataContext.logAiEvent(eventName, msg);
|
||||||
|
|
||||||
|
expect(additionalData.logAiEvent).toHaveBeenCalledWith(eventName, {
|
||||||
|
executionId: additionalData.executionId,
|
||||||
|
nodeName: node.name,
|
||||||
|
workflowName: workflow.name,
|
||||||
|
nodeType: node.type,
|
||||||
|
workflowId: workflow.id,
|
||||||
|
msg,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -3,5 +3,6 @@ export { ExecuteSingleContext } from './execute-single-context';
|
||||||
export { HookContext } from './hook-context';
|
export { HookContext } from './hook-context';
|
||||||
export { LoadOptionsContext } from './load-options-context';
|
export { LoadOptionsContext } from './load-options-context';
|
||||||
export { PollContext } from './poll-context';
|
export { PollContext } from './poll-context';
|
||||||
|
export { SupplyDataContext } from './supply-data-context';
|
||||||
export { TriggerContext } from './trigger-context';
|
export { TriggerContext } from './trigger-context';
|
||||||
export { WebhookContext } from './webhook-context';
|
export { WebhookContext } from './webhook-context';
|
||||||
|
|
371
packages/core/src/node-execution-context/supply-data-context.ts
Normal file
371
packages/core/src/node-execution-context/supply-data-context.ts
Normal file
|
@ -0,0 +1,371 @@
|
||||||
|
import type {
|
||||||
|
AiEvent,
|
||||||
|
CallbackManager,
|
||||||
|
CloseFunction,
|
||||||
|
ExecuteWorkflowData,
|
||||||
|
ICredentialDataDecryptedObject,
|
||||||
|
IExecuteData,
|
||||||
|
IExecuteWorkflowInfo,
|
||||||
|
IGetNodeParameterOptions,
|
||||||
|
INode,
|
||||||
|
INodeExecutionData,
|
||||||
|
IRunExecutionData,
|
||||||
|
ISupplyDataFunctions,
|
||||||
|
ITaskDataConnections,
|
||||||
|
ITaskMetadata,
|
||||||
|
IWorkflowExecuteAdditionalData,
|
||||||
|
NodeConnectionType,
|
||||||
|
RelatedExecution,
|
||||||
|
Workflow,
|
||||||
|
WorkflowExecuteMode,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
import {
|
||||||
|
ApplicationError,
|
||||||
|
createDeferredPromise,
|
||||||
|
NodeHelpers,
|
||||||
|
WorkflowDataProxy,
|
||||||
|
} from 'n8n-workflow';
|
||||||
|
import Container from 'typedi';
|
||||||
|
|
||||||
|
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
|
||||||
|
// eslint-disable-next-line import/no-cycle
|
||||||
|
import {
|
||||||
|
assertBinaryData,
|
||||||
|
continueOnFail,
|
||||||
|
constructExecutionMetaData,
|
||||||
|
copyInputItems,
|
||||||
|
getAdditionalKeys,
|
||||||
|
getBinaryDataBuffer,
|
||||||
|
getBinaryHelperFunctions,
|
||||||
|
getCheckProcessedHelperFunctions,
|
||||||
|
getCredentials,
|
||||||
|
getFileSystemHelperFunctions,
|
||||||
|
getNodeParameter,
|
||||||
|
getRequestHelperFunctions,
|
||||||
|
getSSHTunnelFunctions,
|
||||||
|
normalizeItems,
|
||||||
|
returnJsonArray,
|
||||||
|
getInputConnectionData,
|
||||||
|
addExecutionDataFunctions,
|
||||||
|
} from '@/NodeExecuteFunctions';
|
||||||
|
|
||||||
|
import { NodeExecutionContext } from './node-execution-context';
|
||||||
|
|
||||||
|
export class SupplyDataContext extends NodeExecutionContext implements ISupplyDataFunctions {
|
||||||
|
readonly helpers: ISupplyDataFunctions['helpers'];
|
||||||
|
|
||||||
|
readonly getNodeParameter: ISupplyDataFunctions['getNodeParameter'];
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
workflow: Workflow,
|
||||||
|
node: INode,
|
||||||
|
additionalData: IWorkflowExecuteAdditionalData,
|
||||||
|
mode: WorkflowExecuteMode,
|
||||||
|
private readonly runExecutionData: IRunExecutionData,
|
||||||
|
private readonly runIndex: number,
|
||||||
|
private readonly connectionInputData: INodeExecutionData[],
|
||||||
|
private readonly inputData: ITaskDataConnections,
|
||||||
|
private readonly executeData: IExecuteData,
|
||||||
|
private readonly closeFunctions: CloseFunction[],
|
||||||
|
private readonly abortSignal?: AbortSignal,
|
||||||
|
) {
|
||||||
|
super(workflow, node, additionalData, mode);
|
||||||
|
|
||||||
|
this.helpers = {
|
||||||
|
createDeferredPromise,
|
||||||
|
copyInputItems,
|
||||||
|
...getRequestHelperFunctions(
|
||||||
|
workflow,
|
||||||
|
node,
|
||||||
|
additionalData,
|
||||||
|
runExecutionData,
|
||||||
|
connectionInputData,
|
||||||
|
),
|
||||||
|
...getSSHTunnelFunctions(),
|
||||||
|
...getFileSystemHelperFunctions(node),
|
||||||
|
...getBinaryHelperFunctions(additionalData, workflow.id),
|
||||||
|
...getCheckProcessedHelperFunctions(workflow, node),
|
||||||
|
assertBinaryData: (itemIndex, propertyName) =>
|
||||||
|
assertBinaryData(inputData, node, itemIndex, propertyName, 0),
|
||||||
|
getBinaryDataBuffer: async (itemIndex, propertyName) =>
|
||||||
|
await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0),
|
||||||
|
|
||||||
|
returnJsonArray,
|
||||||
|
normalizeItems,
|
||||||
|
constructExecutionMetaData,
|
||||||
|
};
|
||||||
|
|
||||||
|
this.getNodeParameter = ((
|
||||||
|
parameterName: string,
|
||||||
|
itemIndex: number,
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
fallbackValue?: any,
|
||||||
|
options?: IGetNodeParameterOptions,
|
||||||
|
) =>
|
||||||
|
getNodeParameter(
|
||||||
|
this.workflow,
|
||||||
|
this.runExecutionData,
|
||||||
|
this.runIndex,
|
||||||
|
this.connectionInputData,
|
||||||
|
this.node,
|
||||||
|
parameterName,
|
||||||
|
itemIndex,
|
||||||
|
this.mode,
|
||||||
|
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
|
||||||
|
this.executeData,
|
||||||
|
fallbackValue,
|
||||||
|
options,
|
||||||
|
)) as ISupplyDataFunctions['getNodeParameter'];
|
||||||
|
}
|
||||||
|
|
||||||
|
getExecutionCancelSignal() {
|
||||||
|
return this.abortSignal;
|
||||||
|
}
|
||||||
|
|
||||||
|
onExecutionCancellation(handler: () => unknown) {
|
||||||
|
const fn = () => {
|
||||||
|
this.abortSignal?.removeEventListener('abort', fn);
|
||||||
|
handler();
|
||||||
|
};
|
||||||
|
this.abortSignal?.addEventListener('abort', fn);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getCredentials<T extends object = ICredentialDataDecryptedObject>(
|
||||||
|
type: string,
|
||||||
|
itemIndex: number,
|
||||||
|
) {
|
||||||
|
return await getCredentials<T>(
|
||||||
|
this.workflow,
|
||||||
|
this.node,
|
||||||
|
type,
|
||||||
|
this.additionalData,
|
||||||
|
this.mode,
|
||||||
|
this.executeData,
|
||||||
|
this.runExecutionData,
|
||||||
|
this.runIndex,
|
||||||
|
this.connectionInputData,
|
||||||
|
itemIndex,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
continueOnFail() {
|
||||||
|
return continueOnFail(this.node);
|
||||||
|
}
|
||||||
|
|
||||||
|
evaluateExpression(expression: string, itemIndex: number) {
|
||||||
|
return this.workflow.expression.resolveSimpleParameterValue(
|
||||||
|
`=${expression}`,
|
||||||
|
{},
|
||||||
|
this.runExecutionData,
|
||||||
|
this.runIndex,
|
||||||
|
itemIndex,
|
||||||
|
this.node.name,
|
||||||
|
this.connectionInputData,
|
||||||
|
this.mode,
|
||||||
|
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
|
||||||
|
this.executeData,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async executeWorkflow(
|
||||||
|
workflowInfo: IExecuteWorkflowInfo,
|
||||||
|
inputData?: INodeExecutionData[],
|
||||||
|
parentCallbackManager?: CallbackManager,
|
||||||
|
options?: {
|
||||||
|
doNotWaitToFinish?: boolean;
|
||||||
|
parentExecution?: RelatedExecution;
|
||||||
|
},
|
||||||
|
): Promise<ExecuteWorkflowData> {
|
||||||
|
return await this.additionalData
|
||||||
|
.executeWorkflow(workflowInfo, this.additionalData, {
|
||||||
|
parentWorkflowId: this.workflow.id?.toString(),
|
||||||
|
inputData,
|
||||||
|
parentWorkflowSettings: this.workflow.settings,
|
||||||
|
node: this.node,
|
||||||
|
parentCallbackManager,
|
||||||
|
...options,
|
||||||
|
})
|
||||||
|
.then(async (result) => {
|
||||||
|
const data = await Container.get(BinaryDataService).duplicateBinaryData(
|
||||||
|
this.workflow.id,
|
||||||
|
this.additionalData.executionId!,
|
||||||
|
result.data,
|
||||||
|
);
|
||||||
|
return { ...result, data };
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getNodeOutputs() {
|
||||||
|
const nodeType = this.workflow.nodeTypes.getByNameAndVersion(
|
||||||
|
this.node.type,
|
||||||
|
this.node.typeVersion,
|
||||||
|
);
|
||||||
|
return NodeHelpers.getNodeOutputs(this.workflow, this.node, nodeType.description).map(
|
||||||
|
(output) => {
|
||||||
|
if (typeof output === 'string') {
|
||||||
|
return {
|
||||||
|
type: output,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return output;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise<unknown> {
|
||||||
|
return await getInputConnectionData.call(
|
||||||
|
this,
|
||||||
|
this.workflow,
|
||||||
|
this.runExecutionData,
|
||||||
|
this.runIndex,
|
||||||
|
this.connectionInputData,
|
||||||
|
this.inputData,
|
||||||
|
this.additionalData,
|
||||||
|
this.executeData,
|
||||||
|
this.mode,
|
||||||
|
this.closeFunctions,
|
||||||
|
inputName,
|
||||||
|
itemIndex,
|
||||||
|
this.abortSignal,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
getInputData(inputIndex = 0, inputName = 'main') {
|
||||||
|
if (!this.inputData.hasOwnProperty(inputName)) {
|
||||||
|
// Return empty array because else it would throw error when nothing is connected to input
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Check if nodeType has input with that index defined
|
||||||
|
if (this.inputData[inputName].length < inputIndex) {
|
||||||
|
throw new ApplicationError('Could not get input with given index', {
|
||||||
|
extra: { inputIndex, inputName },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.inputData[inputName][inputIndex] === null) {
|
||||||
|
throw new ApplicationError('Value of input was not set', {
|
||||||
|
extra: { inputIndex, inputName },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.inputData[inputName][inputIndex];
|
||||||
|
}
|
||||||
|
|
||||||
|
getWorkflowDataProxy(itemIndex: number) {
|
||||||
|
return new WorkflowDataProxy(
|
||||||
|
this.workflow,
|
||||||
|
this.runExecutionData,
|
||||||
|
this.runIndex,
|
||||||
|
itemIndex,
|
||||||
|
this.node.name,
|
||||||
|
this.connectionInputData,
|
||||||
|
{},
|
||||||
|
this.mode,
|
||||||
|
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
|
||||||
|
this.executeData,
|
||||||
|
).getDataProxy();
|
||||||
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
sendMessageToUI(...args: any[]): void {
|
||||||
|
if (this.mode !== 'manual') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (this.additionalData.sendDataToUI) {
|
||||||
|
args = args.map((arg) => {
|
||||||
|
// prevent invalid dates from being logged as null
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-return
|
||||||
|
if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg };
|
||||||
|
|
||||||
|
// log valid dates in human readable format, as in browser
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-argument
|
||||||
|
if (arg.isLuxonDateTime) return new Date(arg.ts).toString();
|
||||||
|
if (arg instanceof Date) return arg.toString();
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
|
||||||
|
return arg;
|
||||||
|
});
|
||||||
|
|
||||||
|
this.additionalData.sendDataToUI('sendConsoleMessage', {
|
||||||
|
source: `[Node: "${this.node.name}"]`,
|
||||||
|
messages: args,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||||
|
this.logger.warn(`There was a problem sending message to UI: ${error.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logAiEvent(eventName: AiEvent, msg: string) {
|
||||||
|
return this.additionalData.logAiEvent(eventName, {
|
||||||
|
executionId: this.additionalData.executionId ?? 'unsaved-execution',
|
||||||
|
nodeName: this.node.name,
|
||||||
|
workflowName: this.workflow.name ?? 'Unnamed workflow',
|
||||||
|
nodeType: this.node.type,
|
||||||
|
workflowId: this.workflow.id ?? 'unsaved-workflow',
|
||||||
|
msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
addInputData(
|
||||||
|
connectionType: NodeConnectionType,
|
||||||
|
data: INodeExecutionData[][],
|
||||||
|
): { index: number } {
|
||||||
|
const nodeName = this.getNode().name;
|
||||||
|
let currentNodeRunIndex = 0;
|
||||||
|
if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
|
||||||
|
currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length;
|
||||||
|
}
|
||||||
|
|
||||||
|
addExecutionDataFunctions(
|
||||||
|
'input',
|
||||||
|
this.node.name,
|
||||||
|
data,
|
||||||
|
this.runExecutionData,
|
||||||
|
connectionType,
|
||||||
|
this.additionalData,
|
||||||
|
this.node.name,
|
||||||
|
this.runIndex,
|
||||||
|
currentNodeRunIndex,
|
||||||
|
).catch((error) => {
|
||||||
|
this.logger.warn(
|
||||||
|
`There was a problem logging input data of node "${this.node.name}": ${
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||||
|
error.message
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
return { index: currentNodeRunIndex };
|
||||||
|
}
|
||||||
|
|
||||||
|
addOutputData(
|
||||||
|
connectionType: NodeConnectionType,
|
||||||
|
currentNodeRunIndex: number,
|
||||||
|
data: INodeExecutionData[][],
|
||||||
|
metadata?: ITaskMetadata,
|
||||||
|
): void {
|
||||||
|
addExecutionDataFunctions(
|
||||||
|
'output',
|
||||||
|
this.node.name,
|
||||||
|
data,
|
||||||
|
this.runExecutionData,
|
||||||
|
connectionType,
|
||||||
|
this.additionalData,
|
||||||
|
this.node.name,
|
||||||
|
this.runIndex,
|
||||||
|
currentNodeRunIndex,
|
||||||
|
metadata,
|
||||||
|
).catch((error) => {
|
||||||
|
this.logger.warn(
|
||||||
|
`There was a problem logging output data of node "${this.node.name}": ${
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
|
||||||
|
error.message
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue