From f6064ef278bb481a78942af3af9675f29e59045b Mon Sep 17 00:00:00 2001 From: Rhys Williams Date: Sun, 11 Sep 2022 16:42:09 +0200 Subject: [PATCH] fix(core & function nodes): Update function nodes to work with binary-data-mode 'filesystem'. (#3845) * Initial Fix * Self-Review #1 * Lint * Added support for FunctionItem. Minor updates. * Self-review * review comments. Added testing. * Self Review * Fixed memory handling on data manager use. * Fixes for unnecessary memory leaks. --- packages/core/src/BinaryDataManager/index.ts | 8 ++ packages/core/src/Interfaces.ts | 2 + packages/core/src/NodeExecuteFunctions.ts | 33 ++++- .../core/test/NodeExecuteFunctions.test.ts | 127 ++++++++++++++++++ .../nodes/Function/Function.node.ts | 48 +++++++ .../nodes/FunctionItem/FunctionItem.node.ts | 52 ++++++- packages/workflow/src/Interfaces.ts | 1 + 7 files changed, 267 insertions(+), 4 deletions(-) create mode 100644 packages/core/test/NodeExecuteFunctions.test.ts diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts index b7b7c2a9b2..eeaeee407c 100644 --- a/packages/core/src/BinaryDataManager/index.ts +++ b/packages/core/src/BinaryDataManager/index.ts @@ -50,15 +50,23 @@ export class BinaryDataManager { ): Promise { const retBinaryData = binaryData; + // If a manager handles this binary, return the binary data with it's reference id. if (this.managers[this.binaryDataMode]) { return this.managers[this.binaryDataMode] .storeBinaryData(binaryBuffer, executionId) .then((filename) => { + // Add data manager reference id. retBinaryData.id = this.generateBinaryId(filename); + + // Prevent preserving data in memory if handled by a data manager. + retBinaryData.data = this.binaryDataMode; + + // Short-circuit return to prevent further actions. return retBinaryData; }); } + // Else fallback to storing this data in memory. retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING); return binaryData; } diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index ba2989061e..783f1068fa 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -45,6 +45,7 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase { mimeType?: string, ): Promise; getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise; + setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise; request: (uriOrObject: string | IDataObject | any, options?: IDataObject) => Promise; // tslint:disable-line:no-any requestWithAuthentication( this: IAllExecuteFunctions, @@ -80,6 +81,7 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase { export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase { helpers: { getBinaryDataBuffer(propertyName: string, inputIndex?: number): Promise; + setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise; httpRequest(requestOptions: IHttpRequestOptions): Promise; // tslint:disable-line:no-any prepareBinaryData( binaryData: Buffer, diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 9f454c9ed3..3e621f7063 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -813,6 +813,22 @@ export async function getBinaryDataBuffer( return BinaryDataManager.getInstance().retrieveBinaryData(binaryData); } +/** + * Store an incoming IBinaryData & related buffer using the configured binary data manager. + * + * @export + * @param {IBinaryData} data + * @param {Buffer} binaryData + * @returns {Promise} + */ +export async function setBinaryDataBuffer( + data: IBinaryData, + binaryData: Buffer, + executionId: string, +): Promise { + return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId); +} + /** * Takes a buffer and converts it into the format n8n uses. It encodes the binary data as * base64 and adds metadata. @@ -882,7 +898,7 @@ export async function prepareBinaryData( } } - return BinaryDataManager.getInstance().storeBinaryData(returnData, binaryData, executionId); + return setBinaryDataBuffer(returnData, binaryData, executionId); } /** @@ -1950,6 +1966,9 @@ export function getExecutePollFunctions( }, helpers: { httpRequest, + async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise { + return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!); + }, async prepareBinaryData( binaryData: Buffer, filePath?: string, @@ -2121,6 +2140,9 @@ export function getExecuteTriggerFunctions( additionalCredentialOptions, ); }, + async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise { + return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!); + }, async prepareBinaryData( binaryData: Buffer, filePath?: string, @@ -2381,6 +2403,9 @@ export function getExecuteFunctions( additionalCredentialOptions, ); }, + async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise { + return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!); + }, async prepareBinaryData( binaryData: Buffer, filePath?: string, @@ -2624,6 +2649,9 @@ export function getExecuteSingleFunctions( additionalCredentialOptions, ); }, + async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise { + return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!); + }, async prepareBinaryData( binaryData: Buffer, filePath?: string, @@ -3121,6 +3149,9 @@ export function getExecuteWebhookFunctions( additionalCredentialOptions, ); }, + async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise { + return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!); + }, async prepareBinaryData( binaryData: Buffer, filePath?: string, diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts new file mode 100644 index 0000000000..653ff4456c --- /dev/null +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -0,0 +1,127 @@ +import { join } from 'path'; +import { tmpdir } from 'os'; +import { readFileSync, mkdtempSync } from 'fs'; + +import { BinaryDataManager, NodeExecuteFunctions } from '../src'; +import { IBinaryData, ITaskDataConnections } from 'n8n-workflow'; + +const temporaryDir = mkdtempSync(join(tmpdir(), 'n8n')); + +describe('NodeExecuteFunctions', () => { + describe(`test binary data helper methods`, () => { + // Reset BinaryDataManager for each run. This is a dirty operation, as individual managers are not cleaned. + beforeEach(() => { + //@ts-ignore + BinaryDataManager.instance = undefined; + }); + + test(`test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'default' mode`, async () => { + // Setup a 'default' binary data manager instance + await BinaryDataManager.init({ + mode: 'default', + availableModes: 'default', + localStoragePath: temporaryDir, + binaryDataTTL: 1, + persistedBinaryDataTTL: 1, + }); + + // Set our binary data buffer + let inputData: Buffer = Buffer.from('This is some binary data', 'utf8'); + let setBinaryDataBufferResponse: IBinaryData = await NodeExecuteFunctions.setBinaryDataBuffer( + { + mimeType: 'txt', + data: 'This should be overwritten by the actual payload in the response', + }, + inputData, + 'executionId', + ); + + // Expect our return object to contain the base64 encoding of the input data, as it should be stored in memory. + expect(setBinaryDataBufferResponse.data).toEqual(inputData.toString('base64')); + + // Now, re-fetch our data. + // An ITaskDataConnections object is used to share data between nodes. The top level property, 'main', represents the successful output object from a previous node. + let taskDataConnectionsInput: ITaskDataConnections = { + main: [], + }; + + // We add an input set, with one item at index 0, to this input. It contains an empty json payload and our binary data. + taskDataConnectionsInput.main.push([ + { + json: {}, + binary: { + data: setBinaryDataBufferResponse, + }, + }, + ]); + + // Now, lets fetch our data! The item will be item index 0. + let getBinaryDataBufferResponse: Buffer = await NodeExecuteFunctions.getBinaryDataBuffer( + taskDataConnectionsInput, + 0, + 'data', + 0, + ); + + expect(getBinaryDataBufferResponse).toEqual(inputData); + }); + + test(`test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'filesystem' mode`, async () => { + // Setup a 'filesystem' binary data manager instance + await BinaryDataManager.init({ + mode: 'filesystem', + availableModes: 'filesystem', + localStoragePath: temporaryDir, + binaryDataTTL: 1, + persistedBinaryDataTTL: 1, + }); + + // Set our binary data buffer + let inputData: Buffer = Buffer.from('This is some binary data', 'utf8'); + let setBinaryDataBufferResponse: IBinaryData = await NodeExecuteFunctions.setBinaryDataBuffer( + { + mimeType: 'txt', + data: 'This should be overwritten with the name of the configured data manager', + }, + inputData, + 'executionId', + ); + + // Expect our return object to contain the name of the configured data manager. + expect(setBinaryDataBufferResponse.data).toEqual('filesystem'); + + // Ensure that the input data was successfully persisted to disk. + expect( + readFileSync( + `${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem:', '')}`, + ), + ).toEqual(inputData); + + // Now, re-fetch our data. + // An ITaskDataConnections object is used to share data between nodes. The top level property, 'main', represents the successful output object from a previous node. + let taskDataConnectionsInput: ITaskDataConnections = { + main: [], + }; + + // We add an input set, with one item at index 0, to this input. It contains an empty json payload and our binary data. + taskDataConnectionsInput.main.push([ + { + json: {}, + binary: { + data: setBinaryDataBufferResponse, + }, + }, + ]); + + // Now, lets fetch our data! The item will be item index 0. + let getBinaryDataBufferResponse: Buffer = await NodeExecuteFunctions.getBinaryDataBuffer( + taskDataConnectionsInput, + 0, + 'data', + 0, + ); + + expect(getBinaryDataBufferResponse).toEqual(inputData); + }); + }); +}); diff --git a/packages/nodes-base/nodes/Function/Function.node.ts b/packages/nodes-base/nodes/Function/Function.node.ts index 3976e962b0..ea2895a1e1 100644 --- a/packages/nodes-base/nodes/Function/Function.node.ts +++ b/packages/nodes-base/nodes/Function/Function.node.ts @@ -1,5 +1,6 @@ import { IExecuteFunctions } from 'n8n-core'; import { + IBinaryKeyData, IDataObject, INodeExecutionData, INodeType, @@ -61,6 +62,11 @@ return items;`, // Copy the items as they may get changed in the functions items = JSON.parse(JSON.stringify(items)); + // Assign item indexes + for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { + items[itemIndex].index = itemIndex; + } + const cleanupData = (inputData: IDataObject): IDataObject => { Object.keys(inputData).map((key) => { if (inputData[key] !== null && typeof inputData[key] === 'object') { @@ -84,6 +90,48 @@ return items;`, items, // To be able to access data of other items $item: (index: number) => this.getWorkflowDataProxy(index), + getBinaryDataAsync: async (item: INodeExecutionData): Promise => { + // Fetch Binary Data, if available. Cannot check item with `if (item?.index)`, as index may be 0. + if (item?.binary && item?.index !== undefined && item?.index !== null) { + for (const binaryPropertyName of Object.keys(item.binary)) { + item.binary[binaryPropertyName].data = ( + await this.helpers.getBinaryDataBuffer(item.index, binaryPropertyName) + )?.toString('base64'); + } + } + + // Return Data + return item.binary; + }, + setBinaryDataAsync: async (item: INodeExecutionData, data: IBinaryKeyData) => { + // Ensure item is provided, else return a friendly error. + if (!item) { + throw new NodeOperationError( + this.getNode(), + 'No item was provided to setBinaryDataAsync (item: INodeExecutionData, data: IBinaryKeyData).', + ); + } + + // Ensure data is provided, else return a friendly error. + if (!data) { + throw new NodeOperationError( + this.getNode(), + 'No data was provided to setBinaryDataAsync (item: INodeExecutionData, data: IBinaryKeyData).', + ); + } + + // Set Binary Data + for (const binaryPropertyName of Object.keys(data)) { + const binaryItem = data[binaryPropertyName]; + data[binaryPropertyName] = await this.helpers.setBinaryDataBuffer( + binaryItem, + Buffer.from(binaryItem.data, 'base64'), + ); + } + + // Set Item Reference + item.binary = data; + }, }; // Make it possible to access data via $node, $parameter, ... diff --git a/packages/nodes-base/nodes/FunctionItem/FunctionItem.node.ts b/packages/nodes-base/nodes/FunctionItem/FunctionItem.node.ts index 97b87a63b6..1d5b7f942d 100644 --- a/packages/nodes-base/nodes/FunctionItem/FunctionItem.node.ts +++ b/packages/nodes-base/nodes/FunctionItem/FunctionItem.node.ts @@ -75,22 +75,70 @@ return item;`, }; for (let itemIndex = 0; itemIndex < length; itemIndex++) { + const mode = this.getMode(); + try { item = items[itemIndex]; + item.index = itemIndex; // Copy the items as they may get changed in the functions item = JSON.parse(JSON.stringify(item)); // Define the global objects for the custom function const sandbox = { + /** @deprecated for removal - replaced by getBinaryDataAsync() */ getBinaryData: (): IBinaryKeyData | undefined => { + if (mode === 'manual') { + this.sendMessageToUI( + 'getBinaryData(...) is deprecated and will be removed in a future version. Please consider switching to getBinaryDataAsync(...) instead.', + ); + } return item.binary; }, + /** @deprecated for removal - replaced by setBinaryDataAsync() */ + setBinaryData: async (data: IBinaryKeyData) => { + if (mode === 'manual') { + this.sendMessageToUI( + 'setBinaryData(...) is deprecated and will be removed in a future version. Please consider switching to setBinaryDataAsync(...) instead.', + ); + } + item.binary = data; + }, getNodeParameter: this.getNodeParameter, getWorkflowStaticData: this.getWorkflowStaticData, helpers: this.helpers, item: item.json, - setBinaryData: (data: IBinaryKeyData) => { + getBinaryDataAsync: async (): Promise => { + // Fetch Binary Data, if available. Cannot check item with `if (item?.index)`, as index may be 0. + if (item?.binary && item?.index !== undefined && item?.index !== null) { + for (const binaryPropertyName of Object.keys(item.binary)) { + item.binary[binaryPropertyName].data = ( + await this.helpers.getBinaryDataBuffer(item.index, binaryPropertyName) + )?.toString('base64'); + } + } + // Retrun Data + return item.binary; + }, + setBinaryDataAsync: async (data: IBinaryKeyData) => { + // Ensure data is present + if (!data) { + throw new NodeOperationError( + this.getNode(), + 'No data was provided to setBinaryDataAsync (data: IBinaryKeyData).', + ); + } + + // Set Binary Data + for (const binaryPropertyName of Object.keys(data)) { + const binaryItem = data[binaryPropertyName]; + data[binaryPropertyName] = await this.helpers.setBinaryDataBuffer( + binaryItem, + Buffer.from(binaryItem.data, 'base64'), + ); + } + + // Set Item Reference item.binary = data; }, }; @@ -99,8 +147,6 @@ return item;`, const dataProxy = this.getWorkflowDataProxy(itemIndex); Object.assign(sandbox, dataProxy); - const mode = this.getMode(); - const options = { console: mode === 'manual' ? 'redirect' : 'inherit', sandbox, diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index fb0774b8ab..28b71308eb 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -877,6 +877,7 @@ export interface INodeExecutionData { binary?: IBinaryKeyData; error?: NodeApiError | NodeOperationError; pairedItem?: IPairedItemData | IPairedItemData[] | number; + index?: number; } export interface INodeExecuteFunctions {