refactor(core): Extract supply-date context out of NodeExecutionFunctions (no-changelog)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-11-05 20:54:54 +01:00
parent c1a7f68236
commit 89b4c5535c
No known key found for this signature in database
5 changed files with 594 additions and 251 deletions

View file

@ -166,7 +166,13 @@ import { extractValue } from './ExtractValue';
import { InstanceSettings } from './InstanceSettings';
import type { ExtendedValidationResult, IResponseError } from './Interfaces';
// eslint-disable-next-line import/no-cycle
import { HookContext, PollContext, TriggerContext, WebhookContext } from './node-execution-context';
import {
HookContext,
PollContext,
SupplyDataContext,
TriggerContext,
WebhookContext,
} from './node-execution-context';
import { getSecretsProxy } from './Secrets';
import { SSHClientsManager } from './SSHClientsManager';
@ -2695,7 +2701,7 @@ export function getWebhookDescription(
}
// TODO: Change options to an object
const addExecutionDataFunctions = async (
export const addExecutionDataFunctions = async (
type: 'input' | 'output',
nodeName: string,
data: INodeExecutionData[][] | ExecutionBaseError,
@ -3913,255 +3919,19 @@ export function getSupplyDataFunctions(
closeFunctions: CloseFunction[],
abortSignal?: AbortSignal,
): ISupplyDataFunctions {
return {
...getCommonWorkflowFunctions(workflow, node, additionalData),
...executionCancellationFunctions(abortSignal),
getMode: () => mode,
getCredentials: async (type, itemIndex) =>
await getCredentials(
workflow,
node,
type,
additionalData,
mode,
executeData,
runExecutionData,
runIndex,
connectionInputData,
itemIndex,
),
continueOnFail: () => continueOnFail(node),
evaluateExpression: (expression: string, itemIndex: number) =>
workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
),
executeWorkflow: async (
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
) =>
await additionalData
.executeWorkflow(workflowInfo, additionalData, {
parentWorkflowId: workflow.id?.toString(),
inputData,
parentWorkflowSettings: workflow.settings,
node,
parentCallbackManager,
})
.then(
async (result) =>
await Container.get(BinaryDataService).duplicateBinaryData(
workflow.id,
additionalData.executionId!,
result,
),
),
getNodeOutputs() {
const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);
return NodeHelpers.getNodeOutputs(workflow, node, nodeType.description).map((output) => {
if (typeof output === 'string') {
return {
type: output,
};
}
return output;
});
},
async getInputConnectionData(
inputName: NodeConnectionType,
itemIndex: number,
): Promise<unknown> {
return await getInputConnectionData.call(
this,
workflow,
runExecutionData,
runIndex,
connectionInputData,
inputData,
additionalData,
executeData,
mode,
closeFunctions,
inputName,
itemIndex,
abortSignal,
);
},
getInputData: (inputIndex = 0, inputName = 'main') => {
if (!inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return [];
}
// TODO: Check if nodeType has input with that index defined
if (inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input with given index', {
extra: { inputIndex, inputName },
});
}
if (inputData[inputName][inputIndex] === null) {
throw new ApplicationError('Value of input was not set', {
extra: { inputIndex, inputName },
});
}
return inputData[inputName][inputIndex];
},
getNodeParameter: ((
parameterName: string,
itemIndex: number,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
) =>
getNodeParameter(
workflow,
runExecutionData,
runIndex,
connectionInputData,
node,
parameterName,
itemIndex,
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
fallbackValue,
options,
)) as ISupplyDataFunctions['getNodeParameter'],
getWorkflowDataProxy: (itemIndex: number) =>
new WorkflowDataProxy(
workflow,
runExecutionData,
runIndex,
itemIndex,
node.name,
connectionInputData,
{},
mode,
getAdditionalKeys(additionalData, mode, runExecutionData),
executeData,
).getDataProxy(),
sendMessageToUI(...args: any[]): void {
if (mode !== 'manual') {
return;
}
try {
if (additionalData.sendDataToUI) {
args = args.map((arg) => {
// prevent invalid dates from being logged as null
if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg };
// log valid dates in human readable format, as in browser
if (arg.isLuxonDateTime) return new Date(arg.ts).toString();
if (arg instanceof Date) return arg.toString();
return arg;
});
additionalData.sendDataToUI('sendConsoleMessage', {
source: `[Node: "${node.name}"]`,
messages: args,
});
}
} catch (error) {
Logger.warn(`There was a problem sending message to UI: ${error.message}`);
}
},
logAiEvent: (eventName: AiEvent, msg: string) =>
additionalData.logAiEvent(eventName, {
executionId: additionalData.executionId ?? 'unsaved-execution',
nodeName: node.name,
workflowName: workflow.name ?? 'Unnamed workflow',
nodeType: node.type,
workflowId: workflow.id ?? 'unsaved-workflow',
msg,
}),
addInputData(
connectionType: NodeConnectionType,
data: INodeExecutionData[][],
): { index: number } {
const nodeName = this.getNode().name;
let currentNodeRunIndex = 0;
if (runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
currentNodeRunIndex = runExecutionData.resultData.runData[nodeName].length;
}
addExecutionDataFunctions(
'input',
this.getNode().name,
data,
runExecutionData,
connectionType,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
).catch((error) => {
Logger.warn(
`There was a problem logging input data of node "${this.getNode().name}": ${
error.message
}`,
);
});
return { index: currentNodeRunIndex };
},
addOutputData(
connectionType: NodeConnectionType,
currentNodeRunIndex: number,
data: INodeExecutionData[][],
): void {
addExecutionDataFunctions(
'output',
this.getNode().name,
data,
runExecutionData,
connectionType,
additionalData,
node.name,
runIndex,
currentNodeRunIndex,
).catch((error) => {
Logger.warn(
`There was a problem logging output data of node "${this.getNode().name}": ${
error.message
}`,
);
});
},
helpers: {
createDeferredPromise,
copyInputItems,
...getRequestHelperFunctions(
workflow,
node,
additionalData,
runExecutionData,
connectionInputData,
),
...getSSHTunnelFunctions(),
...getFileSystemHelperFunctions(node),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getCheckProcessedHelperFunctions(workflow, node),
assertBinaryData: (itemIndex, propertyName) =>
assertBinaryData(inputData, node, itemIndex, propertyName, 0),
getBinaryDataBuffer: async (itemIndex, propertyName) =>
await getBinaryDataBuffer(inputData, itemIndex, propertyName, 0),
returnJsonArray,
normalizeItems,
constructExecutionMetaData,
},
};
return new SupplyDataContext(
workflow,
node,
additionalData,
mode,
runExecutionData,
runIndex,
connectionInputData,
inputData,
executeData,
closeFunctions,
abortSignal,
);
}
/**

View file

@ -0,0 +1,95 @@
import type {
DeduplicationHelperFunctions,
DeduplicationItemTypes,
DeduplicationScope,
ICheckProcessedContextData,
ICheckProcessedOptions,
IDataObject,
IDeduplicationOutput,
IDeduplicationOutputItems,
INode,
Workflow,
} from 'n8n-workflow';
import { DataDeduplicationService } from '@/data-deduplication-service';
export class DeduplicationHelpers {
private readonly contextData: ICheckProcessedContextData;
constructor(workflow: Workflow, node: INode) {
this.contextData = { node, workflow };
}
get exported(): DeduplicationHelperFunctions {
return {
checkProcessedAndRecord: this.checkProcessedAndRecord.bind(this),
checkProcessedItemsAndRecord: this.checkProcessedItemsAndRecord.bind(this),
removeProcessed: this.removeProcessed.bind(this),
clearAllProcessedItems: this.clearAllProcessedItems.bind(this),
getProcessedDataCount: this.getProcessedDataCount.bind(this),
};
}
async checkProcessedAndRecord(
items: DeduplicationItemTypes[],
scope: DeduplicationScope,
options: ICheckProcessedOptions,
): Promise<IDeduplicationOutput> {
return await DataDeduplicationService.getInstance().checkProcessedAndRecord(
items,
scope,
this.contextData,
options,
);
}
async checkProcessedItemsAndRecord(
propertyName: string,
items: IDataObject[],
scope: DeduplicationScope,
options: ICheckProcessedOptions,
): Promise<IDeduplicationOutputItems> {
return await DataDeduplicationService.getInstance().checkProcessedItemsAndRecord(
propertyName,
items,
scope,
this.contextData,
options,
);
}
async removeProcessed(
items: DeduplicationItemTypes[],
scope: DeduplicationScope,
options: ICheckProcessedOptions,
): Promise<void> {
return await DataDeduplicationService.getInstance().removeProcessed(
items,
scope,
this.contextData,
options,
);
}
async clearAllProcessedItems(
scope: DeduplicationScope,
options: ICheckProcessedOptions,
): Promise<void> {
return await DataDeduplicationService.getInstance().clearAllProcessedItems(
scope,
this.contextData,
options,
);
}
async getProcessedDataCount(
scope: DeduplicationScope,
options: ICheckProcessedOptions,
): Promise<number> {
return await DataDeduplicationService.getInstance().getProcessedDataCount(
scope,
this.contextData,
options,
);
}
}

View file

@ -0,0 +1,135 @@
import { createReadStream } from 'fs';
import { access as fsAccess, writeFile as fsWriteFile } from 'fs/promises';
import type { FileSystemHelperFunctions, INode } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import type { PathLike } from 'node:fs';
import { join, resolve } from 'node:path';
import type { Readable } from 'node:stream';
import Container from 'typedi';
import {
BINARY_DATA_STORAGE_PATH,
BLOCK_FILE_ACCESS_TO_N8N_FILES,
CONFIG_FILES,
CUSTOM_EXTENSION_ENV,
RESTRICT_FILE_ACCESS_TO,
UM_EMAIL_TEMPLATES_INVITE,
UM_EMAIL_TEMPLATES_PWRESET,
} from '@/Constants';
import { InstanceSettings } from '@/InstanceSettings';
export class FileSystemHelpers {
private readonly instanceSettings = Container.get(InstanceSettings);
constructor(private readonly node: INode) {}
get exported(): FileSystemHelperFunctions {
return {
createReadStream: this.createReadStream.bind(this),
getStoragePath: this.getStoragePath.bind(this),
writeContentToFile: this.writeContentToFile.bind(this),
};
}
async createReadStream(filePath: PathLike) {
try {
await fsAccess(filePath);
} catch (error) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
throw error.code === 'ENOENT'
? new NodeOperationError(this.node, error as Error, {
message: `The file "${String(filePath)}" could not be accessed.`,
level: 'warning',
})
: error;
}
if (this.isFilePathBlocked(filePath as string)) {
const allowedPaths = this.getAllowedPaths();
const message = allowedPaths.length ? ` Allowed paths: ${allowedPaths.join(', ')}` : '';
throw new NodeOperationError(this.node, `Access to the file is not allowed.${message}`, {
level: 'warning',
});
}
return createReadStream(filePath);
}
getStoragePath() {
return join(this.instanceSettings.n8nFolder, `storage/${this.node.type}`);
}
async writeContentToFile(filePath: PathLike, content: string | Buffer | Readable, flag?: string) {
if (this.isFilePathBlocked(filePath as string)) {
throw new NodeOperationError(this.node, `The file "${String(filePath)}" is not writable.`, {
level: 'warning',
});
}
return await fsWriteFile(filePath, content, { encoding: 'binary', flag });
}
// TODO: cache this in the constructor
private getAllowedPaths() {
const restrictFileAccessTo = process.env[RESTRICT_FILE_ACCESS_TO];
if (!restrictFileAccessTo) {
return [];
}
const allowedPaths = restrictFileAccessTo
.split(';')
.map((path) => path.trim())
.filter((path) => path);
return allowedPaths;
}
private isFilePathBlocked(filePath: string): boolean {
const allowedPaths = this.getAllowedPaths();
const resolvedFilePath = resolve(filePath);
const blockFileAccessToN8nFiles = process.env[BLOCK_FILE_ACCESS_TO_N8N_FILES] !== 'false';
//if allowed paths are defined, allow access only to those paths
if (allowedPaths.length) {
for (const path of allowedPaths) {
if (resolvedFilePath.startsWith(path)) {
return false;
}
}
return true;
}
//restrict access to .n8n folder, ~/.cache/n8n/public, and other .env config related paths
if (blockFileAccessToN8nFiles) {
const { n8nFolder, staticCacheDir } = this.instanceSettings;
const restrictedPaths = [n8nFolder, staticCacheDir];
if (process.env[CONFIG_FILES]) {
restrictedPaths.push(...process.env[CONFIG_FILES].split(','));
}
if (process.env[CUSTOM_EXTENSION_ENV]) {
const customExtensionFolders = process.env[CUSTOM_EXTENSION_ENV].split(';');
restrictedPaths.push(...customExtensionFolders);
}
if (process.env[BINARY_DATA_STORAGE_PATH]) {
restrictedPaths.push(process.env[BINARY_DATA_STORAGE_PATH]);
}
if (process.env[UM_EMAIL_TEMPLATES_INVITE]) {
restrictedPaths.push(process.env[UM_EMAIL_TEMPLATES_INVITE]);
}
if (process.env[UM_EMAIL_TEMPLATES_PWRESET]) {
restrictedPaths.push(process.env[UM_EMAIL_TEMPLATES_PWRESET]);
}
//check if the file path is restricted
for (const path of restrictedPaths) {
if (resolvedFilePath.startsWith(path)) {
return true;
}
}
}
//path is not restricted
return false;
}
}

View file

@ -2,5 +2,6 @@
export { HookContext } from './hook-context';
export { LoadOptionsContext } from './load-options-context';
export { PollContext } from './poll-context';
export { SupplyDataContext } from './supply-data-context';
export { TriggerContext } from './trigger-context';
export { WebhookContext } from './webhook-context';

View file

@ -0,0 +1,342 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-return */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import type {
ICredentialDataDecryptedObject,
IGetNodeParameterOptions,
INode,
INodeExecutionData,
ISupplyDataFunctions,
IRunExecutionData,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowExecuteMode,
CloseFunction,
IExecuteData,
ITaskDataConnections,
IExecuteWorkflowInfo,
CallbackManager,
NodeConnectionType,
AiEvent,
} from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
NodeHelpers,
WorkflowDataProxy,
} from 'n8n-workflow';
import Container from 'typedi';
import { BinaryDataService } from '@/BinaryData/BinaryData.service';
// eslint-disable-next-line import/no-cycle
import {
addExecutionDataFunctions,
continueOnFail,
getAdditionalKeys,
getCredentials,
getInputConnectionData,
getNodeParameter,
constructExecutionMetaData,
normalizeItems,
returnJsonArray,
copyInputItems,
} from '@/NodeExecuteFunctions';
import { BinaryHelpers } from './helpers/binary-helpers';
import { DeduplicationHelpers } from './helpers/deduplication-helpers';
import { FileSystemHelpers } from './helpers/file-system-helpers';
import { RequestHelpers } from './helpers/request-helpers';
import { SSHTunnelHelpers } from './helpers/ssh-tunnel-helpers';
import { NodeExecutionContext } from './node-execution-context';
export class SupplyDataContext extends NodeExecutionContext implements ISupplyDataFunctions {
readonly helpers: ISupplyDataFunctions['helpers'];
private readonly binaryDataService = Container.get(BinaryDataService);
constructor(
workflow: Workflow,
node: INode,
additionalData: IWorkflowExecuteAdditionalData,
mode: WorkflowExecuteMode,
private readonly runExecutionData: IRunExecutionData,
private readonly runIndex: number,
private readonly connectionInputData: INodeExecutionData[],
private readonly inputData: ITaskDataConnections,
private readonly executeData: IExecuteData,
private readonly closeFunctions: CloseFunction[],
private readonly abortSignal?: AbortSignal,
) {
super(workflow, node, additionalData, mode);
this.helpers = {
createDeferredPromise,
returnJsonArray,
copyInputItems,
normalizeItems,
constructExecutionMetaData,
...new BinaryHelpers(workflow, additionalData).exported,
...new RequestHelpers(this as ISupplyDataFunctions, workflow, node, additionalData).exported,
...new SSHTunnelHelpers().exported,
...new DeduplicationHelpers(workflow, node).exported,
...new FileSystemHelpers(node).exported,
} as ISupplyDataFunctions['helpers'];
}
getExecutionCancelSignal() {
return this.abortSignal;
}
onExecutionCancellation(handler: () => unknown) {
const fn = () => {
this.abortSignal?.removeEventListener('abort', fn);
handler();
};
this.abortSignal?.addEventListener('abort', fn);
}
continueOnFail() {
return continueOnFail(this.node);
}
async getCredentials<T extends object = ICredentialDataDecryptedObject>(
type: string,
itemIndex: number,
) {
return await getCredentials<T>(
this.workflow,
this.node,
type,
this.additionalData,
this.mode,
this.executeData,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
itemIndex,
);
}
evaluateExpression(expression: string, itemIndex: number) {
return this.workflow.expression.resolveSimpleParameterValue(
`=${expression}`,
{},
this.runExecutionData,
this.runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
);
}
async executeWorkflow(
workflowInfo: IExecuteWorkflowInfo,
inputData?: INodeExecutionData[],
parentCallbackManager?: CallbackManager,
) {
await this.additionalData
.executeWorkflow(workflowInfo, this.additionalData, {
parentWorkflowId: this.workflow.id?.toString(),
inputData,
parentWorkflowSettings: this.workflow.settings,
node: this.node,
parentCallbackManager,
})
.then(
async (result) =>
await this.binaryDataService.duplicateBinaryData(
this.workflow.id,
this.additionalData.executionId!,
result,
),
);
}
getNodeOutputs() {
const nodeType = this.workflow.nodeTypes.getByNameAndVersion(
this.node.type,
this.node.typeVersion,
);
return NodeHelpers.getNodeOutputs(this.workflow, this.node, nodeType.description).map(
(output) => {
if (typeof output === 'string') {
return {
type: output,
};
}
return output;
},
);
}
async getInputConnectionData(inputName: NodeConnectionType, itemIndex: number): Promise<unknown> {
return await getInputConnectionData.call(
this as ISupplyDataFunctions,
this.workflow,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
this.inputData,
this.additionalData,
this.executeData,
this.mode,
this.closeFunctions,
inputName,
itemIndex,
this.abortSignal,
);
}
getInputData(inputIndex = 0, inputName = 'main') {
if (!this.inputData.hasOwnProperty(inputName)) {
// Return empty array because else it would throw error when nothing is connected to input
return [];
}
// TODO: Check if nodeType has input with that index defined
if (this.inputData[inputName].length < inputIndex) {
throw new ApplicationError('Could not get input with given index', {
extra: { inputIndex, inputName },
});
}
if (this.inputData[inputName][inputIndex] === null) {
throw new ApplicationError('Value of input was not set', {
extra: { inputIndex, inputName },
});
}
return this.inputData[inputName][inputIndex];
}
// @ts-expect-error Not sure how to fix this typing
getNodeParameter(
parameterName: string,
itemIndex: number,
fallbackValue?: any,
options?: IGetNodeParameterOptions,
) {
return getNodeParameter(
this.workflow,
this.runExecutionData,
this.runIndex,
this.connectionInputData,
this.node,
parameterName,
itemIndex,
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
fallbackValue,
options,
) as ISupplyDataFunctions['getNodeParameter'];
}
getWorkflowDataProxy(itemIndex: number) {
return new WorkflowDataProxy(
this.workflow,
this.runExecutionData,
this.runIndex,
itemIndex,
this.node.name,
this.connectionInputData,
{},
this.mode,
getAdditionalKeys(this.additionalData, this.mode, this.runExecutionData),
this.executeData,
).getDataProxy();
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendMessageToUI(...args: any[]): void {
if (this.mode !== 'manual') {
return;
}
try {
if (this.additionalData.sendDataToUI) {
args = args.map((arg) => {
// prevent invalid dates from being logged as null
if (arg.isLuxonDateTime && arg.invalidReason) return { ...arg };
// log valid dates in human readable format, as in browser
if (arg.isLuxonDateTime) return new Date(arg.ts).toString();
if (arg instanceof Date) return arg.toString();
return arg;
});
this.additionalData.sendDataToUI('sendConsoleMessage', {
source: `[Node: "${this.node.name}"]`,
messages: args,
});
}
} catch (error) {
this.logger.warn(`There was a problem sending message to UI: ${error.message}`);
}
}
logAiEvent(eventName: AiEvent, msg: string) {
return this.additionalData.logAiEvent(eventName, {
executionId: this.additionalData.executionId ?? 'unsaved-execution',
nodeName: this.node.name,
workflowName: this.workflow.name ?? 'Unnamed workflow',
nodeType: this.node.type,
workflowId: this.workflow.id ?? 'unsaved-workflow',
msg,
});
}
addInputData(
connectionType: NodeConnectionType,
data: INodeExecutionData[][],
): { index: number } {
const nodeName = this.node.name;
let currentNodeRunIndex = 0;
if (this.runExecutionData.resultData.runData.hasOwnProperty(nodeName)) {
currentNodeRunIndex = this.runExecutionData.resultData.runData[nodeName].length;
}
addExecutionDataFunctions(
'input',
this.node.name,
data,
this.runExecutionData,
connectionType,
this.additionalData,
this.node.name,
this.runIndex,
currentNodeRunIndex,
).catch((error) => {
this.logger.warn(
`There was a problem logging input data of node "${this.node.name}": ${error.message}`,
);
});
return { index: currentNodeRunIndex };
}
addOutputData(
connectionType: NodeConnectionType,
currentNodeRunIndex: number,
data: INodeExecutionData[][],
): void {
addExecutionDataFunctions(
'output',
this.node.name,
data,
this.runExecutionData,
connectionType,
this.additionalData,
this.node.name,
this.runIndex,
currentNodeRunIndex,
).catch((error) => {
this.logger.warn(
`There was a problem logging output data of node "${this.node.name}": ${error.message}`,
);
});
}
}