From 07e4743a3e1a93542fac661aabba0a36d95c098a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Thu, 24 Nov 2022 16:54:43 +0100 Subject: [PATCH] refactor(core): Reduce memory usage in the Webhook node (#4640) use file streaming to pass webhook binaries around --- packages/cli/src/Server.ts | 22 +++-- packages/core/package.json | 1 + .../core/src/BinaryDataManager/FileSystem.ts | 53 ++++++++++-- packages/core/src/BinaryDataManager/index.ts | 84 +++++++++++++++---- packages/core/src/Interfaces.ts | 12 +++ packages/core/src/NodeExecuteFunctions.ts | 82 +++++++++++++++++- packages/editor-ui/src/Interface.ts | 9 +- .../src/components/BinaryDataDisplay.vue | 15 +--- .../src/components/BinaryDataDisplayEmbed.vue | 20 ++--- packages/editor-ui/src/components/RunData.vue | 36 ++++---- packages/editor-ui/src/mixins/restApi.ts | 4 + .../src/plugins/i18n/locales/en.json | 1 + packages/nodes-base/nodes/Wait/Wait.node.ts | 80 ++++++++---------- .../nodes-base/nodes/Webhook/Webhook.node.ts | 80 ++++++++---------- packages/workflow/src/Interfaces.ts | 3 + pnpm-lock.yaml | 3 +- 16 files changed, 329 insertions(+), 176 deletions(-) diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index b3346a8b05..2558b9bbd2 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -1497,18 +1497,22 @@ class App { // Binary data // ---------------------------------------- - // Returns binary buffer + // Download binary this.app.get( `/${this.restEndpoint}/data/:path`, - ResponseHelper.send(async (req: express.Request, res: express.Response): Promise => { + async (req: express.Request, res: express.Response): Promise => { // TODO UM: check if this needs permission check for UM - const dataPath = req.params.path; - return BinaryDataManager.getInstance() - .retrieveBinaryDataByIdentifier(dataPath) - .then((buffer: Buffer) => { - return buffer.toString('base64'); - }); - }), + const identifier = req.params.path; + const binaryDataManager = BinaryDataManager.getInstance(); + const binaryPath = binaryDataManager.getBinaryPath(identifier); + const { mimeType, fileName, fileSize } = await binaryDataManager.getBinaryMetadata( + identifier, + ); + if (mimeType) res.setHeader('Content-Type', mimeType); + if (fileName) res.setHeader('Content-Disposition', `attachment; filename="${fileName}"`); + res.setHeader('Content-Length', fileSize); + res.sendFile(binaryPath); + }, ); // ---------------------------------------- diff --git a/packages/core/package.json b/packages/core/package.json index 5a4e6dc1e2..24803be905 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -56,6 +56,7 @@ "n8n-workflow": "~0.126.0", "oauth-1.0a": "^2.2.6", "p-cancelable": "^2.0.0", + "pretty-bytes": "^5.6.0", "qs": "^6.10.1", "request": "^2.88.2", "request-promise-native": "^1.0.7", diff --git a/packages/core/src/BinaryDataManager/FileSystem.ts b/packages/core/src/BinaryDataManager/FileSystem.ts index d9a8e4d2a2..360051ff03 100644 --- a/packages/core/src/BinaryDataManager/FileSystem.ts +++ b/packages/core/src/BinaryDataManager/FileSystem.ts @@ -1,8 +1,9 @@ -import { promises as fs } from 'fs'; +import fs from 'fs/promises'; +import { jsonParse } from 'n8n-workflow'; import path from 'path'; import { v4 as uuid } from 'uuid'; -import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; +import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; const PREFIX_METAFILE = 'binarymeta'; const PREFIX_PERSISTED_METAFILE = 'persistedmeta'; @@ -43,17 +44,47 @@ export class BinaryDataFileSystem implements IBinaryDataManager { .then(() => {}); } + async getFileSize(identifier: string): Promise { + const stats = await fs.stat(this.getBinaryPath(identifier)); + return stats.size; + } + + async copyBinaryFile(filePath: string, executionId: string): Promise { + const binaryDataId = this.generateFileName(executionId); + await this.addBinaryIdToPersistMeta(executionId, binaryDataId); + await this.copyFileToLocalStorage(filePath, binaryDataId); + return binaryDataId; + } + + async storeBinaryMetadata(identifier: string, metadata: BinaryMetadata) { + await fs.writeFile(this.getMetadataPath(identifier), JSON.stringify(metadata), { + encoding: 'utf-8', + }); + } + + async getBinaryMetadata(identifier: string): Promise { + return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' })); + } + async storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise { const binaryDataId = this.generateFileName(executionId); - return this.addBinaryIdToPersistMeta(executionId, binaryDataId).then(async () => - this.saveToLocalStorage(binaryBuffer, binaryDataId).then(() => binaryDataId), - ); + await this.addBinaryIdToPersistMeta(executionId, binaryDataId); + await this.saveToLocalStorage(binaryBuffer, binaryDataId); + return binaryDataId; } async retrieveBinaryDataByIdentifier(identifier: string): Promise { return this.retrieveFromLocalStorage(identifier); } + getBinaryPath(identifier: string): string { + return path.join(this.storagePath, identifier); + } + + getMetadataPath(identifier: string): string { + return path.join(this.storagePath, `${identifier}.metadata`); + } + async markDataForDeletionByExecutionId(executionId: string): Promise { const tt = new Date(new Date().getTime() + this.binaryDataTTL * 60000); return fs.writeFile( @@ -180,7 +211,7 @@ export class BinaryDataFileSystem implements IBinaryDataManager { } private generateFileName(prefix: string): string { - return `${prefix}_${uuid()}`; + return [prefix, uuid()].join(''); } private getBinaryDataMetaPath() { @@ -196,15 +227,19 @@ export class BinaryDataFileSystem implements IBinaryDataManager { } private async deleteFromLocalStorage(identifier: string) { - return fs.rm(path.join(this.storagePath, identifier)); + return fs.rm(this.getBinaryPath(identifier)); + } + + private async copyFileToLocalStorage(source: string, identifier: string): Promise { + await fs.cp(source, this.getBinaryPath(identifier)); } private async saveToLocalStorage(data: Buffer, identifier: string) { - await fs.writeFile(path.join(this.storagePath, identifier), data); + await fs.writeFile(this.getBinaryPath(identifier), data); } private async retrieveFromLocalStorage(identifier: string): Promise { - const filePath = path.join(this.storagePath, identifier); + const filePath = this.getBinaryPath(identifier); try { return await fs.readFile(filePath); } catch (e) { diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts index 57b9313b53..ae65534945 100644 --- a/packages/core/src/BinaryDataManager/index.ts +++ b/packages/core/src/BinaryDataManager/index.ts @@ -1,7 +1,9 @@ -import { IBinaryData, INodeExecutionData } from 'n8n-workflow'; +import prettyBytes from 'pretty-bytes'; +import type { IBinaryData, INodeExecutionData } from 'n8n-workflow'; import { BINARY_ENCODING } from '../Constants'; -import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; +import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import { BinaryDataFileSystem } from './FileSystem'; +import { readFile, stat } from 'fs/promises'; export class BinaryDataManager { static instance: BinaryDataManager | undefined; @@ -43,31 +45,59 @@ export class BinaryDataManager { return BinaryDataManager.instance; } + async copyBinaryFile( + binaryData: IBinaryData, + filePath: string, + executionId: string, + ): Promise { + // If a manager handles this binary, copy over the binary file and return its reference id. + const manager = this.managers[this.binaryDataMode]; + if (manager) { + const identifier = await manager.copyBinaryFile(filePath, executionId); + // Add data manager reference id. + binaryData.id = this.generateBinaryId(identifier); + + // Prevent preserving data in memory if handled by a data manager. + binaryData.data = this.binaryDataMode; + + const fileSize = await manager.getFileSize(identifier); + binaryData.fileSize = prettyBytes(fileSize); + + await manager.storeBinaryMetadata(identifier, { + fileName: binaryData.fileName, + mimeType: binaryData.mimeType, + fileSize, + }); + } else { + const { size } = await stat(filePath); + binaryData.fileSize = prettyBytes(size); + binaryData.data = await readFile(filePath, { encoding: BINARY_ENCODING }); + } + + return binaryData; + } + async storeBinaryData( binaryData: IBinaryData, binaryBuffer: Buffer, executionId: string, ): Promise { - const retBinaryData = binaryData; + binaryData.fileSize = prettyBytes(binaryBuffer.length); - // If a manager handles this binary, return the binary data with it's reference id. - if (this.managers[this.binaryDataMode]) { - return this.managers[this.binaryDataMode] - .storeBinaryData(binaryBuffer, executionId) - .then((filename) => { - // Add data manager reference id. - retBinaryData.id = this.generateBinaryId(filename); + // If a manager handles this binary, return the binary data with its reference id. + const manager = this.managers[this.binaryDataMode]; + if (manager) { + const identifier = await manager.storeBinaryData(binaryBuffer, executionId); + // Add data manager reference id. + binaryData.id = this.generateBinaryId(identifier); - // Prevent preserving data in memory if handled by a data manager. - retBinaryData.data = this.binaryDataMode; - - // Short-circuit return to prevent further actions. - return retBinaryData; - }); + // Prevent preserving data in memory if handled by a data manager. + binaryData.data = this.binaryDataMode; + } else { + // Else fallback to storing this data in memory. + binaryData.data = binaryBuffer.toString(BINARY_ENCODING); } - // Else fallback to storing this data in memory. - retBinaryData.data = binaryBuffer.toString(BINARY_ENCODING); return binaryData; } @@ -88,6 +118,24 @@ export class BinaryDataManager { throw new Error('Storage mode used to store binary data not available'); } + getBinaryPath(identifier: string): string { + const { mode, id } = this.splitBinaryModeFileId(identifier); + if (this.managers[mode]) { + return this.managers[mode].getBinaryPath(id); + } + + throw new Error('Storage mode used to store binary data not available'); + } + + async getBinaryMetadata(identifier: string): Promise { + const { mode, id } = this.splitBinaryModeFileId(identifier); + if (this.managers[mode]) { + return this.managers[mode].getBinaryMetadata(id); + } + + throw new Error('Storage mode used to store binary data not available'); + } + async markDataForDeletionByExecutionId(executionId: string): Promise { if (this.managers[this.binaryDataMode]) { return this.managers[this.binaryDataMode].markDataForDeletionByExecutionId(executionId); diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index da1b3e6f1b..51dec31fbd 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -260,6 +260,7 @@ export interface IWebhookFunctions extends IWebhookFunctionsBase { filePath?: string, mimeType?: string, ): Promise; + copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise; request: (uriOrObject: string | IDataObject | any, options?: IDataObject) => Promise; requestWithAuthentication( this: IAllExecuteFunctions, @@ -306,10 +307,21 @@ export interface IBinaryDataConfig { persistedBinaryDataTTL: number; } +export interface BinaryMetadata { + fileName?: string; + mimeType?: string; + fileSize: number; +} + export interface IBinaryDataManager { init(startPurger: boolean): Promise; + getFileSize(filePath: string): Promise; + copyBinaryFile(filePath: string, executionId: string): Promise; + storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise; + getBinaryMetadata(identifier: string): Promise; storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise; retrieveBinaryDataByIdentifier(identifier: string): Promise; + getBinaryPath(identifier: string): string; markDataForDeletionByExecutionId(executionId: string): Promise; deleteMarkedFiles(): Promise; deleteBinaryDataByIdentifier(identifier: string): Promise; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 0e2d10e7b2..0bc5d51c76 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -64,6 +64,7 @@ import { NodeExecutionWithMetadata, IPairedItemData, deepCopy, + BinaryFileType, } from 'n8n-workflow'; import { Agent } from 'https'; @@ -77,8 +78,8 @@ import FormData from 'form-data'; import path from 'path'; import { OptionsWithUri, OptionsWithUrl, RequestCallback, RequiredUriUrl } from 'request'; import requestPromise, { RequestPromiseOptions } from 'request-promise-native'; -import { fromBuffer } from 'file-type'; -import { lookup } from 'mime-types'; +import FileType from 'file-type'; +import { lookup, extension } from 'mime-types'; import { IncomingHttpHeaders } from 'http'; import axios, { AxiosError, @@ -830,6 +831,13 @@ export async function getBinaryDataBuffer( return BinaryDataManager.getInstance().retrieveBinaryData(binaryData); } +function fileTypeFromMimeType(mimeType: string): BinaryFileType | undefined { + if (mimeType.startsWith('image/')) return 'image'; + if (mimeType.startsWith('video/')) return 'video'; + if (mimeType.startsWith('text/') || mimeType.startsWith('application/json')) return 'text'; + return; +} + /** * Store an incoming IBinaryData & related buffer using the configured binary data manager. * @@ -846,10 +854,60 @@ export async function setBinaryDataBuffer( return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId); } +export async function copyBinaryFile( + executionId: string, + filePath: string, + fileName: string, + mimeType?: string, +): Promise { + let fileExtension: string | undefined; + if (!mimeType) { + // If no mime type is given figure it out + + if (filePath) { + // Use file path to guess mime type + const mimeTypeLookup = lookup(filePath); + if (mimeTypeLookup) { + mimeType = mimeTypeLookup; + } + } + + if (!mimeType) { + // read the first bytes of the file to guess mime type + const fileTypeData = await FileType.fromFile(filePath); + if (fileTypeData) { + mimeType = fileTypeData.mime; + fileExtension = fileTypeData.ext; + } + } + + if (!mimeType) { + // Fall back to text + mimeType = 'text/plain'; + } + } else if (!fileExtension) { + fileExtension = extension(mimeType) || undefined; + } + + const returnData: IBinaryData = { + mimeType, + fileType: fileTypeFromMimeType(mimeType), + fileExtension, + data: '', + }; + + if (fileName) { + returnData.fileName = fileName; + } else if (filePath) { + returnData.fileName = path.parse(filePath).base; + } + + return BinaryDataManager.getInstance().copyBinaryFile(returnData, filePath, executionId); +} + /** * Takes a buffer and converts it into the format n8n uses. It encodes the binary data as * base64 and adds metadata. - * */ export async function prepareBinaryData( binaryData: Buffer, @@ -871,7 +929,7 @@ export async function prepareBinaryData( if (!mimeType) { // Use buffer to guess mime type - const fileTypeData = await fromBuffer(binaryData); + const fileTypeData = await FileType.fromBuffer(binaryData); if (fileTypeData) { mimeType = fileTypeData.mime; fileExtension = fileTypeData.ext; @@ -882,10 +940,13 @@ export async function prepareBinaryData( // Fall back to text mimeType = 'text/plain'; } + } else if (!fileExtension) { + fileExtension = extension(mimeType) || undefined; } const returnData: IBinaryData = { mimeType, + fileType: fileTypeFromMimeType(mimeType), fileExtension, data: '', }; @@ -3076,6 +3137,19 @@ export function getExecuteWebhookFunctions( async setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise { return setBinaryDataBuffer.call(this, data, binaryData, additionalData.executionId!); }, + async copyBinaryFile( + filePath: string, + fileName: string, + mimeType?: string, + ): Promise { + return copyBinaryFile.call( + this, + additionalData.executionId!, + filePath, + fileName, + mimeType, + ); + }, async prepareBinaryData( binaryData: Buffer, filePath?: string, diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index e623057a92..0a7fed175e 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -215,6 +215,7 @@ export interface IRestApi { retryExecution(id: string, loadWorkflow?: boolean): Promise; getTimezones(): Promise; getBinaryBufferString(dataPath: string): Promise; + getBinaryUrl(dataPath: string): string; } export interface INodeTranslationHeaders { @@ -226,14 +227,6 @@ export interface INodeTranslationHeaders { }; } -export interface IBinaryDisplayData { - index: number; - key: string; - node: string; - outputIndex: number; - runIndex: number; -} - export interface IStartRunData { workflowData: IWorkflowData; startNodes?: string[]; diff --git a/packages/editor-ui/src/components/BinaryDataDisplay.vue b/packages/editor-ui/src/components/BinaryDataDisplay.vue index 59aac23200..8ef83d206d 100644 --- a/packages/editor-ui/src/components/BinaryDataDisplay.vue +++ b/packages/editor-ui/src/components/BinaryDataDisplay.vue @@ -20,10 +20,7 @@