diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 80464f262d..a6900f1513 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -25,8 +25,6 @@ import axios from 'axios'; import crypto, { createHmac } from 'crypto'; import FileType from 'file-type'; import FormData from 'form-data'; -import { createReadStream } from 'fs'; -import { access as fsAccess, writeFile as fsWriteFile } from 'fs/promises'; import { IncomingMessage } from 'http'; import { Agent, type AgentOptions } from 'https'; import get from 'lodash/get'; @@ -36,26 +34,19 @@ import pick from 'lodash/pick'; import { DateTime } from 'luxon'; import { extension, lookup } from 'mime-types'; import type { - BinaryHelperFunctions, CloseFunction, - ContextType, FieldType, - FileSystemHelperFunctions, - FunctionsBase, GenericValue, IAdditionalCredentialOptions, IAllExecuteFunctions, IBinaryData, - IContextObject, ICredentialDataDecryptedObject, ICredentialTestFunctions, ICredentialsExpressionResolveValues, IDataObject, IExecuteData, IExecuteFunctions, - IExecuteResponsePromiseData, IExecuteSingleFunctions, - IExecuteWorkflowInfo, IGetNodeParameterOptions, IHookFunctions, IHttpRequestOptions, @@ -66,7 +57,6 @@ import type { INodeCredentialsDetails, INodeExecutionData, INodeInputConfiguration, - INodeOutputConfiguration, INodeProperties, INodePropertyCollection, INodePropertyOptions, @@ -83,29 +73,16 @@ import type { IWebhookDescription, IWebhookFunctions, IWorkflowDataProxyAdditionalKeys, - IWorkflowDataProxyData, IWorkflowExecuteAdditionalData, NodeExecutionWithMetadata, - NodeHelperFunctions, NodeParameterValueType, - NodeTypeAndVersion, PaginationOptions, - RequestHelperFunctions, Workflow, WorkflowActivateMode, WorkflowExecuteMode, - CallbackManager, INodeParameters, EnsureTypeOptions, SSHTunnelFunctions, - DeduplicationHelperFunctions, - IDeduplicationOutput, - IDeduplicationOutputItems, - ICheckProcessedOptions, - DeduplicationScope, - DeduplicationItemTypes, - ICheckProcessedContextData, - AiEvent, ISupplyDataFunctions, } from 'n8n-workflow'; import { @@ -116,8 +93,6 @@ import { NodeHelpers, NodeOperationError, NodeSslError, - WorkflowDataProxy, - createDeferredPromise, deepCopy, fileTypeFromMimeType, isObjectEmpty, @@ -134,24 +109,14 @@ import { Readable } from 'stream'; import Container from 'typedi'; import url, { URL, URLSearchParams } from 'url'; -import { createAgentStartJob } from './Agent'; import { BinaryDataService } from './BinaryData/BinaryData.service'; -import type { BinaryData } from './BinaryData/types'; import { binaryToBuffer } from './BinaryData/utils'; import { - BINARY_DATA_STORAGE_PATH, - BLOCK_FILE_ACCESS_TO_N8N_FILES, - CONFIG_FILES, - CUSTOM_EXTENSION_ENV, HTTP_REQUEST_NODE_TYPE, HTTP_REQUEST_TOOL_NODE_TYPE, PLACEHOLDER_EMPTY_EXECUTION_ID, - RESTRICT_FILE_ACCESS_TO, - UM_EMAIL_TEMPLATES_INVITE, - UM_EMAIL_TEMPLATES_PWRESET, } from './Constants'; import { createNodeAsTool } from './CreateNodeAsTool'; -import { DataDeduplicationService } from './data-deduplication-service'; import { getAllWorkflowExecutionMetadata, getWorkflowExecutionMetadata, @@ -159,7 +124,6 @@ import { setWorkflowExecutionMetadata, } from './ExecutionMetadata'; import { extractValue } from './ExtractValue'; -import { InstanceSettings } from './InstanceSettings'; import type { ExtendedValidationResult, IResponseError } from './Interfaces'; import { getSecretsProxy } from './Secrets'; import { SSHClientsManager } from './SSHClientsManager'; @@ -167,6 +131,8 @@ import { HookContext, PollContext } from './node-execution-context'; import { TriggerContext } from './node-execution-context/trigger-context'; import { ExecuteSingleContext } from './node-execution-context/execute-single-context'; import { WebhookContext } from './node-execution-context/webhook-context'; +import { ExecutionContext } from './node-execution-context/execute-context'; +import { SupplyDataContext } from './node-execution-context/supply-data-context'; axios.defaults.timeout = 300000; // Prevent axios from adding x-form-www-urlencoded headers by default @@ -1051,21 +1017,6 @@ export async function httpRequest( return result.data; } -// TODO: DELETE THIS -export function getBinaryPath(binaryDataId: string): string { - throw new Error('Not implemented'); -} - -// TODO: DELETE THIS -export async function getBinaryMetadata(binaryDataId: string): Promise { - throw new Error('Not implemented'); -} - -// TODO: DELETE THIS -export async function getBinaryStream(binaryDataId: string, chunkSize?: number): Promise { - throw new Error('Not implemented'); -} - // TODO: Move to BinaryHelpers export function assertBinaryData( inputData: ITaskDataConnections, @@ -1113,16 +1064,6 @@ export async function getBinaryDataBuffer( return await Container.get(BinaryDataService).getAsBuffer(binaryData); } -// TODO: Move to BinaryHelpers -export async function setBinaryDataBuffer( - binaryData: IBinaryData, - bufferOrStream: Buffer | Readable, - workflowId: string, - executionId: string, -): Promise { - throw new Error('Not implemented'); -} - // TODO: Move to BinaryHelpers export async function copyBinaryFile( workflowId: string, @@ -1183,85 +1124,6 @@ export async function copyBinaryFile( ); } -// TODO: Move to BinaryHelpers -export async function prepareBinaryData( - binaryData: Buffer | Readable, - executionId: string, - workflowId: string, - filePath?: string, - mimeType?: string, -): Promise { - throw new Error('Not implemented'); -} - -export async function checkProcessedAndRecord( - items: DeduplicationItemTypes[], - scope: DeduplicationScope, - contextData: ICheckProcessedContextData, - options: ICheckProcessedOptions, -): Promise { - return await DataDeduplicationService.getInstance().checkProcessedAndRecord( - items, - scope, - contextData, - options, - ); -} - -export async function checkProcessedItemsAndRecord( - key: string, - items: IDataObject[], - scope: DeduplicationScope, - contextData: ICheckProcessedContextData, - options: ICheckProcessedOptions, -): Promise { - return await DataDeduplicationService.getInstance().checkProcessedItemsAndRecord( - key, - items, - scope, - contextData, - options, - ); -} - -export async function removeProcessed( - items: DeduplicationItemTypes[], - scope: DeduplicationScope, - contextData: ICheckProcessedContextData, - options: ICheckProcessedOptions, -): Promise { - return await DataDeduplicationService.getInstance().removeProcessed( - items, - scope, - contextData, - options, - ); -} - -export async function clearAllProcessedItems( - scope: DeduplicationScope, - contextData: ICheckProcessedContextData, - options: ICheckProcessedOptions, -): Promise { - return await DataDeduplicationService.getInstance().clearAllProcessedItems( - scope, - contextData, - options, - ); -} - -export async function getProcessedDataCount( - scope: DeduplicationScope, - contextData: ICheckProcessedContextData, - options: ICheckProcessedOptions, -): Promise { - return await DataDeduplicationService.getInstance().getProcessedDataCount( - scope, - contextData, - options, - ); -} - export function applyPaginationRequestData( requestData: IRequestOptions, paginationRequestData: PaginationOptions['request'], @@ -1900,6 +1762,7 @@ export async function requestWithAuthentication( * Returns the additional keys for Expressions and Function-Nodes * */ +// TODO: Move to BaseContext export function getAdditionalKeys( additionalData: IWorkflowExecuteAdditionalData, mode: WorkflowExecuteMode, @@ -2599,8 +2462,9 @@ export function getWebhookDescription( return undefined; } +// TODO: Move to BaseExecutionContext // TODO: Change options to an object -const addExecutionDataFunctions = async ( +export const addExecutionDataFunctions = async ( type: 'input' | 'output', nodeName: string, data: INodeExecutionData[][] | ExecutionBaseError, @@ -2767,19 +2631,19 @@ export async function getInputConnectionData( ); // eslint-disable-next-line @typescript-eslint/no-use-before-define - const newContext = getSupplyDataFunctions( + const newContext = new SupplyDataContext( workflow, + node, + additionalData, + mode, runExecutionData, runIndex, connectionInputData, inputData, - connectedNode, - additionalData, executeData, - mode, closeFunctions, abortSignal, - ); + ) as unknown as ISupplyDataFunctions; if (!nodeType.supplyData) { if (nodeType.description.outputs.includes(NodeConnectionType.AiTool)) { @@ -2859,261 +2723,11 @@ export async function getInputConnectionData( : nodes.map((node) => node.response); } -const getCommonWorkflowFunctions = ( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, -): Omit => ({ - logger: Logger, - getExecutionId: () => additionalData.executionId!, - getNode: () => deepCopy(node), - getWorkflow: () => ({ - id: workflow.id, - name: workflow.name, - active: workflow.active, - }), - getWorkflowStaticData: (type) => workflow.getStaticData(type, node), - getChildNodes: (nodeName: string) => { - const output: NodeTypeAndVersion[] = []; - const nodes = workflow.getChildNodes(nodeName); - - for (const nodeName of nodes) { - const node = workflow.nodes[nodeName]; - output.push({ - name: node.name, - type: node.type, - typeVersion: node.typeVersion, - }); - } - return output; - }, - getParentNodes: (nodeName: string) => { - const output: NodeTypeAndVersion[] = []; - const nodes = workflow.getParentNodes(nodeName); - - for (const nodeName of nodes) { - const node = workflow.nodes[nodeName]; - output.push({ - name: node.name, - type: node.type, - typeVersion: node.typeVersion, - }); - } - return output; - }, - getKnownNodeTypes: () => workflow.nodeTypes.getKnownTypes(), - getRestApiUrl: () => additionalData.restApiUrl, - getInstanceBaseUrl: () => additionalData.instanceBaseUrl, - getInstanceId: () => Container.get(InstanceSettings).instanceId, - getTimezone: () => workflow.timezone, - getCredentialsProperties: (type: string) => - additionalData.credentialsHelper.getCredentialsProperties(type), - prepareOutputData: async (outputData) => [outputData], -}); - -const executionCancellationFunctions = ( - abortSignal?: AbortSignal, -): Pick => ({ - getExecutionCancelSignal: () => abortSignal, - onExecutionCancellation: (handler) => { - const fn = () => { - abortSignal?.removeEventListener('abort', fn); - handler(); - }; - abortSignal?.addEventListener('abort', fn); - }, -}); - -const getRequestHelperFunctions = ( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - runExecutionData: IRunExecutionData | null = null, - connectionInputData: INodeExecutionData[] = [], -): RequestHelperFunctions => { - return {} as RequestHelperFunctions; -}; - const getSSHTunnelFunctions = (): SSHTunnelFunctions => ({ getSSHClient: async (credentials) => await Container.get(SSHClientsManager).getClient(credentials), }); -const getAllowedPaths = () => { - const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO]; - if (!restrictFileAccessTo) { - return []; - } - const allowedPaths = restrictFileAccessTo - .split(';') - .map((path) => path.trim()) - .filter((path) => path); - return allowedPaths; -}; - -export function isFilePathBlocked(filePath: string): boolean { - const allowedPaths = getAllowedPaths(); - const resolvedFilePath = path.resolve(filePath); - const blockFileAccessToN8nFiles = process.env[BLOCK_FILE_ACCESS_TO_N8N_FILES] !== 'false'; - - //if allowed paths are defined, allow access only to those paths - if (allowedPaths.length) { - for (const path of allowedPaths) { - if (resolvedFilePath.startsWith(path)) { - return false; - } - } - - return true; - } - - //restrict access to .n8n folder, ~/.cache/n8n/public, and other .env config related paths - if (blockFileAccessToN8nFiles) { - const { n8nFolder, staticCacheDir } = Container.get(InstanceSettings); - const restrictedPaths = [n8nFolder, staticCacheDir]; - - if (process.env[CONFIG_FILES]) { - restrictedPaths.push(...process.env[CONFIG_FILES].split(',')); - } - - if (process.env[CUSTOM_EXTENSION_ENV]) { - const customExtensionFolders = process.env[CUSTOM_EXTENSION_ENV].split(';'); - restrictedPaths.push(...customExtensionFolders); - } - - if (process.env[BINARY_DATA_STORAGE_PATH]) { - restrictedPaths.push(process.env[BINARY_DATA_STORAGE_PATH]); - } - - if (process.env[UM_EMAIL_TEMPLATES_INVITE]) { - restrictedPaths.push(process.env[UM_EMAIL_TEMPLATES_INVITE]); - } - - if (process.env[UM_EMAIL_TEMPLATES_PWRESET]) { - restrictedPaths.push(process.env[UM_EMAIL_TEMPLATES_PWRESET]); - } - - //check if the file path is restricted - for (const path of restrictedPaths) { - if (resolvedFilePath.startsWith(path)) { - return true; - } - } - } - - //path is not restricted - return false; -} - -const getFileSystemHelperFunctions = (node: INode): FileSystemHelperFunctions => ({ - async createReadStream(filePath) { - try { - await fsAccess(filePath); - } catch (error) { - throw error.code === 'ENOENT' - ? new NodeOperationError(node, error, { - message: `The file "${String(filePath)}" could not be accessed.`, - level: 'warning', - }) - : error; - } - if (isFilePathBlocked(filePath as string)) { - const allowedPaths = getAllowedPaths(); - const message = allowedPaths.length ? ` Allowed paths: ${allowedPaths.join(', ')}` : ''; - throw new NodeOperationError(node, `Access to the file is not allowed.${message}`, { - level: 'warning', - }); - } - return createReadStream(filePath); - }, - - getStoragePath() { - return path.join(Container.get(InstanceSettings).n8nFolder, `storage/${node.type}`); - }, - - async writeContentToFile(filePath, content, flag) { - if (isFilePathBlocked(filePath as string)) { - throw new NodeOperationError(node, `The file "${String(filePath)}" is not writable.`, { - level: 'warning', - }); - } - return await fsWriteFile(filePath, content, { encoding: 'binary', flag }); - }, -}); - -const getNodeHelperFunctions = ( - { executionId }: IWorkflowExecuteAdditionalData, - workflowId: string, -): NodeHelperFunctions => ({ - copyBinaryFile: async (filePath, fileName, mimeType) => - await copyBinaryFile(workflowId, executionId!, filePath, fileName, mimeType), -}); - -/** @deprecated */ -const getBinaryHelperFunctions = ( - { executionId }: IWorkflowExecuteAdditionalData, - workflowId: string, -): BinaryHelperFunctions => ({ - getBinaryPath, - getBinaryStream, - getBinaryMetadata, - binaryToBuffer, - binaryToString, - prepareBinaryData: async (binaryData, filePath, mimeType) => - await prepareBinaryData(binaryData, executionId!, workflowId, filePath, mimeType), - setBinaryDataBuffer: async (data, binaryData) => - await setBinaryDataBuffer(data, binaryData, workflowId, executionId!), - copyBinaryFile: async () => { - throw new ApplicationError('`copyBinaryFile` has been removed. Please upgrade this node.'); - }, -}); - -const getCheckProcessedHelperFunctions = ( - workflow: Workflow, - node: INode, -): DeduplicationHelperFunctions => ({ - async checkProcessedAndRecord( - items: DeduplicationItemTypes[], - scope: DeduplicationScope, - options: ICheckProcessedOptions, - ): Promise { - return await checkProcessedAndRecord(items, scope, { node, workflow }, options); - }, - async checkProcessedItemsAndRecord( - propertyName: string, - items: IDataObject[], - scope: DeduplicationScope, - options: ICheckProcessedOptions, - ): Promise { - return await checkProcessedItemsAndRecord( - propertyName, - items, - scope, - { node, workflow }, - options, - ); - }, - async removeProcessed( - items: DeduplicationItemTypes[], - scope: DeduplicationScope, - options: ICheckProcessedOptions, - ): Promise { - return await removeProcessed(items, scope, { node, workflow }, options); - }, - async clearAllProcessedItems( - scope: DeduplicationScope, - options: ICheckProcessedOptions, - ): Promise { - return await clearAllProcessedItems(scope, { node, workflow }, options); - }, - async getProcessedDataCount( - scope: DeduplicationScope, - options: ICheckProcessedOptions, - ): Promise { - return await getProcessedDataCount(scope, { node, workflow }, options); - }, -}); - /** * Returns a copy of the items which only contains the json data and * of that only the defined properties @@ -3159,600 +2773,30 @@ export function getExecuteTriggerFunctions( */ export function getExecuteFunctions( workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, - mode: WorkflowExecuteMode, closeFunctions: CloseFunction[], abortSignal?: AbortSignal, ): IExecuteFunctions { - return ((workflow, runExecutionData, connectionInputData, inputData, node) => { - return { - ...getCommonWorkflowFunctions(workflow, node, additionalData), - ...executionCancellationFunctions(abortSignal), - getMode: () => mode, - getCredentials: async (type, itemIndex) => - await getCredentials( - workflow, - node, - type, - additionalData, - mode, - executeData, - runExecutionData, - runIndex, - connectionInputData, - itemIndex, - ), - getExecuteData: () => executeData, - continueOnFail: () => { - return continueOnFail(node); - }, - evaluateExpression(expression: string, itemIndex: number) { - return workflow.expression.resolveSimpleParameterValue( - `=${expression}`, - {}, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ); - }, - async executeWorkflow( - workflowInfo: IExecuteWorkflowInfo, - inputData?: INodeExecutionData[], - parentCallbackManager?: CallbackManager, - ): Promise { - return await additionalData - .executeWorkflow(workflowInfo, additionalData, { - parentWorkflowId: workflow.id?.toString(), - inputData, - parentWorkflowSettings: workflow.settings, - node, - parentCallbackManager, - }) - .then( - async (result) => - await Container.get(BinaryDataService).duplicateBinaryData( - workflow.id, - additionalData.executionId!, - result, - ), - ); - }, - getContext(type: ContextType): IContextObject { - return NodeHelpers.getContext(runExecutionData, type, node); - }, - - async getInputConnectionData( - inputName: NodeConnectionType, - itemIndex: number, - ): Promise { - return await getInputConnectionData( - this, - workflow, - runExecutionData, - runIndex, - connectionInputData, - inputData, - additionalData, - executeData, - mode, - closeFunctions, - inputName, - itemIndex, - abortSignal, - ); - }, - - getNodeInputs(): INodeInputConfiguration[] { - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - return NodeHelpers.getNodeInputs(workflow, node, nodeType.description).map((output) => { - if (typeof output === 'string') { - return { - type: output, - }; - } - return output; - }); - }, - getNodeOutputs(): INodeOutputConfiguration[] { - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => { - if (typeof output === 'string') { - return { - type: output, - }; - } - return output; - }); - }, - getInputData: (inputIndex = 0, inputName = 'main') => { - if (!inputData.hasOwnProperty(inputName)) { - // Return empty array because else it would throw error when nothing is connected to input - return []; - } - - // TODO: Check if nodeType has input with that index defined - if (inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input with given index', { - extra: { inputIndex, inputName }, - }); - } - - if (inputData[inputName][inputIndex] === null) { - throw new ApplicationError('Value of input was not set', { - extra: { inputIndex, inputName }, - }); - } - - return inputData[inputName][inputIndex]; - }, - getInputSourceData: (inputIndex = 0, inputName = 'main') => { - if (executeData?.source === null) { - // Should never happen as n8n sets it automatically - throw new ApplicationError('Source data is missing'); - } - return executeData.source[inputName][inputIndex]; - }, - getNodeParameter: ( - parameterName: string, - itemIndex: number, - fallbackValue?: any, - options?: IGetNodeParameterOptions, - ): NodeParameterValueType | object => { - return getNodeParameter( - workflow, - runExecutionData, - runIndex, - connectionInputData, - node, - parameterName, - itemIndex, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - fallbackValue, - options, - ); - }, - getWorkflowDataProxy: (itemIndex: number): IWorkflowDataProxyData => { - const dataProxy = new WorkflowDataProxy( - workflow, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - {}, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ); - return dataProxy.getDataProxy(); - }, - async putExecutionToWait(waitTill: Date): Promise { - runExecutionData.waitTill = waitTill; - if (additionalData.setExecutionStatus) { - additionalData.setExecutionStatus('waiting'); - } - }, - logNodeOutput(...args: unknown[]): void { - if (mode === 'manual') { - // @ts-expect-error `args` is spreadable - this.sendMessageToUI(...args); - return; - } - - if (process.env.CODE_ENABLE_STDOUT === 'true') { - console.log(`[Workflow "${this.getWorkflow().id}"][Node "${node.name}"]`, ...args); - } - }, - sendMessageToUI(...args: any[]): void { - if (mode !== 'manual') { - return; - } - try { - if (additionalData.sendDataToUI) { - args = args.map((arg) => { - // prevent invalid dates from being logged as null - if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; - - // log valid dates in human readable format, as in browser - if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); - if (arg instanceof Date) return arg.toString(); - - return arg; - }); - - additionalData.sendDataToUI('sendConsoleMessage', { - source: `[Node: "${node.name}"]`, - messages: args, - }); - } - } catch (error) { - Logger.warn(`There was a problem sending message to UI: ${error.message}`); - } - }, - async sendResponse(response: IExecuteResponsePromiseData): Promise { - await additionalData.hooks?.executeHookFunctions('sendResponse', [response]); - }, - - addInputData( - connectionType: NodeConnectionType, - data: INodeExecutionData[][] | ExecutionBaseError, - ): { index: number } { - const nodeName = this.getNode().name; - let currentNodeRunIndex = 0; - if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length; - } - - addExecutionDataFunctions( - 'input', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging input data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - - return { index: currentNodeRunIndex }; - }, - addOutputData( - connectionType: NodeConnectionType, - currentNodeRunIndex: number, - data: INodeExecutionData[][] | ExecutionBaseError, - ): void { - addExecutionDataFunctions( - 'output', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging output data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - }, - helpers: { - createDeferredPromise, - copyInputItems, - ...getRequestHelperFunctions( - workflow, - node, - additionalData, - runExecutionData, - connectionInputData, - ), - ...getSSHTunnelFunctions(), - ...getFileSystemHelperFunctions(node), - ...getBinaryHelperFunctions(additionalData, workflow.id), - ...getCheckProcessedHelperFunctions(workflow, node), - assertBinaryData: (itemIndex, propertyName) => - assertBinaryData(inputData, node, itemIndex, propertyName, 0), - getBinaryDataBuffer: async (itemIndex, propertyName) => - await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), - - returnJsonArray, - normalizeItems, - constructExecutionMetaData, - }, - nodeHelpers: getNodeHelperFunctions(additionalData, workflow.id), - logAiEvent: (eventName: AiEvent, msg: string) => { - return additionalData.logAiEvent(eventName, { - executionId: additionalData.executionId ?? 'unsaved-execution', - nodeName: node.name, - workflowName: workflow.name ?? 'Unnamed workflow', - nodeType: node.type, - workflowId: workflow.id ?? 'unsaved-workflow', - msg, - }); - }, - getParentCallbackManager: () => additionalData.parentCallbackManager, - startJob: createAgentStartJob( - additionalData, - inputData, - node, - workflow, - runExecutionData, - runIndex, - node.name, - connectionInputData, - {}, - mode, - executeData, - ), - }; - })(workflow, runExecutionData, connectionInputData, inputData, node) as IExecuteFunctions; -} - -export function getSupplyDataFunctions( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - connectionInputData: INodeExecutionData[], - inputData: ITaskDataConnections, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - executeData: IExecuteData, - mode: WorkflowExecuteMode, - closeFunctions: CloseFunction[], - abortSignal?: AbortSignal, -): ISupplyDataFunctions { - return { - ...getCommonWorkflowFunctions(workflow, node, additionalData), - ...executionCancellationFunctions(abortSignal), - getMode: () => mode, - getCredentials: async (type, itemIndex) => - await getCredentials( - workflow, - node, - type, - additionalData, - mode, - executeData, - runExecutionData, - runIndex, - connectionInputData, - itemIndex, - ), - continueOnFail: () => continueOnFail(node), - evaluateExpression: (expression: string, itemIndex: number) => - workflow.expression.resolveSimpleParameterValue( - `=${expression}`, - {}, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ), - executeWorkflow: async ( - workflowInfo: IExecuteWorkflowInfo, - inputData?: INodeExecutionData[], - parentCallbackManager?: CallbackManager, - ) => - await additionalData - .executeWorkflow(workflowInfo, additionalData, { - parentWorkflowId: workflow.id?.toString(), - inputData, - parentWorkflowSettings: workflow.settings, - node, - parentCallbackManager, - }) - .then( - async (result) => - await Container.get(BinaryDataService).duplicateBinaryData( - workflow.id, - additionalData.executionId!, - result, - ), - ), - getNodeOutputs() { - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => { - if (typeof output === 'string') { - return { - type: output, - }; - } - return output; - }); - }, - async getInputConnectionData( - inputName: NodeConnectionType, - itemIndex: number, - ): Promise { - return await getInputConnectionData( - this, - workflow, - runExecutionData, - runIndex, - connectionInputData, - inputData, - additionalData, - executeData, - mode, - closeFunctions, - inputName, - itemIndex, - abortSignal, - ); - }, - getInputData: (inputIndex = 0, inputName = 'main') => { - if (!inputData.hasOwnProperty(inputName)) { - // Return empty array because else it would throw error when nothing is connected to input - return []; - } - - // TODO: Check if nodeType has input with that index defined - if (inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input with given index', { - extra: { inputIndex, inputName }, - }); - } - - if (inputData[inputName][inputIndex] === null) { - throw new ApplicationError('Value of input was not set', { - extra: { inputIndex, inputName }, - }); - } - - return inputData[inputName][inputIndex]; - }, - getNodeParameter: (( - parameterName: string, - itemIndex: number, - fallbackValue?: any, - options?: IGetNodeParameterOptions, - ) => - getNodeParameter( - workflow, - runExecutionData, - runIndex, - connectionInputData, - node, - parameterName, - itemIndex, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - fallbackValue, - options, - )) as ISupplyDataFunctions['getNodeParameter'], - getWorkflowDataProxy: (itemIndex: number) => - new WorkflowDataProxy( - workflow, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - {}, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ).getDataProxy(), - sendMessageToUI(...args: any[]): void { - if (mode !== 'manual') { - return; - } - try { - if (additionalData.sendDataToUI) { - args = args.map((arg) => { - // prevent invalid dates from being logged as null - if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; - - // log valid dates in human readable format, as in browser - if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); - if (arg instanceof Date) return arg.toString(); - - return arg; - }); - - additionalData.sendDataToUI('sendConsoleMessage', { - source: `[Node: "${node.name}"]`, - messages: args, - }); - } - } catch (error) { - Logger.warn(`There was a problem sending message to UI: ${error.message}`); - } - }, - logAiEvent: (eventName: AiEvent, msg: string) => - additionalData.logAiEvent(eventName, { - executionId: additionalData.executionId ?? 'unsaved-execution', - nodeName: node.name, - workflowName: workflow.name ?? 'Unnamed workflow', - nodeType: node.type, - workflowId: workflow.id ?? 'unsaved-workflow', - msg, - }), - addInputData( - connectionType: NodeConnectionType, - data: INodeExecutionData[][], - ): { index: number } { - const nodeName = this.getNode().name; - let currentNodeRunIndex = 0; - if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length; - } - - addExecutionDataFunctions( - 'input', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging input data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - - return { index: currentNodeRunIndex }; - }, - addOutputData( - connectionType: NodeConnectionType, - currentNodeRunIndex: number, - data: INodeExecutionData[][], - ): void { - addExecutionDataFunctions( - 'output', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging output data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - }, - helpers: { - createDeferredPromise, - copyInputItems, - ...getRequestHelperFunctions( - workflow, - node, - additionalData, - runExecutionData, - connectionInputData, - ), - ...getSSHTunnelFunctions(), - ...getFileSystemHelperFunctions(node), - ...getBinaryHelperFunctions(additionalData, workflow.id), - ...getCheckProcessedHelperFunctions(workflow, node), - assertBinaryData: (itemIndex, propertyName) => - assertBinaryData(inputData, node, itemIndex, propertyName, 0), - getBinaryDataBuffer: async (itemIndex, propertyName) => - await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), - - returnJsonArray, - normalizeItems, - constructExecutionMetaData, - }, - }; + return new ExecutionContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executeData, + closeFunctions, + abortSignal, + ) as unknown as IExecuteFunctions; } /** @@ -3762,26 +2806,26 @@ export function getExecuteSingleFunctions( workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, itemIndex: number, executeData: IExecuteData, - mode: WorkflowExecuteMode, abortSignal?: AbortSignal, ): IExecuteSingleFunctions { return new ExecuteSingleContext( workflow, node, additionalData, + mode, runExecutionData, runIndex, connectionInputData, inputData, itemIndex, executeData, - mode, abortSignal, ); } diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 23b88abd5b..bce1d0b76e 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -58,6 +58,7 @@ import { recreateNodeExecutionStack, handleCycles, } from './PartialExecutionUtils'; +import { ExecutionContext } from './node-execution-context/execute-context'; export class WorkflowExecute { private status: ExecutionStatus = 'new'; @@ -1219,16 +1220,16 @@ export class WorkflowExecute { const closeFunctions: CloseFunction[] = []; // Create a WorkflowDataProxy instance that we can get the data of the // item which did error - const executeFunctions = NodeExecuteFunctions.getExecuteFunctions( + const executeFunctions = new ExecutionContext( workflow, + executionData.node, + this.additionalData, + this.mode, this.runExecutionData, runIndex, [], executionData.data, - executionData.node, - this.additionalData, executionData, - this.mode, closeFunctions, this.abortController.signal, ); diff --git a/packages/core/src/node-execution-context/execute-context.ts b/packages/core/src/node-execution-context/execute-context.ts index e69de29bb2..801067e76d 100644 --- a/packages/core/src/node-execution-context/execute-context.ts +++ b/packages/core/src/node-execution-context/execute-context.ts @@ -0,0 +1,497 @@ +import type { + ICredentialDataDecryptedObject, + IGetNodeParameterOptions, + INode, + INodeExecutionData, + IExecuteFunctions, + IRunExecutionData, + IWorkflowExecuteAdditionalData, + Workflow, + WorkflowExecuteMode, + CloseFunction, + IExecuteData, + ITaskDataConnections, + CallbackManager, + IExecuteWorkflowInfo, + ContextType, + IContextObject, + NodeConnectionType, + INodeInputConfiguration, + INodeOutputConfiguration, + IWorkflowDataProxyData, + IExecuteResponsePromiseData, + ExecutionBaseError, + AiEvent, + DeduplicationHelperFunctions, + FileSystemHelperFunctions, +} from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + NodeHelpers, + WorkflowDataProxy, +} from 'n8n-workflow'; + +import { + addExecutionDataFunctions, + assertBinaryData, + constructExecutionMetaData, + continueOnFail, + copyBinaryFile, + copyInputItems, + getAdditionalKeys, + getBinaryDataBuffer, + getCredentials, + getInputConnectionData, + getNodeParameter, + normalizeItems, + returnJsonArray, +} from '@/NodeExecuteFunctions'; +import { BaseContext } from './base-contexts'; +import { RequestHelpers } from './helpers/request-helpers'; +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; +import Container from 'typedi'; +import { BinaryHelpers } from './helpers/binary-helpers'; +import { createAgentStartJob } from '@/Agent'; +import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers'; +import { DeduplicationHelpers } from './helpers/deduplication-helpers'; +import { FileSystemHelpers } from './helpers/file-system-helpers'; + +export class ExecutionContext extends BaseContext implements IExecuteFunctions { + readonly helpers: IExecuteFunctions['helpers']; + readonly nodeHelpers: IExecuteFunctions['nodeHelpers']; + readonly startJob: IExecuteFunctions['startJob']; + + private readonly binaryDataService = Container.get(BinaryDataService); + + constructor( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + private readonly mode: WorkflowExecuteMode, + private readonly runExecutionData: IRunExecutionData, + private readonly runIndex: number, + private readonly connectionInputData: INodeExecutionData[], + private readonly inputData: ITaskDataConnections, + private readonly executeData: IExecuteData, + private readonly closeFunctions: CloseFunction[], + private readonly abortSignal?: AbortSignal, + ) { + super(workflow, node, additionalData); + + const binaryHelpers = new BinaryHelpers(workflow, additionalData); + const deduplicationHelpers = new DeduplicationHelpers(workflow, node); + const fileSystemHelpers = new FileSystemHelpers(node); + const requestHelpers = new RequestHelpers( + this as IExecuteFunctions, + workflow, + node, + additionalData, + ); + const sshTunnelHelpers = new SSHTunnelHelpers(); + + // TODO: extract out in a BaseExecutionContext + this.helpers = { + createDeferredPromise, + returnJsonArray, + + getBinaryPath: (id) => binaryHelpers.getBinaryPath(id), + getBinaryMetadata: (id) => binaryHelpers.getBinaryMetadata(id), + getBinaryStream: (id) => binaryHelpers.getBinaryStream(id), + binaryToBuffer: (body) => binaryHelpers.binaryToBuffer(body), + binaryToString: (body) => binaryHelpers.binaryToString(body), + prepareBinaryData: binaryHelpers.prepareBinaryData.bind(binaryHelpers), + setBinaryDataBuffer: binaryHelpers.setBinaryDataBuffer.bind(binaryHelpers), + copyBinaryFile: () => binaryHelpers.copyBinaryFile(), + assertBinaryData: (itemIndex, propertyName) => + assertBinaryData(inputData, node, itemIndex, propertyName, 0), + getBinaryDataBuffer: async (itemIndex, propertyName) => + await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), + + httpRequest: requestHelpers.httpRequest.bind(requestHelpers), + httpRequestWithAuthentication: + requestHelpers.httpRequestWithAuthentication.bind(requestHelpers), + requestWithAuthenticationPaginated: + requestHelpers.requestWithAuthenticationPaginated.bind(requestHelpers), + request: requestHelpers.request.bind(requestHelpers), + requestWithAuthentication: requestHelpers.requestWithAuthentication.bind(requestHelpers), + requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers), + requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers), + + getSSHClient: sshTunnelHelpers.getSSHClient.bind(sshTunnelHelpers), + + copyInputItems, + normalizeItems, + constructExecutionMetaData, + + checkProcessedAndRecord: + deduplicationHelpers.checkProcessedAndRecord.bind(deduplicationHelpers), + checkProcessedItemsAndRecord: + deduplicationHelpers.checkProcessedItemsAndRecord.bind(deduplicationHelpers), + removeProcessed: deduplicationHelpers.removeProcessed.bind(deduplicationHelpers), + clearAllProcessedItems: + deduplicationHelpers.clearAllProcessedItems.bind(deduplicationHelpers), + getProcessedDataCount: deduplicationHelpers.getProcessedDataCount.bind(deduplicationHelpers), + + createReadStream: fileSystemHelpers.createReadStream.bind(fileSystemHelpers), + getStoragePath: fileSystemHelpers.getStoragePath.bind(fileSystemHelpers), + writeContentToFile: fileSystemHelpers.writeContentToFile.bind(fileSystemHelpers), + }; + + this.nodeHelpers = { + copyBinaryFile: async (filePath, fileName, mimeType) => + await copyBinaryFile( + this.workflow.id, + this.additionalData.executionId!, + filePath, + fileName, + mimeType, + ), + }; + + this.startJob = createAgentStartJob( + additionalData, + inputData, + node, + workflow, + runExecutionData, + runIndex, + node.name, + connectionInputData, + {}, + mode, + executeData, + ); + } + + getMode() { + return this.mode; + } + + // TODO: extract out in a BaseExecutionContext + getExecutionCancelSignal() { + return this.abortSignal; + } + + // TODO: extract out in a BaseExecutionContext + onExecutionCancellation(handler: () => unknown) { + const fn = () => { + this.abortSignal?.removeEventListener('abort', fn); + handler(); + }; + this.abortSignal?.addEventListener('abort', fn); + } + + // TODO: This is identical to PollContext + async getCredentials( + type: string, + itemIndex?: number, + ) { + return await getCredentials( + this.workflow, + this.node, + type, + this.additionalData, + this.mode, + this.executeData, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + itemIndex, + ); + } + + getExecuteData() { + return this.executeData; + } + + continueOnFail() { + return continueOnFail(this.node); + } + + // TODO: Move to BaseContext + evaluateExpression(expression: string, itemIndex: number) { + return this.workflow.expression.resolveSimpleParameterValue( + `=${expression}`, + {}, + this.runExecutionData, + this.runIndex, + itemIndex, + // TODO: revert this back to `node.name` when we stop using `IExecuteFunctions` as the context object in AI nodes. + // https://linear.app/n8n/issue/CAT-269 + this.node.name, + this.connectionInputData, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ); + } + + async executeWorkflow( + workflowInfo: IExecuteWorkflowInfo, + inputData?: INodeExecutionData[], + parentCallbackManager?: CallbackManager, + ): Promise { + return await this.additionalData + .executeWorkflow(workflowInfo, this.additionalData, { + parentWorkflowId: this.workflow.id?.toString(), + inputData, + parentWorkflowSettings: this.workflow.settings, + node: this.node, + parentCallbackManager, + }) + .then( + async (result) => + await this.binaryDataService.duplicateBinaryData( + this.workflow.id, + this.additionalData.executionId!, + result, + ), + ); + } + + // TODO: Move to BaseExecutionContext + getContext(type: ContextType): IContextObject { + return NodeHelpers.getContext(this.runExecutionData, type, this.node); + } + + async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + // TODO: trim down the function signature + return await getInputConnectionData( + this as IExecuteFunctions, + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.inputData, + this.additionalData, + this.executeData, + this.mode, + this.closeFunctions, + inputName, + itemIndex, + ); + } + + getNodeInputs(): INodeInputConfiguration[] { + const nodeType = this.workflow.nodeTypes.getByNameAndVersion( + this.node.type, + this.node.typeVersion, + ); + // TODO: move NodeHelpers.getNodeInputs here (if possible) + return NodeHelpers.getNodeInputs(this.workflow, this.node, nodeType.description).map( + (output) => { + if (typeof output === 'string') { + return { + type: output, + }; + } + return output; + }, + ); + } + + getNodeOutputs(): INodeOutputConfiguration[] { + const nodeType = this.workflow.nodeTypes.getByNameAndVersion( + this.node.type, + this.node.typeVersion, + ); + return NodeHelpers.getNodeOutputs(this.workflow, this.node, nodeType.description).map( + (output) => { + if (typeof output === 'string') { + return { + type: output, + }; + } + return output; + }, + ); + } + + getInputData(inputIndex = 0, inputName = 'main') { + if (!this.inputData.hasOwnProperty(inputName)) { + // Return empty array because else it would throw error when nothing is connected to input + return []; + } + + // TODO: Check if nodeType has input with that index defined + if (this.inputData[inputName].length < inputIndex) { + throw new ApplicationError('Could not get input with given index', { + extra: { inputIndex, inputName }, + }); + } + + if (this.inputData[inputName][inputIndex] === null) { + throw new ApplicationError('Value of input was not set', { + extra: { inputIndex, inputName }, + }); + } + + return this.inputData[inputName][inputIndex]; + } + + getInputSourceData(inputIndex = 0, inputName = 'main') { + if (this.executeData?.source === null) { + // Should never happen as n8n sets it automatically + throw new ApplicationError('Source data is missing'); + } + return this.executeData.source[inputName][inputIndex]!; + } + + // TODO: Move to BaseContext + // @ts-expect-error Not sure how to fix this typing + getNodeParameter( + parameterName: string, + itemIndex: number, + fallbackValue?: any, + options?: IGetNodeParameterOptions, + ) { + return getNodeParameter( + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.node, + parameterName, + itemIndex, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + fallbackValue, + options, + ); + } + + // TODO: Move to BaseExecutionContext + getWorkflowDataProxy(itemIndex: number): IWorkflowDataProxyData { + const dataProxy = new WorkflowDataProxy( + this.workflow, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + {}, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ); + return dataProxy.getDataProxy(); + } + + async putExecutionToWait(waitTill: Date): Promise { + this.runExecutionData.waitTill = waitTill; + if (this.additionalData.setExecutionStatus) { + this.additionalData.setExecutionStatus('waiting'); + } + } + + logNodeOutput(...args: unknown[]): void { + if (this.mode === 'manual') { + this.sendMessageToUI(...args); + return; + } + + if (process.env.CODE_ENABLE_STDOUT === 'true') { + console.log(`[Workflow "${this.workflow.id}"][Node "${this.node.name}"]`, ...args); + } + } + + sendMessageToUI(...args: any[]): void { + if (this.mode !== 'manual') { + return; + } + + try { + if (this.additionalData.sendDataToUI) { + args = args.map((arg) => { + // prevent invalid dates from being logged as null + if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; + + // log valid dates in human readable format, as in browser + if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); + if (arg instanceof Date) return arg.toString(); + + return arg; + }); + + this.additionalData.sendDataToUI('sendConsoleMessage', { + source: `[Node: "${this.node.name}"]`, + messages: args, + }); + } + } catch (error) { + this.logger.warn(`There was a problem sending message to UI: ${error.message}`); + } + } + + async sendResponse(response: IExecuteResponsePromiseData): Promise { + await this.additionalData.hooks?.executeHookFunctions('sendResponse', [response]); + } + + addInputData( + connectionType: NodeConnectionType, + data: INodeExecutionData[][] | ExecutionBaseError, + ): { index: number } { + const nodeName = this.node.name; + let currentNodeRunIndex = 0; + if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { + currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; + } + + addExecutionDataFunctions( + 'input', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging input data of node "${this.node.name}": ${error.message}`, + ); + }); + + return { index: currentNodeRunIndex }; + } + + addOutputData( + connectionType: NodeConnectionType, + currentNodeRunIndex: number, + data: INodeExecutionData[][] | ExecutionBaseError, + ): void { + addExecutionDataFunctions( + 'output', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging output data of node "${this.node.name}": ${error.message}`, + ); + }); + } + + logAiEvent(eventName: AiEvent, msg: string) { + return this.additionalData.logAiEvent(eventName, { + executionId: this.additionalData.executionId ?? 'unsaved-execution', + nodeName: this.node.name, + workflowName: this.workflow.name ?? 'Unnamed workflow', + nodeType: this.node.type, + workflowId: this.workflow.id ?? 'unsaved-workflow', + msg, + }); + } + + getParentCallbackManager(): CallbackManager | undefined { + return this.additionalData.parentCallbackManager; + } +} diff --git a/packages/core/src/node-execution-context/execute-single-context.ts b/packages/core/src/node-execution-context/execute-single-context.ts index e92a30da4a..0f290ed888 100644 --- a/packages/core/src/node-execution-context/execute-single-context.ts +++ b/packages/core/src/node-execution-context/execute-single-context.ts @@ -41,13 +41,13 @@ export class ExecuteSingleContext extends BaseContext implements IExecuteSingleF workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, + private readonly mode: WorkflowExecuteMode, private readonly runExecutionData: IRunExecutionData, private readonly runIndex: number, private readonly connectionInputData: INodeExecutionData[], private readonly inputData: ITaskDataConnections, private readonly itemIndex: number, private readonly executeData: IExecuteData, - private readonly mode: WorkflowExecuteMode, private readonly abortSignal?: AbortSignal, ) { super(workflow, node, additionalData); @@ -84,10 +84,12 @@ export class ExecuteSingleContext extends BaseContext implements IExecuteSingleF }; } + // TODO: extract out in a BaseExecutionContext getExecutionCancelSignal() { return this.abortSignal; } + // TODO: extract out in a BaseExecutionContext onExecutionCancellation(handler: () => unknown) { const fn = () => { this.abortSignal?.removeEventListener('abort', fn); @@ -96,6 +98,7 @@ export class ExecuteSingleContext extends BaseContext implements IExecuteSingleF this.abortSignal?.addEventListener('abort', fn); } + // TODO: extract out in a BaseExecutionContext continueOnFail() { return continueOnFail(this.node); } @@ -171,6 +174,7 @@ export class ExecuteSingleContext extends BaseContext implements IExecuteSingleF ); } + // TODO: extract out in a BaseExecutionContext async getCredentials(type: string) { return await getCredentials( this.workflow, diff --git a/packages/core/src/node-execution-context/supply-data-context.ts b/packages/core/src/node-execution-context/supply-data-context.ts new file mode 100644 index 0000000000..ca414ed0a4 --- /dev/null +++ b/packages/core/src/node-execution-context/supply-data-context.ts @@ -0,0 +1,396 @@ +import type { + ICredentialDataDecryptedObject, + IGetNodeParameterOptions, + INode, + INodeExecutionData, + ISupplyDataFunctions, + IRunExecutionData, + IWorkflowExecuteAdditionalData, + Workflow, + WorkflowExecuteMode, + CloseFunction, + IExecuteData, + ITaskDataConnections, + IExecuteWorkflowInfo, + CallbackManager, + NodeConnectionType, + AiEvent, + DeduplicationHelperFunctions, + FileSystemHelperFunctions, +} from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + NodeHelpers, + WorkflowDataProxy, +} from 'n8n-workflow'; + +import { + addExecutionDataFunctions, + continueOnFail, + getAdditionalKeys, + getCredentials, + getInputConnectionData, + getNodeParameter, + constructExecutionMetaData, + normalizeItems, + returnJsonArray, + assertBinaryData, + getBinaryDataBuffer, + copyInputItems, +} from '@/NodeExecuteFunctions'; +import { BaseContext } from './base-contexts'; +import { BinaryHelpers } from './helpers/binary-helpers'; +import { RequestHelpers } from './helpers/request-helpers'; +import Container from 'typedi'; +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; +import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers'; +import { DeduplicationHelpers } from './helpers/deduplication-helpers'; +import { FileSystemHelpers } from './helpers/file-system-helpers'; + +export class SupplyDataContext extends BaseContext implements ISupplyDataFunctions { + readonly helpers: ISupplyDataFunctions['helpers']; + + private readonly binaryDataService = Container.get(BinaryDataService); + + constructor( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + private readonly mode: WorkflowExecuteMode, + private readonly runExecutionData: IRunExecutionData, + private readonly runIndex: number, + private readonly connectionInputData: INodeExecutionData[], + private readonly inputData: ITaskDataConnections, + private readonly executeData: IExecuteData, + private readonly closeFunctions: CloseFunction[], + private readonly abortSignal?: AbortSignal, + ) { + super(workflow, node, additionalData); + + const binaryHelpers = new BinaryHelpers(workflow, additionalData); + const deduplicationHelpers = new DeduplicationHelpers(workflow, node); + const fileSystemHelpers = new FileSystemHelpers(node); + const requestHelpers = new RequestHelpers( + this as ISupplyDataFunctions, + workflow, + node, + additionalData, + ); + const sshTunnelHelpers = new SSHTunnelHelpers(); + + // TODO: extract out in a BaseExecutionContext + this.helpers = { + createDeferredPromise, + returnJsonArray, + + getBinaryPath: (id) => binaryHelpers.getBinaryPath(id), + getBinaryMetadata: (id) => binaryHelpers.getBinaryMetadata(id), + getBinaryStream: (id) => binaryHelpers.getBinaryStream(id), + binaryToBuffer: (body) => binaryHelpers.binaryToBuffer(body), + binaryToString: (body) => binaryHelpers.binaryToString(body), + prepareBinaryData: binaryHelpers.prepareBinaryData.bind(binaryHelpers), + setBinaryDataBuffer: binaryHelpers.setBinaryDataBuffer.bind(binaryHelpers), + copyBinaryFile: () => binaryHelpers.copyBinaryFile(), + assertBinaryData: (itemIndex, propertyName) => + assertBinaryData(inputData, node, itemIndex, propertyName, 0), + getBinaryDataBuffer: async (itemIndex, propertyName) => + await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), + + httpRequest: requestHelpers.httpRequest.bind(requestHelpers), + httpRequestWithAuthentication: + requestHelpers.httpRequestWithAuthentication.bind(requestHelpers), + requestWithAuthenticationPaginated: + requestHelpers.requestWithAuthenticationPaginated.bind(requestHelpers), + request: requestHelpers.request.bind(requestHelpers), + requestWithAuthentication: requestHelpers.requestWithAuthentication.bind(requestHelpers), + requestOAuth1: requestHelpers.requestOAuth1.bind(requestHelpers), + requestOAuth2: requestHelpers.requestOAuth2.bind(requestHelpers), + + getSSHClient: sshTunnelHelpers.getSSHClient.bind(sshTunnelHelpers), + + copyInputItems, + normalizeItems, + constructExecutionMetaData, + + checkProcessedAndRecord: + deduplicationHelpers.checkProcessedAndRecord.bind(deduplicationHelpers), + checkProcessedItemsAndRecord: + deduplicationHelpers.checkProcessedItemsAndRecord.bind(deduplicationHelpers), + removeProcessed: deduplicationHelpers.removeProcessed.bind(deduplicationHelpers), + clearAllProcessedItems: + deduplicationHelpers.clearAllProcessedItems.bind(deduplicationHelpers), + getProcessedDataCount: deduplicationHelpers.getProcessedDataCount.bind(deduplicationHelpers), + + createReadStream: fileSystemHelpers.createReadStream.bind(fileSystemHelpers), + getStoragePath: fileSystemHelpers.getStoragePath.bind(fileSystemHelpers), + writeContentToFile: fileSystemHelpers.writeContentToFile.bind(fileSystemHelpers), + }; + } + + // TODO: extract out in a BaseExecutionContext + getMode() { + return this.mode; + } + + // TODO: extract out in a BaseExecutionContext + getExecutionCancelSignal() { + return this.abortSignal; + } + + // TODO: extract out in a BaseExecutionContext + onExecutionCancellation(handler: () => unknown) { + const fn = () => { + this.abortSignal?.removeEventListener('abort', fn); + handler(); + }; + this.abortSignal?.addEventListener('abort', fn); + } + + // TODO: extract out in a BaseExecutionContext + continueOnFail() { + return continueOnFail(this.node); + } + + // TODO: extract out in a BaseExecutionContext + async getCredentials( + type: string, + itemIndex: number, + ) { + return await getCredentials( + this.workflow, + this.node, + type, + this.additionalData, + this.mode, + this.executeData, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + itemIndex, + ); + } + + // TODO: extract out in a BaseExecutionContext + evaluateExpression(expression: string, itemIndex: number) { + return this.workflow.expression.resolveSimpleParameterValue( + `=${expression}`, + {}, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ); + } + + async executeWorkflow( + workflowInfo: IExecuteWorkflowInfo, + inputData?: INodeExecutionData[], + parentCallbackManager?: CallbackManager, + ) { + await this.additionalData + .executeWorkflow(workflowInfo, this.additionalData, { + parentWorkflowId: this.workflow.id?.toString(), + inputData, + parentWorkflowSettings: this.workflow.settings, + node: this.node, + parentCallbackManager, + }) + .then( + async (result) => + await this.binaryDataService.duplicateBinaryData( + this.workflow.id, + this.additionalData.executionId!, + result, + ), + ); + } + + getNodeOutputs() { + const nodeType = this.workflow.nodeTypes.getByNameAndVersion( + this.node.type, + this.node.typeVersion, + ); + return NodeHelpers.getNodeOutputs(this.workflow, this.node, nodeType.description).map( + (output) => { + if (typeof output === 'string') { + return { + type: output, + }; + } + return output; + }, + ); + } + + async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + return await getInputConnectionData( + this as ISupplyDataFunctions, + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.inputData, + this.additionalData, + this.executeData, + this.mode, + this.closeFunctions, + inputName, + itemIndex, + this.abortSignal, + ); + } + + getInputData(inputIndex = 0, inputName = 'main') { + if (!this.inputData.hasOwnProperty(inputName)) { + // Return empty array because else it would throw error when nothing is connected to input + return []; + } + + // TODO: Check if nodeType has input with that index defined + if (this.inputData[inputName].length < inputIndex) { + throw new ApplicationError('Could not get input with given index', { + extra: { inputIndex, inputName }, + }); + } + + if (this.inputData[inputName][inputIndex] === null) { + throw new ApplicationError('Value of input was not set', { + extra: { inputIndex, inputName }, + }); + } + + return this.inputData[inputName][inputIndex]; + } + + // @ts-expect-error Not sure how to fix this typing + getNodeParameter( + parameterName: string, + itemIndex: number, + fallbackValue?: any, + options?: IGetNodeParameterOptions, + ) { + return getNodeParameter( + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.node, + parameterName, + itemIndex, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + fallbackValue, + options, + ) as ISupplyDataFunctions['getNodeParameter']; + } + + getWorkflowDataProxy(itemIndex: number) { + return new WorkflowDataProxy( + this.workflow, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + {}, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ).getDataProxy(); + } + + sendMessageToUI(...args: any[]): void { + if (this.mode !== 'manual') { + return; + } + try { + if (this.additionalData.sendDataToUI) { + args = args.map((arg) => { + // prevent invalid dates from being logged as null + if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; + + // log valid dates in human readable format, as in browser + if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); + if (arg instanceof Date) return arg.toString(); + + return arg; + }); + + this.additionalData.sendDataToUI('sendConsoleMessage', { + source: `[Node: "${this.node.name}"]`, + messages: args, + }); + } + } catch (error) { + this.logger.warn(`There was a problem sending message to UI: ${error.message}`); + } + } + + logAiEvent(eventName: AiEvent, msg: string) { + return this.additionalData.logAiEvent(eventName, { + executionId: this.additionalData.executionId ?? 'unsaved-execution', + nodeName: this.node.name, + workflowName: this.workflow.name ?? 'Unnamed workflow', + nodeType: this.node.type, + workflowId: this.workflow.id ?? 'unsaved-workflow', + msg, + }); + } + + addInputData( + connectionType: NodeConnectionType, + data: INodeExecutionData[][], + ): { index: number } { + const nodeName = this.node.name; + let currentNodeRunIndex = 0; + if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { + currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; + } + + addExecutionDataFunctions( + 'input', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging input data of node "${this.node.name}": ${error.message}`, + ); + }); + + return { index: currentNodeRunIndex }; + } + + addOutputData( + connectionType: NodeConnectionType, + currentNodeRunIndex: number, + data: INodeExecutionData[][], + ): void { + addExecutionDataFunctions( + 'output', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging output data of node "${this.node.name}": ${error.message}`, + ); + }); + } +} diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index c02f9737cd..fdae001c89 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -423,14 +423,14 @@ export interface IRunNodeResponse { export interface IGetExecuteFunctions { ( workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, executeData: IExecuteData, - mode: WorkflowExecuteMode, closeFunctions: CloseFunction[], abortSignal?: AbortSignal, ): IExecuteFunctions; @@ -441,13 +441,13 @@ export interface IGetExecuteSingleFunctions { workflow: Workflow, node: INode, additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, runExecutionData: IRunExecutionData, runIndex: number, connectionInputData: INodeExecutionData[], inputData: ITaskDataConnections, itemIndex: number, executeData: IExecuteData, - mode: WorkflowExecuteMode, abortSignal?: AbortSignal, ): IExecuteSingleFunctions; } diff --git a/packages/workflow/src/RoutingNode.ts b/packages/workflow/src/RoutingNode.ts index 171415d879..cda3cd01f6 100644 --- a/packages/workflow/src/RoutingNode.ts +++ b/packages/workflow/src/RoutingNode.ts @@ -92,14 +92,14 @@ export class RoutingNode { const closeFunctions: CloseFunction[] = []; const executeFunctions = nodeExecuteFunctions.getExecuteFunctions( this.workflow, + this.node, + this.additionalData, + this.mode, this.runExecutionData, runIndex, this.connectionInputData, inputData, - this.node, - this.additionalData, executeData, - this.mode, closeFunctions, abortSignal, ); @@ -172,13 +172,13 @@ export class RoutingNode { this.workflow, this.node, this.additionalData, + this.mode, this.runExecutionData, runIndex, this.connectionInputData, inputData, itemIndex, executeData, - this.mode, abortSignal, ), requestData: { diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index f5a5d25a5b..052a50e36e 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -1372,14 +1372,14 @@ export class Workflow { const closeFunctions: CloseFunction[] = []; const context = nodeExecuteFunctions.getExecuteFunctions( this, + node, + additionalData, + mode, runExecutionData, runIndex, connectionInputData, inputData, - node, - additionalData, executionData, - mode, closeFunctions, abortSignal, ); diff --git a/packages/workflow/test/RoutingNode.test.ts b/packages/workflow/test/RoutingNode.test.ts index 1e06e0fdbe..6c491cdba9 100644 --- a/packages/workflow/test/RoutingNode.test.ts +++ b/packages/workflow/test/RoutingNode.test.ts @@ -2130,13 +2130,13 @@ describe('RoutingNode', () => { routingNode.workflow, routingNode.node, routingNode.additionalData, + routingNode.mode, routingNode.runExecutionData, runIndex, routingNode.connectionInputData, inputData, iteration, executeData, - routingNode.mode, ); currentItemIndex = routingNodeExecutionContext.getItemIndex();