extract out execute-single and webhook contexts

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-10-25 22:02:54 +02:00
parent 7b945afebd
commit b1c5559832
No known key found for this signature in database
15 changed files with 549 additions and 354 deletions

View file

@ -138,7 +138,7 @@ export class N8nLlmTracing extends BaseCallbackHandler {
this.executionFunctions.addOutputData(this.connectionType, runDetails.index, [
[{ json: { ...response } }],
]);
void logAiEvent(this.executionFunctions, 'ai-llm-generated-output', {
logAiEvent(this.executionFunctions, 'ai-llm-generated-output', {
messages: parsedMessages,
options: runDetails.options,
response,
@ -186,7 +186,7 @@ export class N8nLlmTracing extends BaseCallbackHandler {
});
}
void logAiEvent(this.executionFunctions, 'ai-llm-errored', {
logAiEvent(this.executionFunctions, 'ai-llm-errored', {
error: Object.keys(error).length === 0 ? error.toString() : error,
runId,
parentRunId,

View file

@ -281,7 +281,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
});
resultData.push(...serializedDocs);
void logAiEvent(this, 'ai-vector-store-searched', { query: prompt });
logAiEvent(this, 'ai-vector-store-searched', { query: prompt });
}
return [resultData];
@ -311,7 +311,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
try {
await args.populateVectorStore(this, embeddings, processedDocuments, itemIndex);
void logAiEvent(this, 'ai-vector-store-populated');
logAiEvent(this, 'ai-vector-store-populated');
} catch (error) {
throw error;
}
@ -365,7 +365,7 @@ export const createVectorStoreNode = (args: VectorStoreNodeConstructorArgs) =>
ids: [documentId],
});
void logAiEvent(this, 'ai-vector-store-updated');
logAiEvent(this, 'ai-vector-store-updated');
} catch (error) {
throw error;
}

View file

@ -133,13 +133,13 @@ export function getSessionId(
return sessionId;
}
export async function logAiEvent(
export function logAiEvent(
executeFunctions: IExecuteFunctions,
event: AiEvent,
data?: IDataObject,
) {
try {
await executeFunctions.logAiEvent(event, data ? jsonStringify(data) : undefined);
executeFunctions.logAiEvent(event, data ? jsonStringify(data) : undefined);
} catch (error) {
executeFunctions.logger.debug(`Error logging AI event: ${event}`);
}

View file

@ -190,7 +190,7 @@ export function logWrapper(
const payload = { action: 'getMessages', response };
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
void logAiEvent(executeFunctions, 'ai-messages-retrieved-from-memory', { response });
logAiEvent(executeFunctions, 'ai-messages-retrieved-from-memory', { response });
return response;
};
} else if (prop === 'addMessage' && 'addMessage' in target) {
@ -207,7 +207,7 @@ export function logWrapper(
arguments: [message],
});
void logAiEvent(executeFunctions, 'ai-message-added-to-memory', { message });
logAiEvent(executeFunctions, 'ai-message-added-to-memory', { message });
executeFunctions.addOutputData(connectionType, index, [[{ json: payload }]]);
};
}
@ -233,7 +233,7 @@ export function logWrapper(
arguments: [query, config],
})) as Array<Document<Record<string, any>>>;
void logAiEvent(executeFunctions, 'ai-documents-retrieved', { query });
logAiEvent(executeFunctions, 'ai-documents-retrieved', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@ -258,7 +258,7 @@ export function logWrapper(
arguments: [documents],
})) as number[][];
void logAiEvent(executeFunctions, 'ai-document-embedded');
logAiEvent(executeFunctions, 'ai-document-embedded');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@ -278,7 +278,7 @@ export function logWrapper(
method: target[prop],
arguments: [query],
})) as number[];
void logAiEvent(executeFunctions, 'ai-query-embedded');
logAiEvent(executeFunctions, 'ai-query-embedded');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@ -323,7 +323,7 @@ export function logWrapper(
arguments: [item, itemIndex],
})) as number[];
void logAiEvent(executeFunctions, 'ai-document-processed');
logAiEvent(executeFunctions, 'ai-document-processed');
executeFunctions.addOutputData(connectionType, index, [
[{ json: { response }, pairedItem: { item: itemIndex } }],
]);
@ -349,7 +349,7 @@ export function logWrapper(
arguments: [text],
})) as string[];
void logAiEvent(executeFunctions, 'ai-text-split');
logAiEvent(executeFunctions, 'ai-text-split');
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@ -373,7 +373,7 @@ export function logWrapper(
arguments: [query],
})) as string;
void logAiEvent(executeFunctions, 'ai-tool-called', { query, response });
logAiEvent(executeFunctions, 'ai-tool-called', { query, response });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;
};
@ -403,7 +403,7 @@ export function logWrapper(
arguments: [query, k, filter, _callbacks],
})) as Array<Document<Record<string, any>>>;
void logAiEvent(executeFunctions, 'ai-vector-store-searched', { query });
logAiEvent(executeFunctions, 'ai-vector-store-searched', { query });
executeFunctions.addOutputData(connectionType, index, [[{ json: { response } }]]);
return response;

View file

@ -48,7 +48,7 @@ export class N8nOutputFixingParser extends BaseOutputParser {
try {
// First attempt to parse the completion
const response = await this.outputParser.parse(completion, callbacks, (e) => e);
void logAiEvent(this.context, 'ai-output-parsed', { text: completion, response });
logAiEvent(this.context, 'ai-output-parsed', { text: completion, response });
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response } }],

View file

@ -39,7 +39,7 @@ export class N8nStructuredOutputParser extends StructuredOutputParser<
get(parsed, STRUCTURED_OUTPUT_KEY) ??
parsed) as Record<string, unknown>;
void logAiEvent(this.context, 'ai-output-parsed', { text, response: result });
logAiEvent(this.context, 'ai-output-parsed', { text, response: result });
this.context.addOutputData(NodeConnectionType.AiOutputParser, index, [
[{ json: { action: 'parse', response: result } }],
@ -56,7 +56,7 @@ export class N8nStructuredOutputParser extends StructuredOutputParser<
},
);
void logAiEvent(this.context, 'ai-output-parsed', {
logAiEvent(this.context, 'ai-output-parsed', {
text,
response: e.message ?? e,
});

View file

@ -78,7 +78,6 @@ import type {
IPollFunctions,
IRequestOptions,
IRunExecutionData,
ISourceData,
ITaskData,
ITaskDataConnections,
ITriggerFunctions,
@ -169,6 +168,8 @@ import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager';
import { PollContext } from './node-execution-context';
import { TriggerContext } from './node-execution-context/trigger-context';
import { ExecuteSingleContext } from './node-execution-context/execute-single-context';
import { WebhookContext } from './node-execution-context/webhook-context';
axios.defaults.timeout = 300000;
// Prevent axios from adding x-form-www-urlencoded headers by default
@ -2712,7 +2713,7 @@ const addExecutionDataFunctions = async (
}
};
async function getInputConnectionData(
export async function getInputConnectionData(
context: IAllExecuteFunctions,
workflow: Workflow,
runExecutionData: IRunExecutionData,
@ -2998,14 +2999,6 @@ const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({
await Container.get(SSHClientsManager).getClient(credentials),
});
const getSchedulingFunctions = (workflow: Workflow): SchedulingFunctions => {
const scheduledTaskManager = Container.get(ScheduledTaskManager);
return {
registerCron: (cronExpression, onTick) =>
scheduledTaskManager.registerCron(workflow, cronExpression, onTick),
};
};
const getAllowedPaths = () => {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
if (!restrictFileAccessTo) {
@ -3530,7 +3523,7 @@ export function getExecuteFunctions(
constructExecutionMetaData,
},
nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id),
logAiEvent: async (eventName: AiEvent, msg: string) => {
logAiEvent: (eventName: AiEvent, msg: string) => {
return additionalData.logAiEvent(eventName, {
executionId: additionalData.executionId ?? 'unsaved-execution',
nodeName: node.name,
@ -3558,161 +3551,33 @@ export function getExecuteFunctions(
})(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions;
}
/**
* Returns the execute functions regular nodes have access to when single-function is defined.
*/
/** @deprecated */
export function getExecuteSingleFunctions(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
inputData: ITaskDataConnections,
node: INode,
itemIndex: number,
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
): IExecuteSingleFunctions {
return ((workflow, runExecutionData, connectionInputData, inputData, node, itemIndex) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, evaluateItemIndex: number | undefined) => {
evaluateItemIndex = evaluateItemIndex === undefined ? itemIndex : evaluateItemIndex;
return workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
runExecutionData,
runIndex,
evaluateItemIndex,
node.name,
connectionInputData,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
);
},
getContext(type: ContextType): IContextObject {
return NodeHelpers.getContext(runExecutionData, type, node);
},
getCredentials: async (type) =>
await getCredentials(
workflow,
node,
type,
additionalData,
mode,
executeData,
runExecutionData,
runIndex,
connectionInputData,
itemIndex,
),
getInputData: (inputIndex = 0, inputName = 'main') => {
if (!inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return { json: {} };
}
// TODO: Check if nodeType has input with that index defined
if (inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input index', {
extra: { inputIndex, inputName },
});
}
const allItems = inputData[inputName][inputIndex];
if (allItems === null) {
throw new ApplicationError('Input index was not set', {
extra: { inputIndex, inputName },
});
}
if (allItems[itemIndex] === null) {
throw new ApplicationError('Value of input with given index was not set', {
extra: { inputIndex, inputName, itemIndex },
});
}
return allItems[itemIndex];
},
getInputSourceData: (inputIndex = 0, inputName = 'main') => {
if (executeData?.source === null) {
// Should never happen as n8n sets it automatically
throw new ApplicationError('Source data is missing');
}
return executeData.source[inputName][inputIndex] as ISourceData;
},
getItemIndex: () => itemIndex,
getMode: () => mode,
getExecuteData: () => executeData,
getNodeParameter: (
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object => {
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
node,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
fallbackValue,
options,
);
},
getWorkflowDataProxy: (): IWorkflowDataProxyData => {
const dataProxy = new WorkflowDataProxy(
workflow,
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
{},
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
);
return dataProxy.getDataProxy();
},
helpers: {
createDeferredPromise,
returnJsonArray,
...getRequestHelperFunctions(
workflow,
node,
additionalData,
runExecutionData,
connectionInputData,
),
...getBinaryHelperFunctions(additionalData, workflow.id),
assertBinaryData: (propertyName, inputIndex = 0) =>
assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex),
getBinaryDataBuffer: async (propertyName, inputIndex = 0) =>
await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex),
},
logAiEvent: async (eventName: AiEvent, msg: string) => {
return additionalData.logAiEvent(eventName, {
executionId: additionalData.executionId ?? 'unsaved-execution',
nodeName: node.name,
workflowName: workflow.name ?? 'Unnamed workflow',
nodeType: node.type,
workflowId: workflow.id ?? 'unsaved-workflow',
msg,
});
},
};
})(workflow, runExecutionData, connectionInputData, inputData, node, itemIndex);
return new ExecuteSingleContext(
workflow,
node,
additionalData,
runExecutionData,
runIndex,
connectionInputData,
inputData,
itemIndex,
executeData,
mode,
abortSignal,
);
}
export function getCredentialTestFunctions(): ICredentialTestFunctions {
@ -3871,10 +3736,7 @@ export function getExecuteHookFunctions(
})(workflow, node);
}
/**
* Returns the execute functions regular nodes have access to when webhook-function is defined.
*/
// TODO: check where it is used and make sure close functions are called
/** @deprecated */
export function getExecuteWebhookFunctions(
workflow: Workflow,
node: INode,
@ -3884,169 +3746,13 @@ export function getExecuteWebhookFunctions(
closeFunctions: CloseFunction[],
runExecutionData: IRunExecutionData | null,
): IWebhookFunctions {
return ((workflow: Workflow, node: INode, runExecutionData: IRunExecutionData | null) => {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
getBodyData(): IDataObject {
if (additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return additionalData.httpRequest.body;
},
getCredentials: async (type) =>
await getCredentials(workflow, node, type, additionalData, mode),
getHeaderData(): IncomingHttpHeaders {
if (additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return additionalData.httpRequest.headers;
},
async getInputConnectionData(
inputName: NodeConnectionType,
itemIndex: number,
): Promise<unknown> {
// To be able to use expressions like "$json.sessionId" set the
// body data the webhook received to what is normally used for
// incoming node data.
const connectionInputData: INodeExecutionData[] = [
{ json: additionalData.httpRequest?.body || {} },
];
const runExecutionData: IRunExecutionData = {
resultData: {
runData: {},
},
};
const executeData: IExecuteData = {
data: {
main: [connectionInputData],
},
node,
source: null,
};
const runIndex = 0;
return await getInputConnectionData(
this,
workflow,
runExecutionData,
runIndex,
connectionInputData,
additionalData,
executeData,
mode,
closeFunctions,
inputName,
itemIndex,
);
},
getMode: () => mode,
evaluateExpression: (expression: string, evaluateItemIndex?: number) => {
const itemIndex = evaluateItemIndex === undefined ? 0 : evaluateItemIndex;
const runIndex = 0;
let connectionInputData: INodeExecutionData[] = [];
let executionData: IExecuteData | undefined;
if (runExecutionData?.executionData !== undefined) {
executionData = runExecutionData.executionData.nodeExecutionStack[0];
if (executionData !== undefined) {
connectionInputData = executionData.data.main[0]!;
}
}
const additionalKeys = getAdditionalKeys(additionalData, mode, runExecutionData);
return workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
mode,
additionalKeys,
executionData,
);
},
getNodeParameter: (
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object => {
const itemIndex = 0;
const runIndex = 0;
let connectionInputData: INodeExecutionData[] = [];
let executionData: IExecuteData | undefined;
if (runExecutionData?.executionData !== undefined) {
executionData = runExecutionData.executionData.nodeExecutionStack[0];
if (executionData !== undefined) {
connectionInputData = executionData.data.main[0]!;
}
}
const additionalKeys = getAdditionalKeys(additionalData, mode, runExecutionData);
return getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
node,
parameterName,
itemIndex,
mode,
additionalKeys,
executionData,
fallbackValue,
options,
);
},
getParamsData(): object {
if (additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return additionalData.httpRequest.params;
},
getQueryData(): object {
if (additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return additionalData.httpRequest.query;
},
getRequestObject(): Request {
if (additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return additionalData.httpRequest;
},
getResponseObject(): Response {
if (additionalData.httpResponse === undefined) {
throw new ApplicationError('Response is missing');
}
return additionalData.httpResponse;
},
getNodeWebhookUrl: (name: string): string | undefined =>
getNodeWebhookUrl(
name,
workflow,
node,
additionalData,
mode,
getAdditionalKeys(additionalData, mode, null),
),
getWebhookName: () => webhookData.webhookDescription.name,
helpers: {
createDeferredPromise,
...getRequestHelperFunctions(workflow, node, additionalData),
...getBinaryHelperFunctions(additionalData, workflow.id),
returnJsonArray,
},
nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id),
};
})(workflow, node, runExecutionData);
return new WebhookContext(
workflow,
node,
additionalData,
mode,
webhookData,
closeFunctions,
runExecutionData,
);
}

View file

@ -0,0 +1,230 @@
import type {
ICredentialDataDecryptedObject,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
IRunExecutionData,
IExecuteSingleFunctions,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowExecuteMode,
ITaskDataConnections,
IExecuteData,
ContextType,
AiEvent,
ISourceData,
} from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
NodeHelpers,
WorkflowDataProxy,
} from 'n8n-workflow';
import {
assertBinaryData,
continueOnFail,
getAdditionalKeys,
getBinaryDataBuffer,
getCredentials,
getNodeParameter,
returnJsonArray,
} from '@/NodeExecuteFunctions';
import { BaseContext } from './base-contexts';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
export class ExecuteSingleContext extends BaseContext implements IExecuteSingleFunctions {
readonly helpers: IExecuteSingleFunctions['helpers'];
constructor(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
private readonly runExecutionData: IRunExecutionData,
private readonly runIndex: number,
private readonly connectionInputData: INodeExecutionData[],
private readonly inputData: ITaskDataConnections,
private readonly itemIndex: number,
private readonly executeData: IExecuteData,
private readonly mode: WorkflowExecuteMode,
private readonly abortSignal?: AbortSignal,
) {
super(workflow, node, additionalData);
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
const requestHelpers = new RequestHelpers(this, workflow, node, additionalData);
this.helpers = {
createDeferredPromise: () => createDeferredPromise(),
returnJsonArray: (items) => returnJsonArray(items),
getBinaryPath: (id) => binaryHelpers.getBinaryPath(id),
getBinaryMetadata: (id) => binaryHelpers.getBinaryMetadata(id),
getBinaryStream: (id) => binaryHelpers.getBinaryStream(id),
binaryToBuffer: (body) => binaryHelpers.binaryToBuffer(body),
binaryToString: (body) => binaryHelpers.binaryToString(body),
prepareBinaryData: binaryHelpers.prepareBinaryData.bind(binaryHelpers),
setBinaryDataBuffer: binaryHelpers.setBinaryDataBuffer.bind(binaryHelpers),
copyBinaryFile: () => binaryHelpers.copyBinaryFile(),
assertBinaryData: (propertyName, inputIndex = 0) =>
assertBinaryData(inputData, node, itemIndex, propertyName, inputIndex),
getBinaryDataBuffer: async (propertyName, inputIndex = 0) =>
await getBinaryDataBuffer(inputData, itemIndex, propertyName, inputIndex),
httpRequest: requestHelpers.httpRequest.bind(requestHelpers),
httpRequestWithAuthentication:
requestHelpers.httpRequestWithAuthentication.bind(requestHelpers),
requestWithAuthenticationPaginated:
requestHelpers.requestWithAuthenticationPaginated.bind(requestHelpers),
request: requestHelpers.request.bind(requestHelpers),
requestWithAuthentication: requestHelpers.requestWithAuthentication.bind(requestHelpers),
requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers),
requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers),
};
}
getExecutionCancelSignal() {
return this.abortSignal;
}
onExecutionCancellation(handler: () => unknown) {
const fn = () => {
this.abortSignal?.removeEventListener('abort', fn);
handler();
};
this.abortSignal?.addEventListener('abort', fn);
}
continueOnFail() {
return continueOnFail(this.node);
}
evaluateExpression(expression: string, evaluateItemIndex: number | undefined) {
evaluateItemIndex = evaluateItemIndex === undefined ? this.itemIndex : evaluateItemIndex;
return this.workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
this.runExecutionData,
this.runIndex,
evaluateItemIndex,
this.node.name,
this.connectionInputData,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
);
}
getContext(type: ContextType) {
return NodeHelpers.getContext(this.runExecutionData, type, this.node);
}
getInputData(inputIndex = 0, inputName = 'main') {
if (!this.inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return { json: {} };
}
// TODO: Check if nodeType has input with that index defined
if (this.inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input index', {
extra: { inputIndex, inputName },
});
}
const allItems = this.inputData[inputName][inputIndex];
if (allItems === null) {
throw new ApplicationError('Input index was not set', {
extra: { inputIndex, inputName },
});
}
if (allItems[this.itemIndex] === null) {
throw new ApplicationError('Value of input with given index was not set', {
extra: { inputIndex, inputName, itemIndex: this.itemIndex },
});
}
return allItems[this.itemIndex];
}
getItemIndex() {
return this.itemIndex;
}
getNodeParameter(parameterName: string, fallbackValue?: any, options?: IGetNodeParameterOptions) {
return getNodeParameter(
this.workflow,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
this.node,
parameterName,
this.itemIndex,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
fallbackValue,
options,
);
}
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
return await getCredentials<T>(
this.workflow,
this.node,
type,
this.additionalData,
this.mode,
this.executeData,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
this.itemIndex,
);
}
getMode() {
return this.mode;
}
getExecuteData() {
return this.executeData;
}
getWorkflowDataProxy() {
return new WorkflowDataProxy(
this.workflow,
this.runExecutionData,
this.runIndex,
this.itemIndex,
this.node.name,
this.connectionInputData,
{},
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
).getDataProxy();
}
getInputSourceData(inputIndex = 0, inputName = 'main'): ISourceData {
if (this.executeData?.source === null) {
// Should never happen as n8n sets it automatically
throw new ApplicationError('Source data is missing');
}
return this.executeData.source[inputName][inputIndex] as ISourceData;
}
logAiEvent(eventName: AiEvent, msg: string) {
return this.additionalData.logAiEvent(eventName, {
executionId: this.additionalData.executionId ?? 'unsaved-execution',
nodeName: this.node.name,
workflowName: this.workflow.name ?? 'Unnamed workflow',
nodeType: this.node.type,
workflowId: this.workflow.id ?? 'unsaved-workflow',
msg,
});
}
}

View file

@ -1,2 +1,4 @@
export { ExecuteSingleContext } from './execute-single-context';
export { PollContext } from './poll-context';
export { TriggerContext } from './trigger-context';
export { WebhookContext } from './webhook-context';

View file

@ -48,7 +48,7 @@ export class PollContext extends BaseContext implements IPollFunctions {
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
const requestHelpers = new RequestHelpers(this, workflow, node, additionalData);
const schedulingHelepers = new SchedulingHelpers(workflow);
const schedulingHelpers = new SchedulingHelpers(workflow);
this.helpers = {
createDeferredPromise: () => createDeferredPromise(),
@ -73,7 +73,7 @@ export class PollContext extends BaseContext implements IPollFunctions {
requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers),
requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers),
registerCron: schedulingHelepers.registerCron.bind(schedulingHelepers),
registerCron: schedulingHelpers.registerCron.bind(schedulingHelpers),
};
}

View file

@ -49,7 +49,7 @@ export class TriggerContext extends BaseContext implements ITriggerFunctions {
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
const requestHelpers = new RequestHelpers(this, workflow, node, additionalData);
const schedulingHelepers = new SchedulingHelpers(workflow);
const schedulingHelpers = new SchedulingHelpers(workflow);
const sshTunnelHelpers = new SSHTunnelHelpers();
// TODO: This is almost identical to the helpers in PollContext.
@ -76,7 +76,7 @@ export class TriggerContext extends BaseContext implements ITriggerFunctions {
requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers),
requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers),
registerCron: schedulingHelepers.registerCron.bind(schedulingHelepers),
registerCron: schedulingHelpers.registerCron.bind(schedulingHelpers),
getSSHClient: sshTunnelHelpers.getSSHClient.bind(sshTunnelHelpers),
};

View file

@ -0,0 +1,257 @@
import type { Request, Response } from 'express';
import type {
CloseFunction,
ICredentialDataDecryptedObject,
IDataObject,
IExecuteData,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
IRunExecutionData,
IWebhookData,
IWebhookFunctions,
IWorkflowExecuteAdditionalData,
NodeConnectionType,
NodeParameterValueType,
Workflow,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ApplicationError, createDeferredPromise } from 'n8n-workflow';
import {
copyBinaryFile,
getAdditionalKeys,
getCredentials,
getInputConnectionData,
getNodeParameter,
getNodeWebhookUrl,
returnJsonArray,
} from '@/NodeExecuteFunctions';
import { BaseContext } from './base-contexts';
import { BinaryHelpers } from './helpers/binary-helpers';
import { RequestHelpers } from './helpers/request-helpers';
export class WebhookContext extends BaseContext implements IWebhookFunctions {
readonly helpers: IWebhookFunctions['helpers'];
readonly nodeHelpers: IWebhookFunctions['nodeHelpers'];
constructor(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
private readonly mode: WorkflowExecuteMode,
private readonly webhookData: IWebhookData,
private readonly closeFunctions: CloseFunction[],
private readonly runExecutionData: IRunExecutionData | null,
) {
super(workflow, node, additionalData);
const binaryHelpers = new BinaryHelpers(workflow, additionalData);
const requestHelpers = new RequestHelpers(this, workflow, node, additionalData);
this.helpers = {
createDeferredPromise: () => createDeferredPromise(),
returnJsonArray: (items) => returnJsonArray(items),
getBinaryPath: (id) => binaryHelpers.getBinaryPath(id),
getBinaryMetadata: (id) => binaryHelpers.getBinaryMetadata(id),
getBinaryStream: (id) => binaryHelpers.getBinaryStream(id),
binaryToBuffer: (body) => binaryHelpers.binaryToBuffer(body),
binaryToString: (body) => binaryHelpers.binaryToString(body),
prepareBinaryData: binaryHelpers.prepareBinaryData.bind(binaryHelpers),
setBinaryDataBuffer: binaryHelpers.setBinaryDataBuffer.bind(binaryHelpers),
copyBinaryFile: () => binaryHelpers.copyBinaryFile(),
httpRequest: requestHelpers.httpRequest.bind(requestHelpers),
httpRequestWithAuthentication:
requestHelpers.httpRequestWithAuthentication.bind(requestHelpers),
requestWithAuthenticationPaginated:
requestHelpers.requestWithAuthenticationPaginated.bind(requestHelpers),
request: requestHelpers.request.bind(requestHelpers),
requestWithAuthentication: requestHelpers.requestWithAuthentication.bind(requestHelpers),
requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers),
requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers),
};
this.nodeHelpers = {
copyBinaryFile: async (filePath, fileName, mimeType) =>
await copyBinaryFile(
this.workflow.id,
this.additionalData.executionId!,
filePath,
fileName,
mimeType,
),
};
}
getMode() {
return this.mode;
}
async getCredentials<T extends object = ICredentialDataDecryptedObject>(type: string) {
return await getCredentials<T>(this.workflow, this.node, type, this.additionalData, this.mode);
}
getBodyData() {
if (this.additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return this.additionalData.httpRequest.body as IDataObject;
}
getHeaderData() {
if (this.additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return this.additionalData.httpRequest.headers;
}
getParamsData(): object {
if (this.additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return this.additionalData.httpRequest.params;
}
getQueryData(): object {
if (this.additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return this.additionalData.httpRequest.query;
}
getRequestObject(): Request {
if (this.additionalData.httpRequest === undefined) {
throw new ApplicationError('Request is missing');
}
return this.additionalData.httpRequest;
}
getResponseObject(): Response {
if (this.additionalData.httpResponse === undefined) {
throw new ApplicationError('Response is missing');
}
return this.additionalData.httpResponse;
}
getNodeWebhookUrl(name: string): string | undefined {
return getNodeWebhookUrl(
name,
this.workflow,
this.node,
this.additionalData,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, null),
);
}
getWebhookName() {
return this.webhookData.webhookDescription.name;
}
async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise<unknown> {
// To be able to use expressions like "$json.sessionId" set the
// body data the webhook received to what is normally used for
// incoming node data.
const connectionInputData: INodeExecutionData[] = [
{ json: this.additionalData.httpRequest?.body || {} },
];
const runExecutionData: IRunExecutionData = {
resultData: {
runData: {},
},
};
const executeData: IExecuteData = {
data: {
main: [connectionInputData],
},
node: this.node,
source: null,
};
const runIndex = 0;
return await getInputConnectionData(
this,
this.workflow,
runExecutionData,
runIndex,
connectionInputData,
this.additionalData,
executeData,
this.mode,
this.closeFunctions,
inputName,
itemIndex,
);
}
evaluateExpression(expression: string, evaluateItemIndex?: number) {
const itemIndex = evaluateItemIndex === undefined ? 0 : evaluateItemIndex;
const runIndex = 0;
let connectionInputData: INodeExecutionData[] = [];
let executionData: IExecuteData | undefined;
if (this.runExecutionData?.executionData !== undefined) {
executionData = this.runExecutionData.executionData.nodeExecutionStack[0];
if (executionData !== undefined) {
connectionInputData = executionData.data.main[0]!;
}
}
const additionalKeys = getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData);
return this.workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
this.runExecutionData,
runIndex,
itemIndex,
this.node.name,
connectionInputData,
this.mode,
additionalKeys,
executionData,
);
}
getNodeParameter(
parameterName: string,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
): NodeParameterValueType | object {
const itemIndex = 0;
const runIndex = 0;
let connectionInputData: INodeExecutionData[] = [];
let executionData: IExecuteData | undefined;
if (this.runExecutionData?.executionData !== undefined) {
executionData = this.runExecutionData.executionData.nodeExecutionStack[0];
if (executionData !== undefined) {
connectionInputData = executionData.data.main[0]!;
}
}
const additionalKeys = getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData);
return getNodeParameter(
this.workflow,
this.runExecutionData,
runIndex,
connectionInputData,
this.node,
parameterName,
itemIndex,
this.mode,
additionalKeys,
executionData,
fallbackValue,
options,
);
}
}

View file

@ -438,13 +438,13 @@ export interface IGetExecuteFunctions {
export interface IGetExecuteSingleFunctions {
(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
runExecutionData: IRunExecutionData,
runIndex: number,
connectionInputData: INodeExecutionData[],
inputData: ITaskDataConnections,
node: INode,
itemIndex: number,
additionalData: IWorkflowExecuteAdditionalData,
executeData: IExecuteData,
mode: WorkflowExecuteMode,
abortSignal?: AbortSignal,
@ -938,7 +938,7 @@ type BaseExecutionFunctions = FunctionsBaseWithRequiredKeys<'getMode'> & {
getInputSourceData(inputIndex?: number, inputName?: string): ISourceData;
getExecutionCancelSignal(): AbortSignal | undefined;
onExecutionCancellation(handler: () => unknown): void;
logAiEvent(eventName: AiEvent, msg?: string | undefined): Promise<void>;
logAiEvent(eventName: AiEvent, msg?: string | undefined): void;
};
// TODO: Create later own type only for Config-Nodes

View file

@ -170,13 +170,13 @@ export class RoutingNode {
itemContext.push({
thisArgs: nodeExecuteFunctions.getExecuteSingleFunctions(
this.workflow,
this.node,
this.additionalData,
this.runExecutionData,
runIndex,
this.connectionInputData,
inputData,
this.node,
itemIndex,
this.additionalData,
executeData,
this.mode,
abortSignal,

View file

@ -2128,13 +2128,13 @@ describe('RoutingNode', () => {
const routingNodeExecutionContext = nodeExecuteFunctions.getExecuteSingleFunctions(
routingNode.workflow,
routingNode.node,
routingNode.additionalData,
routingNode.runExecutionData,
runIndex,
routingNode.connectionInputData,
inputData,
routingNode.node,
iteration,
routingNode.additionalData,
executeData,
routingNode.mode,
);