From 89b4c5535c00a11b17f6abd52b22a3745dafdd1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Tue, 5 Nov 2024 20:54:54 +0100 Subject: [PATCH] refactor(core): Extract supply-date context out of NodeExecutionFunctions (no-changelog) --- packages/core/src/NodeExecuteFunctions.ts | 272 ++------------ .../helpers/deduplication-helpers.ts | 95 +++++ .../helpers/file-system-helpers.ts | 135 +++++++ .../core/src/node-execution-context/index.ts | 1 + .../supply-data-context.ts | 342 ++++++++++++++++++ 5 files changed, 594 insertions(+), 251 deletions(-) create mode 100644 packages/core/src/node-execution-context/helpers/deduplication-helpers.ts create mode 100644 packages/core/src/node-execution-context/helpers/file-system-helpers.ts create mode 100644 packages/core/src/node-execution-context/supply-data-context.ts diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index e50d049f59..924468e719 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -166,7 +166,13 @@ import { extractValue } from './ExtractValue'; import { InstanceSettings } from './InstanceSettings'; import type { ExtendedValidationResult, IResponseError } from './Interfaces'; // eslint-disable-next-line import/no-cycle -import { HookContext, PollContext, TriggerContext, WebhookContext } from './node-execution-context'; +import { + HookContext, + PollContext, + SupplyDataContext, + TriggerContext, + WebhookContext, +} from './node-execution-context'; import { getSecretsProxy } from './Secrets'; import { SSHClientsManager } from './SSHClientsManager'; @@ -2695,7 +2701,7 @@ export function getWebhookDescription( } // TODO: Change options to an object -const addExecutionDataFunctions = async ( +export const addExecutionDataFunctions = async ( type: 'input' | 'output', nodeName: string, data: INodeExecutionData[][] | ExecutionBaseError, @@ -3913,255 +3919,19 @@ export function getSupplyDataFunctions( closeFunctions: CloseFunction[], abortSignal?: AbortSignal, ): ISupplyDataFunctions { - return { - ...getCommonWorkflowFunctions(workflow, node, additionalData), - ...executionCancellationFunctions(abortSignal), - getMode: () => mode, - getCredentials: async (type, itemIndex) => - await getCredentials( - workflow, - node, - type, - additionalData, - mode, - executeData, - runExecutionData, - runIndex, - connectionInputData, - itemIndex, - ), - continueOnFail: () => continueOnFail(node), - evaluateExpression: (expression: string, itemIndex: number) => - workflow.expression.resolveSimpleParameterValue( - `=${expression}`, - {}, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ), - executeWorkflow: async ( - workflowInfo: IExecuteWorkflowInfo, - inputData?: INodeExecutionData[], - parentCallbackManager?: CallbackManager, - ) => - await additionalData - .executeWorkflow(workflowInfo, additionalData, { - parentWorkflowId: workflow.id?.toString(), - inputData, - parentWorkflowSettings: workflow.settings, - node, - parentCallbackManager, - }) - .then( - async (result) => - await Container.get(BinaryDataService).duplicateBinaryData( - workflow.id, - additionalData.executionId!, - result, - ), - ), - getNodeOutputs() { - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => { - if (typeof output === 'string') { - return { - type: output, - }; - } - return output; - }); - }, - async getInputConnectionData( - inputName: NodeConnectionType, - itemIndex: number, - ): Promise { - return await getInputConnectionData.call( - this, - workflow, - runExecutionData, - runIndex, - connectionInputData, - inputData, - additionalData, - executeData, - mode, - closeFunctions, - inputName, - itemIndex, - abortSignal, - ); - }, - getInputData: (inputIndex = 0, inputName = 'main') => { - if (!inputData.hasOwnProperty(inputName)) { - // Return empty array because else it would throw error when nothing is connected to input - return []; - } - - // TODO: Check if nodeType has input with that index defined - if (inputData[inputName].length < inputIndex) { - throw new ApplicationError('Could not get input with given index', { - extra: { inputIndex, inputName }, - }); - } - - if (inputData[inputName][inputIndex] === null) { - throw new ApplicationError('Value of input was not set', { - extra: { inputIndex, inputName }, - }); - } - - return inputData[inputName][inputIndex]; - }, - getNodeParameter: (( - parameterName: string, - itemIndex: number, - fallbackValue?: any, - options?: IGetNodeParameterOptions, - ) => - getNodeParameter( - workflow, - runExecutionData, - runIndex, - connectionInputData, - node, - parameterName, - itemIndex, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - fallbackValue, - options, - )) as ISupplyDataFunctions['getNodeParameter'], - getWorkflowDataProxy: (itemIndex: number) => - new WorkflowDataProxy( - workflow, - runExecutionData, - runIndex, - itemIndex, - node.name, - connectionInputData, - {}, - mode, - getAdditionalKeys(additionalData, mode, runExecutionData), - executeData, - ).getDataProxy(), - sendMessageToUI(...args: any[]): void { - if (mode !== 'manual') { - return; - } - try { - if (additionalData.sendDataToUI) { - args = args.map((arg) => { - // prevent invalid dates from being logged as null - if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; - - // log valid dates in human readable format, as in browser - if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); - if (arg instanceof Date) return arg.toString(); - - return arg; - }); - - additionalData.sendDataToUI('sendConsoleMessage', { - source: `[Node: "${node.name}"]`, - messages: args, - }); - } - } catch (error) { - Logger.warn(`There was a problem sending message to UI: ${error.message}`); - } - }, - logAiEvent: (eventName: AiEvent, msg: string) => - additionalData.logAiEvent(eventName, { - executionId: additionalData.executionId ?? 'unsaved-execution', - nodeName: node.name, - workflowName: workflow.name ?? 'Unnamed workflow', - nodeType: node.type, - workflowId: workflow.id ?? 'unsaved-workflow', - msg, - }), - addInputData( - connectionType: NodeConnectionType, - data: INodeExecutionData[][], - ): { index: number } { - const nodeName = this.getNode().name; - let currentNodeRunIndex = 0; - if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { - currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length; - } - - addExecutionDataFunctions( - 'input', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging input data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - - return { index: currentNodeRunIndex }; - }, - addOutputData( - connectionType: NodeConnectionType, - currentNodeRunIndex: number, - data: INodeExecutionData[][], - ): void { - addExecutionDataFunctions( - 'output', - this.getNode().name, - data, - runExecutionData, - connectionType, - additionalData, - node.name, - runIndex, - currentNodeRunIndex, - ).catch((error) => { - Logger.warn( - `There was a problem logging output data of node "${this.getNode().name}": ${ - error.message - }`, - ); - }); - }, - helpers: { - createDeferredPromise, - copyInputItems, - ...getRequestHelperFunctions( - workflow, - node, - additionalData, - runExecutionData, - connectionInputData, - ), - ...getSSHTunnelFunctions(), - ...getFileSystemHelperFunctions(node), - ...getBinaryHelperFunctions(additionalData, workflow.id), - ...getCheckProcessedHelperFunctions(workflow, node), - assertBinaryData: (itemIndex, propertyName) => - assertBinaryData(inputData, node, itemIndex, propertyName, 0), - getBinaryDataBuffer: async (itemIndex, propertyName) => - await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0), - - returnJsonArray, - normalizeItems, - constructExecutionMetaData, - }, - }; + return new SupplyDataContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executeData, + closeFunctions, + abortSignal, + ); } /** diff --git a/packages/core/src/node-execution-context/helpers/deduplication-helpers.ts b/packages/core/src/node-execution-context/helpers/deduplication-helpers.ts new file mode 100644 index 0000000000..f209c0f37f --- /dev/null +++ b/packages/core/src/node-execution-context/helpers/deduplication-helpers.ts @@ -0,0 +1,95 @@ +import type { + DeduplicationHelperFunctions, + DeduplicationItemTypes, + DeduplicationScope, + ICheckProcessedContextData, + ICheckProcessedOptions, + IDataObject, + IDeduplicationOutput, + IDeduplicationOutputItems, + INode, + Workflow, +} from 'n8n-workflow'; + +import { DataDeduplicationService } from '@/data-deduplication-service'; + +export class DeduplicationHelpers { + private readonly contextData: ICheckProcessedContextData; + + constructor(workflow: Workflow, node: INode) { + this.contextData = { node, workflow }; + } + + get exported(): DeduplicationHelperFunctions { + return { + checkProcessedAndRecord: this.checkProcessedAndRecord.bind(this), + checkProcessedItemsAndRecord: this.checkProcessedItemsAndRecord.bind(this), + removeProcessed: this.removeProcessed.bind(this), + clearAllProcessedItems: this.clearAllProcessedItems.bind(this), + getProcessedDataCount: this.getProcessedDataCount.bind(this), + }; + } + + async checkProcessedAndRecord( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await DataDeduplicationService.getInstance().checkProcessedAndRecord( + items, + scope, + this.contextData, + options, + ); + } + + async checkProcessedItemsAndRecord( + propertyName: string, + items: IDataObject[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await DataDeduplicationService.getInstance().checkProcessedItemsAndRecord( + propertyName, + items, + scope, + this.contextData, + options, + ); + } + + async removeProcessed( + items: DeduplicationItemTypes[], + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await DataDeduplicationService.getInstance().removeProcessed( + items, + scope, + this.contextData, + options, + ); + } + + async clearAllProcessedItems( + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await DataDeduplicationService.getInstance().clearAllProcessedItems( + scope, + this.contextData, + options, + ); + } + + async getProcessedDataCount( + scope: DeduplicationScope, + options: ICheckProcessedOptions, + ): Promise { + return await DataDeduplicationService.getInstance().getProcessedDataCount( + scope, + this.contextData, + options, + ); + } +} diff --git a/packages/core/src/node-execution-context/helpers/file-system-helpers.ts b/packages/core/src/node-execution-context/helpers/file-system-helpers.ts new file mode 100644 index 0000000000..f4b6270582 --- /dev/null +++ b/packages/core/src/node-execution-context/helpers/file-system-helpers.ts @@ -0,0 +1,135 @@ +import { createReadStream } from 'fs'; +import { access as fsAccess, writeFile as fsWriteFile } from 'fs/promises'; +import type { FileSystemHelperFunctions, INode } from 'n8n-workflow'; +import { NodeOperationError } from 'n8n-workflow'; +import type { PathLike } from 'node:fs'; +import { join, resolve } from 'node:path'; +import type { Readable } from 'node:stream'; +import Container from 'typedi'; + +import { + BINARY_DATA_STORAGE_PATH, + BLOCK_FILE_ACCESS_TO_N8N_FILES, + CONFIG_FILES, + CUSTOM_EXTENSION_ENV, + RESTRICT_FILE_ACCESS_TO, + UM_EMAIL_TEMPLATES_INVITE, + UM_EMAIL_TEMPLATES_PWRESET, +} from '@/Constants'; +import { InstanceSettings } from '@/InstanceSettings'; + +export class FileSystemHelpers { + private readonly instanceSettings = Container.get(InstanceSettings); + + constructor(private readonly node: INode) {} + + get exported(): FileSystemHelperFunctions { + return { + createReadStream: this.createReadStream.bind(this), + getStoragePath: this.getStoragePath.bind(this), + writeContentToFile: this.writeContentToFile.bind(this), + }; + } + + async createReadStream(filePath: PathLike) { + try { + await fsAccess(filePath); + } catch (error) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + throw error.code === 'ENOENT' + ? new NodeOperationError(this.node, error as Error, { + message: `The file "${String(filePath)}" could not be accessed.`, + level: 'warning', + }) + : error; + } + if (this.isFilePathBlocked(filePath as string)) { + const allowedPaths = this.getAllowedPaths(); + const message = allowedPaths.length ? ` Allowed paths: ${allowedPaths.join(', ')}` : ''; + throw new NodeOperationError(this.node, `Access to the file is not allowed.${message}`, { + level: 'warning', + }); + } + return createReadStream(filePath); + } + + getStoragePath() { + return join(this.instanceSettings.n8nFolder, `storage/${this.node.type}`); + } + + async writeContentToFile(filePath: PathLike, content: string | Buffer | Readable, flag?: string) { + if (this.isFilePathBlocked(filePath as string)) { + throw new NodeOperationError(this.node, `The file "${String(filePath)}" is not writable.`, { + level: 'warning', + }); + } + return await fsWriteFile(filePath, content, { encoding: 'binary', flag }); + } + + // TODO: cache this in the constructor + private getAllowedPaths() { + const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO]; + if (!restrictFileAccessTo) { + return []; + } + const allowedPaths = restrictFileAccessTo + .split(';') + .map((path) => path.trim()) + .filter((path) => path); + return allowedPaths; + } + + private isFilePathBlocked(filePath: string): boolean { + const allowedPaths = this.getAllowedPaths(); + const resolvedFilePath = resolve(filePath); + const blockFileAccessToN8nFiles = process.env[BLOCK_FILE_ACCESS_TO_N8N_FILES] !== 'false'; + + //if allowed paths are defined, allow access only to those paths + if (allowedPaths.length) { + for (const path of allowedPaths) { + if (resolvedFilePath.startsWith(path)) { + return false; + } + } + + return true; + } + + //restrict access to .n8n folder, ~/.cache/n8n/public, and other .env config related paths + if (blockFileAccessToN8nFiles) { + const { n8nFolder, staticCacheDir } = this.instanceSettings; + const restrictedPaths = [n8nFolder, staticCacheDir]; + + if (process.env[CONFIG_FILES]) { + restrictedPaths.push(...process.env[CONFIG_FILES].split(',')); + } + + if (process.env[CUSTOM_EXTENSION_ENV]) { + const customExtensionFolders = process.env[CUSTOM_EXTENSION_ENV].split(';'); + restrictedPaths.push(...customExtensionFolders); + } + + if (process.env[BINARY_DATA_STORAGE_PATH]) { + restrictedPaths.push(process.env[BINARY_DATA_STORAGE_PATH]); + } + + if (process.env[UM_EMAIL_TEMPLATES_INVITE]) { + restrictedPaths.push(process.env[UM_EMAIL_TEMPLATES_INVITE]); + } + + if (process.env[UM_EMAIL_TEMPLATES_PWRESET]) { + restrictedPaths.push(process.env[UM_EMAIL_TEMPLATES_PWRESET]); + } + + //check if the file path is restricted + for (const path of restrictedPaths) { + if (resolvedFilePath.startsWith(path)) { + return true; + } + } + } + + //path is not restricted + return false; + } +} diff --git a/packages/core/src/node-execution-context/index.ts b/packages/core/src/node-execution-context/index.ts index a6397c60ce..73e0e62782 100644 --- a/packages/core/src/node-execution-context/index.ts +++ b/packages/core/src/node-execution-context/index.ts @@ -2,5 +2,6 @@ export { HookContext } from './hook-context'; export { LoadOptionsContext } from './load-options-context'; export { PollContext } from './poll-context'; +export { SupplyDataContext } from './supply-data-context'; export { TriggerContext } from './trigger-context'; export { WebhookContext } from './webhook-context'; diff --git a/packages/core/src/node-execution-context/supply-data-context.ts b/packages/core/src/node-execution-context/supply-data-context.ts new file mode 100644 index 0000000000..c1c77374a3 --- /dev/null +++ b/packages/core/src/node-execution-context/supply-data-context.ts @@ -0,0 +1,342 @@ +/* eslint-disable @typescript-eslint/no-unsafe-argument */ +/* eslint-disable @typescript-eslint/no-unsafe-return */ +/* eslint-disable @typescript-eslint/no-unsafe-member-access */ +import type { + ICredentialDataDecryptedObject, + IGetNodeParameterOptions, + INode, + INodeExecutionData, + ISupplyDataFunctions, + IRunExecutionData, + IWorkflowExecuteAdditionalData, + Workflow, + WorkflowExecuteMode, + CloseFunction, + IExecuteData, + ITaskDataConnections, + IExecuteWorkflowInfo, + CallbackManager, + NodeConnectionType, + AiEvent, +} from 'n8n-workflow'; +import { + ApplicationError, + createDeferredPromise, + NodeHelpers, + WorkflowDataProxy, +} from 'n8n-workflow'; +import Container from 'typedi'; + +import { BinaryDataService } from '@/BinaryData/BinaryData.service'; +// eslint-disable-next-line import/no-cycle +import { + addExecutionDataFunctions, + continueOnFail, + getAdditionalKeys, + getCredentials, + getInputConnectionData, + getNodeParameter, + constructExecutionMetaData, + normalizeItems, + returnJsonArray, + copyInputItems, +} from '@/NodeExecuteFunctions'; + +import { BinaryHelpers } from './helpers/binary-helpers'; +import { DeduplicationHelpers } from './helpers/deduplication-helpers'; +import { FileSystemHelpers } from './helpers/file-system-helpers'; +import { RequestHelpers } from './helpers/request-helpers'; +import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers'; +import { NodeExecutionContext } from './node-execution-context'; + +export class SupplyDataContext extends NodeExecutionContext implements ISupplyDataFunctions { + readonly helpers: ISupplyDataFunctions['helpers']; + + private readonly binaryDataService = Container.get(BinaryDataService); + + constructor( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + private readonly runExecutionData: IRunExecutionData, + private readonly runIndex: number, + private readonly connectionInputData: INodeExecutionData[], + private readonly inputData: ITaskDataConnections, + private readonly executeData: IExecuteData, + private readonly closeFunctions: CloseFunction[], + private readonly abortSignal?: AbortSignal, + ) { + super(workflow, node, additionalData, mode); + + this.helpers = { + createDeferredPromise, + returnJsonArray, + copyInputItems, + normalizeItems, + constructExecutionMetaData, + ...new BinaryHelpers(workflow, additionalData).exported, + ...new RequestHelpers(this as ISupplyDataFunctions, workflow, node, additionalData).exported, + ...new SSHTunnelHelpers().exported, + ...new DeduplicationHelpers(workflow, node).exported, + ...new FileSystemHelpers(node).exported, + } as ISupplyDataFunctions['helpers']; + } + + getExecutionCancelSignal() { + return this.abortSignal; + } + + onExecutionCancellation(handler: () => unknown) { + const fn = () => { + this.abortSignal?.removeEventListener('abort', fn); + handler(); + }; + this.abortSignal?.addEventListener('abort', fn); + } + + continueOnFail() { + return continueOnFail(this.node); + } + + async getCredentials( + type: string, + itemIndex: number, + ) { + return await getCredentials( + this.workflow, + this.node, + type, + this.additionalData, + this.mode, + this.executeData, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + itemIndex, + ); + } + + evaluateExpression(expression: string, itemIndex: number) { + return this.workflow.expression.resolveSimpleParameterValue( + `=${expression}`, + {}, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ); + } + + async executeWorkflow( + workflowInfo: IExecuteWorkflowInfo, + inputData?: INodeExecutionData[], + parentCallbackManager?: CallbackManager, + ) { + await this.additionalData + .executeWorkflow(workflowInfo, this.additionalData, { + parentWorkflowId: this.workflow.id?.toString(), + inputData, + parentWorkflowSettings: this.workflow.settings, + node: this.node, + parentCallbackManager, + }) + .then( + async (result) => + await this.binaryDataService.duplicateBinaryData( + this.workflow.id, + this.additionalData.executionId!, + result, + ), + ); + } + + getNodeOutputs() { + const nodeType = this.workflow.nodeTypes.getByNameAndVersion( + this.node.type, + this.node.typeVersion, + ); + return NodeHelpers.getNodeOutputs(this.workflow, this.node, nodeType.description).map( + (output) => { + if (typeof output === 'string') { + return { + type: output, + }; + } + return output; + }, + ); + } + + async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise { + return await getInputConnectionData.call( + this as ISupplyDataFunctions, + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.inputData, + this.additionalData, + this.executeData, + this.mode, + this.closeFunctions, + inputName, + itemIndex, + this.abortSignal, + ); + } + + getInputData(inputIndex = 0, inputName = 'main') { + if (!this.inputData.hasOwnProperty(inputName)) { + // Return empty array because else it would throw error when nothing is connected to input + return []; + } + + // TODO: Check if nodeType has input with that index defined + if (this.inputData[inputName].length < inputIndex) { + throw new ApplicationError('Could not get input with given index', { + extra: { inputIndex, inputName }, + }); + } + + if (this.inputData[inputName][inputIndex] === null) { + throw new ApplicationError('Value of input was not set', { + extra: { inputIndex, inputName }, + }); + } + + return this.inputData[inputName][inputIndex]; + } + + // @ts-expect-error Not sure how to fix this typing + getNodeParameter( + parameterName: string, + itemIndex: number, + fallbackValue?: any, + options?: IGetNodeParameterOptions, + ) { + return getNodeParameter( + this.workflow, + this.runExecutionData, + this.runIndex, + this.connectionInputData, + this.node, + parameterName, + itemIndex, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + fallbackValue, + options, + ) as ISupplyDataFunctions['getNodeParameter']; + } + + getWorkflowDataProxy(itemIndex: number) { + return new WorkflowDataProxy( + this.workflow, + this.runExecutionData, + this.runIndex, + itemIndex, + this.node.name, + this.connectionInputData, + {}, + this.mode, + getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData), + this.executeData, + ).getDataProxy(); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sendMessageToUI(...args: any[]): void { + if (this.mode !== 'manual') { + return; + } + try { + if (this.additionalData.sendDataToUI) { + args = args.map((arg) => { + // prevent invalid dates from being logged as null + if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg }; + + // log valid dates in human readable format, as in browser + if (arg.isLuxonDateTime) return new Date(arg.ts).toString(); + if (arg instanceof Date) return arg.toString(); + + return arg; + }); + + this.additionalData.sendDataToUI('sendConsoleMessage', { + source: `[Node: "${this.node.name}"]`, + messages: args, + }); + } + } catch (error) { + this.logger.warn(`There was a problem sending message to UI: ${error.message}`); + } + } + + logAiEvent(eventName: AiEvent, msg: string) { + return this.additionalData.logAiEvent(eventName, { + executionId: this.additionalData.executionId ?? 'unsaved-execution', + nodeName: this.node.name, + workflowName: this.workflow.name ?? 'Unnamed workflow', + nodeType: this.node.type, + workflowId: this.workflow.id ?? 'unsaved-workflow', + msg, + }); + } + + addInputData( + connectionType: NodeConnectionType, + data: INodeExecutionData[][], + ): { index: number } { + const nodeName = this.node.name; + let currentNodeRunIndex = 0; + if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) { + currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length; + } + + addExecutionDataFunctions( + 'input', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging input data of node "${this.node.name}": ${error.message}`, + ); + }); + + return { index: currentNodeRunIndex }; + } + + addOutputData( + connectionType: NodeConnectionType, + currentNodeRunIndex: number, + data: INodeExecutionData[][], + ): void { + addExecutionDataFunctions( + 'output', + this.node.name, + data, + this.runExecutionData, + connectionType, + this.additionalData, + this.node.name, + this.runIndex, + currentNodeRunIndex, + ).catch((error) => { + this.logger.warn( + `There was a problem logging output data of node "${this.node.name}": ${error.message}`, + ); + }); + } +}