fix(core & function nodes): Update function nodes to work with binary-data-mode 'filesystem'. (#3845)
* Initial fix
* Self-review #1
* Lint
* Added support for FunctionItem. Minor updates.
* Self-review
* Review comments. Added testing.
* Self-review
* Fixed memory handling on data manager use.
* Fixes for unnecessary memory leaks.
parent b450e977a3
commit f6064ef278
@@ -50,15 +50,23 @@ export class BinaryDataManager {
	): Promise<IBinaryData> {
		const retBinaryData = binaryData;

		// If a manager handles this binary, return the binary data with its reference id.
		if (this.managers[this.binaryDataMode]) {
			return this.managers[this.binaryDataMode]
				.storeBinaryData(binaryBuffer, executionId)
				.then((filename) => {
					// Add data manager reference id.
					retBinaryData.id = this.generateBinaryId(filename);

					// Prevent preserving data in memory if handled by a data manager.
					retBinaryData.data = this.binaryDataMode;

					// Short-circuit return to prevent further actions.
					return retBinaryData;
				});
		}

		// Else fall back to storing this data in memory.
		retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING);
		return binaryData;
	}
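To make the two code paths above concrete, here is a minimal sketch (not part of the diff) of calling the manager directly, mirroring what the new tests below do; the mime type, payload, and execution id are illustrative assumptions.

import { BinaryDataManager } from 'n8n-core';
import { IBinaryData } from 'n8n-workflow';

async function storeExample(): Promise<IBinaryData> {
	// Assumes BinaryDataManager.init(...) has already run (see the test file below for a full setup).
	const stored = await BinaryDataManager.getInstance().storeBinaryData(
		{ mimeType: 'text/plain', data: '' },
		Buffer.from('hello', 'utf8'),
		'executionId',
	);
	// In 'filesystem' mode: stored.id === 'filesystem:<filename>' and stored.data === 'filesystem'.
	// In 'default' mode: stored.id stays unset and stored.data holds the base64-encoded payload.
	return stored;
}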
@@ -45,6 +45,7 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase {
			mimeType?: string,
		): Promise<IBinaryData>;
		getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise<Buffer>;
		setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
		request: (uriOrObject: string | IDataObject | any, options?: IDataObject) => Promise<any>; // tslint:disable-line:no-any
		requestWithAuthentication(
			this: IAllExecuteFunctions,
@@ -80,6 +81,7 @@ export interface IExecuteFunctions extends IExecuteFunctionsBase {
export interface IExecuteSingleFunctions extends IExecuteSingleFunctionsBase {
	helpers: {
		getBinaryDataBuffer(propertyName: string, inputIndex?: number): Promise<Buffer>;
		setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
		httpRequest(requestOptions: IHttpRequestOptions): Promise<any>; // tslint:disable-line:no-any
		prepareBinaryData(
			binaryData: Buffer,
@@ -813,6 +813,22 @@ export async function getBinaryDataBuffer(
	return BinaryDataManager.getInstance().retrieveBinaryData(binaryData);
}

/**
 * Stores an incoming IBinaryData and its related buffer using the configured binary data manager.
 *
 * @export
 * @param {IBinaryData} data
 * @param {Buffer} binaryData
 * @param {string} executionId
 * @returns {Promise<IBinaryData>}
 */
export async function setBinaryDataBuffer(
	data: IBinaryData,
	binaryData: Buffer,
	executionId: string,
): Promise<IBinaryData> {
	return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId);
}

/**
 * Takes a buffer and converts it into the format n8n uses. It encodes the binary data as
 * base64 and adds metadata.
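As a usage illustration (not part of the diff), a node's execute() implementation could round-trip binary data through the helper pair exposed on IExecuteFunctions.helpers; the binary property name 'data' and the function name are assumptions.

import { IExecuteFunctions } from 'n8n-core';
import { INodeExecutionData } from 'n8n-workflow';

// Hypothetical node logic; works in both 'default' and 'filesystem' modes because the
// payload is always resolved through the BinaryDataManager rather than read from item.binary.
async function processBinary(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
	const items = this.getInputData();
	for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
		const binary = items[itemIndex].binary;
		if (binary === undefined || binary.data === undefined) continue;
		// Resolve the real payload, whether it lives in memory or on disk.
		const buffer = await this.helpers.getBinaryDataBuffer(itemIndex, 'data');
		// ...transform the buffer here, then hand it back to the configured data manager...
		binary.data = await this.helpers.setBinaryDataBuffer(binary.data, buffer);
	}
	return [items];
}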
@@ -882,7 +898,7 @@ export async function prepareBinaryData(
		}
	}

-	return BinaryDataManager.getInstance().storeBinaryData(returnData, binaryData, executionId);
+	return setBinaryDataBuffer(returnData, binaryData, executionId);
}

/**
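Because prepareBinaryData(...) now ends in setBinaryDataBuffer(...), existing node code keeps its signature but picks up the configured data manager automatically. A sketch (not part of the diff, file name and contents are illustrative):

import { IExecuteFunctions } from 'n8n-core';
import { INodeExecutionData } from 'n8n-workflow';

async function buildReport(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
	// Same call as before this PR, but the payload is now persisted by the
	// BinaryDataManager (written to disk when running in 'filesystem' mode).
	const binary = await this.helpers.prepareBinaryData(Buffer.from('report contents', 'utf8'), 'report.txt');
	return [[{ json: {}, binary: { data: binary } }]];
}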
@@ -1950,6 +1966,9 @@ export function getExecutePollFunctions(
			},
			helpers: {
				httpRequest,
				async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
					return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
				},
				async prepareBinaryData(
					binaryData: Buffer,
					filePath?: string,
@@ -2121,6 +2140,9 @@ export function getExecuteTriggerFunctions(
						additionalCredentialOptions,
					);
				},
				async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
					return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
				},
				async prepareBinaryData(
					binaryData: Buffer,
					filePath?: string,
@@ -2381,6 +2403,9 @@ export function getExecuteFunctions(
						additionalCredentialOptions,
					);
				},
				async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
					return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
				},
				async prepareBinaryData(
					binaryData: Buffer,
					filePath?: string,
@@ -2624,6 +2649,9 @@ export function getExecuteSingleFunctions(
						additionalCredentialOptions,
					);
				},
				async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
					return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
				},
				async prepareBinaryData(
					binaryData: Buffer,
					filePath?: string,
@@ -3121,6 +3149,9 @@ export function getExecuteWebhookFunctions(
						additionalCredentialOptions,
					);
				},
				async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData> {
					return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!);
				},
				async prepareBinaryData(
					binaryData: Buffer,
					filePath?: string,
packages/core/test/NodeExecuteFunctions.test.ts (new file, 127 lines added)
@@ -0,0 +1,127 @@
import { join } from 'path';
import { tmpdir } from 'os';
import { readFileSync, mkdtempSync } from 'fs';

import { BinaryDataManager, NodeExecuteFunctions } from '../src';
import { IBinaryData, ITaskDataConnections } from 'n8n-workflow';

const temporaryDir = mkdtempSync(join(tmpdir(), 'n8n'));

describe('NodeExecuteFunctions', () => {
	describe(`test binary data helper methods`, () => {
		// Reset BinaryDataManager for each run. This is a dirty operation, as individual managers are not cleaned.
		beforeEach(() => {
			//@ts-ignore
			BinaryDataManager.instance = undefined;
		});

		test(`test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'default' mode`, async () => {
			// Set up a 'default' binary data manager instance.
			await BinaryDataManager.init({
				mode: 'default',
				availableModes: 'default',
				localStoragePath: temporaryDir,
				binaryDataTTL: 1,
				persistedBinaryDataTTL: 1,
			});

			// Set our binary data buffer.
			let inputData: Buffer = Buffer.from('This is some binary data', 'utf8');
			let setBinaryDataBufferResponse: IBinaryData = await NodeExecuteFunctions.setBinaryDataBuffer(
				{
					mimeType: 'txt',
					data: 'This should be overwritten by the actual payload in the response',
				},
				inputData,
				'executionId',
			);

			// Expect our return object to contain the base64 encoding of the input data, as it should be stored in memory.
			expect(setBinaryDataBufferResponse.data).toEqual(inputData.toString('base64'));

			// Now, re-fetch our data.
			// An ITaskDataConnections object is used to share data between nodes. The top-level property, 'main', represents the successful output object from a previous node.
			let taskDataConnectionsInput: ITaskDataConnections = {
				main: [],
			};

			// We add an input set, with one item at index 0, to this input. It contains an empty json payload and our binary data.
			taskDataConnectionsInput.main.push([
				{
					json: {},
					binary: {
						data: setBinaryDataBufferResponse,
					},
				},
			]);

			// Now, let's fetch our data! The item will be item index 0.
			let getBinaryDataBufferResponse: Buffer = await NodeExecuteFunctions.getBinaryDataBuffer(
				taskDataConnectionsInput,
				0,
				'data',
				0,
			);

			expect(getBinaryDataBufferResponse).toEqual(inputData);
		});

		test(`test getBinaryDataBuffer(...) & setBinaryDataBuffer(...) methods in 'filesystem' mode`, async () => {
			// Set up a 'filesystem' binary data manager instance.
			await BinaryDataManager.init({
				mode: 'filesystem',
				availableModes: 'filesystem',
				localStoragePath: temporaryDir,
				binaryDataTTL: 1,
				persistedBinaryDataTTL: 1,
			});

			// Set our binary data buffer.
			let inputData: Buffer = Buffer.from('This is some binary data', 'utf8');
			let setBinaryDataBufferResponse: IBinaryData = await NodeExecuteFunctions.setBinaryDataBuffer(
				{
					mimeType: 'txt',
					data: 'This should be overwritten with the name of the configured data manager',
				},
				inputData,
				'executionId',
			);

			// Expect our return object to contain the name of the configured data manager.
			expect(setBinaryDataBufferResponse.data).toEqual('filesystem');

			// Ensure that the input data was successfully persisted to disk.
			expect(
				readFileSync(
					`${temporaryDir}/${setBinaryDataBufferResponse.id?.replace('filesystem:', '')}`,
				),
			).toEqual(inputData);

			// Now, re-fetch our data.
			// An ITaskDataConnections object is used to share data between nodes. The top-level property, 'main', represents the successful output object from a previous node.
			let taskDataConnectionsInput: ITaskDataConnections = {
				main: [],
			};

			// We add an input set, with one item at index 0, to this input. It contains an empty json payload and our binary data.
			taskDataConnectionsInput.main.push([
				{
					json: {},
					binary: {
						data: setBinaryDataBufferResponse,
					},
				},
			]);

			// Now, let's fetch our data! The item will be item index 0.
			let getBinaryDataBufferResponse: Buffer = await NodeExecuteFunctions.getBinaryDataBuffer(
				taskDataConnectionsInput,
				0,
				'data',
				0,
			);

			expect(getBinaryDataBufferResponse).toEqual(inputData);
		});
	});
});
@@ -1,5 +1,6 @@
import { IExecuteFunctions } from 'n8n-core';
import {
	IBinaryKeyData,
	IDataObject,
	INodeExecutionData,
	INodeType,
@@ -61,6 +62,11 @@ return items;`,
		// Copy the items as they may get changed in the functions
		items = JSON.parse(JSON.stringify(items));

		// Assign item indexes
		for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
			items[itemIndex].index = itemIndex;
		}

		const cleanupData = (inputData: IDataObject): IDataObject => {
			Object.keys(inputData).map((key) => {
				if (inputData[key] !== null && typeof inputData[key] === 'object') {
@@ -84,6 +90,48 @@ return items;`,
			items,
			// To be able to access data of other items
			$item: (index: number) => this.getWorkflowDataProxy(index),
			getBinaryDataAsync: async (item: INodeExecutionData): Promise<IBinaryKeyData | undefined> => {
				// Fetch Binary Data, if available. Cannot check item with `if (item?.index)`, as index may be 0.
				if (item?.binary && item?.index !== undefined && item?.index !== null) {
					for (const binaryPropertyName of Object.keys(item.binary)) {
						item.binary[binaryPropertyName].data = (
							await this.helpers.getBinaryDataBuffer(item.index, binaryPropertyName)
						)?.toString('base64');
					}
				}

				// Return Data
				return item.binary;
			},
			setBinaryDataAsync: async (item: INodeExecutionData, data: IBinaryKeyData) => {
				// Ensure item is provided, else return a friendly error.
				if (!item) {
					throw new NodeOperationError(
						this.getNode(),
						'No item was provided to setBinaryDataAsync (item: INodeExecutionData, data: IBinaryKeyData).',
					);
				}

				// Ensure data is provided, else return a friendly error.
				if (!data) {
					throw new NodeOperationError(
						this.getNode(),
						'No data was provided to setBinaryDataAsync (item: INodeExecutionData, data: IBinaryKeyData).',
					);
				}

				// Set Binary Data
				for (const binaryPropertyName of Object.keys(data)) {
					const binaryItem = data[binaryPropertyName];
					data[binaryPropertyName] = await this.helpers.setBinaryDataBuffer(
						binaryItem,
						Buffer.from(binaryItem.data, 'base64'),
					);
				}

				// Set Item Reference
				item.binary = data;
			},
		};

		// Make it possible to access data via $node, $parameter, ...
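As a usage sketch (not part of the diff), code typed into a Function node could use these new sandbox helpers as follows; the binary property name 'data' and the fileName value are assumptions, and `items`, `getBinaryDataAsync`, and `setBinaryDataAsync` are injected by the sandbox above, so no imports are available.

// Function node code (runs inside the sandbox, once for all items).
for (const item of items) {
	// Resolves the actual payloads even when binary-data-mode is 'filesystem'.
	const binary = await getBinaryDataAsync(item);
	if (binary && binary.data) {
		// Illustrative change only: adjust metadata before passing the item on.
		binary.data.fileName = 'copy.txt';
		// Hands the payloads back to the configured binary data manager.
		await setBinaryDataAsync(item, binary);
	}
}
return items;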
@@ -75,22 +75,70 @@ return item;`,
		};

		for (let itemIndex = 0; itemIndex < length; itemIndex++) {
			const mode = this.getMode();

			try {
				item = items[itemIndex];
				item.index = itemIndex;

				// Copy the items as they may get changed in the functions
				item = JSON.parse(JSON.stringify(item));

				// Define the global objects for the custom function
				const sandbox = {
					/** @deprecated for removal - replaced by getBinaryDataAsync() */
					getBinaryData: (): IBinaryKeyData | undefined => {
						if (mode === 'manual') {
							this.sendMessageToUI(
								'getBinaryData(...) is deprecated and will be removed in a future version. Please consider switching to getBinaryDataAsync(...) instead.',
							);
						}
						return item.binary;
					},
					/** @deprecated for removal - replaced by setBinaryDataAsync() */
					setBinaryData: async (data: IBinaryKeyData) => {
						if (mode === 'manual') {
							this.sendMessageToUI(
								'setBinaryData(...) is deprecated and will be removed in a future version. Please consider switching to setBinaryDataAsync(...) instead.',
							);
						}
						item.binary = data;
					},
					getNodeParameter: this.getNodeParameter,
					getWorkflowStaticData: this.getWorkflowStaticData,
					helpers: this.helpers,
					item: item.json,
-					setBinaryData: (data: IBinaryKeyData) => {
					getBinaryDataAsync: async (): Promise<IBinaryKeyData | undefined> => {
						// Fetch Binary Data, if available. Cannot check item with `if (item?.index)`, as index may be 0.
						if (item?.binary && item?.index !== undefined && item?.index !== null) {
							for (const binaryPropertyName of Object.keys(item.binary)) {
								item.binary[binaryPropertyName].data = (
									await this.helpers.getBinaryDataBuffer(item.index, binaryPropertyName)
								)?.toString('base64');
							}
						}
						// Return Data
						return item.binary;
					},
					setBinaryDataAsync: async (data: IBinaryKeyData) => {
						// Ensure data is present
						if (!data) {
							throw new NodeOperationError(
								this.getNode(),
								'No data was provided to setBinaryDataAsync (data: IBinaryKeyData).',
							);
						}

						// Set Binary Data
						for (const binaryPropertyName of Object.keys(data)) {
							const binaryItem = data[binaryPropertyName];
							data[binaryPropertyName] = await this.helpers.setBinaryDataBuffer(
								binaryItem,
								Buffer.from(binaryItem.data, 'base64'),
							);
						}

						// Set Item Reference
						item.binary = data;
					},
				};
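A corresponding sketch (not part of the diff) for the FunctionItem node, whose sandbox helpers operate on the current item only; again the property name 'data' and the fileName value are assumptions, and `item`, `getBinaryDataAsync`, and `setBinaryDataAsync` come from the sandbox above.

// FunctionItem node code (runs inside the sandbox, once per item).
const binary = await getBinaryDataAsync();
if (binary && binary.data) {
	// Illustrative change only: rename the attachment of the assumed 'data' property.
	binary.data.fileName = 'renamed.txt';
	// Writes the payloads back through the configured binary data manager.
	await setBinaryDataAsync(binary);
}
item.processed = true;
return item;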
@@ -99,8 +147,6 @@ return item;`,
				const dataProxy = this.getWorkflowDataProxy(itemIndex);
				Object.assign(sandbox, dataProxy);

-				const mode = this.getMode();
-
				const options = {
					console: mode === 'manual' ? 'redirect' : 'inherit',
					sandbox,
@@ -877,6 +877,7 @@ export interface INodeExecutionData {
	binary?: IBinaryKeyData;
	error?: NodeApiError | NodeOperationError;
	pairedItem?: IPairedItemData | IPairedItemData[] | number;
	index?: number;
}

export interface INodeExecuteFunctions {