From 8bee04cd2a6283d66c3e3c3e09c918d44028217d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Mon, 2 Jan 2023 17:07:10 +0100 Subject: [PATCH] refactor(Read Binary File Node): Use node streams to reduce memory usage (#5069) --- packages/core/package.json | 2 ++ .../core/src/BinaryDataManager/FileSystem.ts | 9 +++---- packages/core/src/BinaryDataManager/index.ts | 24 ++++++++++++------- packages/core/src/Interfaces.ts | 5 ++-- packages/core/src/NodeExecuteFunctions.ts | 11 +++++---- .../ReadBinaryFile/ReadBinaryFile.node.ts | 4 ++-- .../ReadBinaryFiles/ReadBinaryFiles.node.ts | 8 +++---- packages/workflow/src/Interfaces.ts | 7 +++++- pnpm-lock.yaml | 10 ++++++++ 9 files changed, 55 insertions(+), 25 deletions(-) diff --git a/packages/core/package.json b/packages/core/package.json index 57025066b7..49e5067847 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -34,6 +34,7 @@ "bin" ], "devDependencies": { + "@types/concat-stream": "^2.0.0", "@types/cron": "~1.7.1", "@types/crypto-js": "^4.0.1", "@types/express": "^4.17.6", @@ -45,6 +46,7 @@ "dependencies": { "axios": "^0.21.1", "client-oauth2": "^4.2.5", + "concat-stream": "^2.0.0", "cron": "~1.7.2", "crypto-js": "~4.1.1", "fast-glob": "^3.2.5", diff --git a/packages/core/src/BinaryDataManager/FileSystem.ts b/packages/core/src/BinaryDataManager/FileSystem.ts index 360051ff03..ad72afdd6f 100644 --- a/packages/core/src/BinaryDataManager/FileSystem.ts +++ b/packages/core/src/BinaryDataManager/FileSystem.ts @@ -2,6 +2,7 @@ import fs from 'fs/promises'; import { jsonParse } from 'n8n-workflow'; import path from 'path'; import { v4 as uuid } from 'uuid'; +import type { Readable } from 'stream'; import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; @@ -66,10 
+67,10 @@ export class BinaryDataFileSystem implements IBinaryDataManager { return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' })); } - async storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise { + async storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise { const binaryDataId = this.generateFileName(executionId); await this.addBinaryIdToPersistMeta(executionId, binaryDataId); - await this.saveToLocalStorage(binaryBuffer, binaryDataId); + await this.saveToLocalStorage(binaryData, binaryDataId); return binaryDataId; } @@ -234,8 +235,8 @@ export class BinaryDataFileSystem implements IBinaryDataManager { await fs.cp(source, this.getBinaryPath(identifier)); } - private async saveToLocalStorage(data: Buffer, identifier: string) { - await fs.writeFile(this.getBinaryPath(identifier), data); + private async saveToLocalStorage(binaryData: Buffer | Readable, identifier: string) { + await fs.writeFile(this.getBinaryPath(identifier), binaryData); } private async retrieveFromLocalStorage(identifier: string): Promise { diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts index b9cfdc4537..14cbb6123e 100644 --- a/packages/core/src/BinaryDataManager/index.ts +++ b/packages/core/src/BinaryDataManager/index.ts @@ -1,9 +1,11 @@ -import prettyBytes from 'pretty-bytes'; +import concatStream from 'concat-stream'; +import { readFile, stat } from 'fs/promises'; import type { IBinaryData, INodeExecutionData } from 'n8n-workflow'; +import prettyBytes from 'pretty-bytes'; +import type { Readable } from 'stream'; import { BINARY_ENCODING } from '../Constants'; import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import { BinaryDataFileSystem } from './FileSystem'; -import { readFile, stat } from 'fs/promises'; export class BinaryDataManager { static instance: BinaryDataManager | undefined; @@ -79,29 +81,35 @@ export class 
BinaryDataManager { async storeBinaryData( binaryData: IBinaryData, - binaryBuffer: Buffer, + input: Buffer | Readable, executionId: string, ): Promise { - binaryData.fileSize = prettyBytes(binaryBuffer.length); - // If a manager handles this binary, return the binary data with its reference id. const manager = this.managers[this.binaryDataMode]; if (manager) { - const identifier = await manager.storeBinaryData(binaryBuffer, executionId); + const identifier = await manager.storeBinaryData(input, executionId); // Add data manager reference id. binaryData.id = this.generateBinaryId(identifier); // Prevent preserving data in memory if handled by a data manager. binaryData.data = this.binaryDataMode; + const fileSize = await manager.getFileSize(identifier); + binaryData.fileSize = prettyBytes(fileSize); + await manager.storeBinaryMetadata(identifier, { fileName: binaryData.fileName, mimeType: binaryData.mimeType, - fileSize: binaryBuffer.length, + fileSize, }); } else { // Else fallback to storing this data in memory. 
- binaryData.data = binaryBuffer.toString(BINARY_ENCODING); + const buffer = await new Promise((resolve) => { + if (Buffer.isBuffer(input)) resolve(input); + else input.pipe(concatStream(resolve)); + }); + binaryData.data = buffer.toString(BINARY_ENCODING); + binaryData.fileSize = prettyBytes(buffer.length); } return binaryData; diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index b985fbad42..a49a8d520b 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -1,4 +1,5 @@ -import { +import type { Readable } from 'stream'; +import type { IPollResponse, ITriggerResponse, IWorkflowSettings as IWorkflowSettingsWorkflow, @@ -67,7 +68,7 @@ export interface IBinaryDataManager { copyBinaryFile(filePath: string, executionId: string): Promise; storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise; getBinaryMetadata(identifier: string): Promise; - storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise; + storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise; retrieveBinaryDataByIdentifier(identifier: string): Promise; getBinaryPath(identifier: string): string; markDataForDeletionByExecutionId(executionId: string): Promise; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 767bad85c2..3552eec10f 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -91,6 +91,8 @@ import axios, { Method, } from 'axios'; import url, { URL, URLSearchParams } from 'url'; +import type { Readable } from 'stream'; + import { BinaryDataManager } from './BinaryDataManager'; import type { IResponseError, IWorkflowSettings } from './Interfaces'; import { extractValue } from './ExtractValue'; @@ -840,12 +842,12 @@ export async function getBinaryDataBuffer( * * @export * @param {IBinaryData} data - * @param {Buffer} binaryData + * @param {Buffer | Readable} binaryData * @returns 
{Promise} */ export async function setBinaryDataBuffer( data: IBinaryData, - binaryData: Buffer, + binaryData: Buffer | Readable, executionId: string, ): Promise { return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId); @@ -907,7 +909,7 @@ export async function copyBinaryFile( * base64 and adds metadata. */ async function prepareBinaryData( - binaryData: Buffer, + binaryData: Buffer | Readable, executionId: string, filePath?: string, mimeType?: string, @@ -924,7 +926,8 @@ async function prepareBinaryData( } } - if (!mimeType) { + // TODO: detect filetype from streams + if (!mimeType && Buffer.isBuffer(binaryData)) { // Use buffer to guess mime type const fileTypeData = await FileType.fromBuffer(binaryData); if (fileTypeData) { diff --git a/packages/nodes-base/nodes/ReadBinaryFile/ReadBinaryFile.node.ts b/packages/nodes-base/nodes/ReadBinaryFile/ReadBinaryFile.node.ts index 22f7b45bb7..7494b5766c 100644 --- a/packages/nodes-base/nodes/ReadBinaryFile/ReadBinaryFile.node.ts +++ b/packages/nodes-base/nodes/ReadBinaryFile/ReadBinaryFile.node.ts @@ -6,7 +6,7 @@ import { NodeOperationError, } from 'n8n-workflow'; -import { readFile as fsReadFile } from 'fs/promises'; +import { createReadStream } from 'fs'; export class ReadBinaryFile implements INodeType { description: INodeTypeDescription = { @@ -58,7 +58,7 @@ export class ReadBinaryFile implements INodeType { let data; try { - data = await fsReadFile(filePath); + data = createReadStream(filePath); } catch (error) { if (error.code === 'ENOENT') { throw new NodeOperationError( diff --git a/packages/nodes-base/nodes/ReadBinaryFiles/ReadBinaryFiles.node.ts b/packages/nodes-base/nodes/ReadBinaryFiles/ReadBinaryFiles.node.ts index 718440bee3..9528970439 100644 --- a/packages/nodes-base/nodes/ReadBinaryFiles/ReadBinaryFiles.node.ts +++ b/packages/nodes-base/nodes/ReadBinaryFiles/ReadBinaryFiles.node.ts @@ -1,8 +1,8 @@ import { IExecuteFunctions } from 'n8n-core'; import { INodeExecutionData, 
INodeType, INodeTypeDescription } from 'n8n-workflow'; import glob from 'fast-glob'; - -import { readFile as fsReadFile } from 'fs/promises'; +import { createReadStream } from 'fs'; +import type { Readable } from 'stream'; export class ReadBinaryFiles implements INodeType { description: INodeTypeDescription = { @@ -47,9 +47,9 @@ export class ReadBinaryFiles implements INodeType { const items: INodeExecutionData[] = []; let item: INodeExecutionData; - let data: Buffer; + let data: Readable; for (const filePath of files) { - data = await fsReadFile(filePath); + data = createReadStream(filePath); item = { binary: { diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index f91ed4b737..936e2a7ec6 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -3,6 +3,7 @@ import type * as express from 'express'; import type * as FormData from 'form-data'; import type { IncomingHttpHeaders } from 'http'; +import type { Readable } from 'stream'; import type { URLSearchParams } from 'url'; import type { OptionsWithUri, OptionsWithUrl } from 'request'; import type { RequestPromiseOptions, RequestPromiseAPI } from 'request-promise-native'; @@ -633,7 +634,11 @@ export interface JsonHelperFunctions { } export interface BinaryHelperFunctions { - prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise; + prepareBinaryData( + binaryData: Buffer | Readable, + filePath?: string, + mimeType?: string, + ): Promise; setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise; copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9f87421aea..deeb687216 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -331,6 +331,7 @@ importers: packages/core: specifiers: + '@types/concat-stream': ^2.0.0 '@types/cron': ~1.7.1 '@types/crypto-js': ^4.0.1 '@types/express': ^4.17.6 @@ -340,6 +341,7 @@ importers: '@types/uuid': 
^8.3.2 axios: ^0.21.1 client-oauth2: ^4.2.5 + concat-stream: ^2.0.0 cron: ~1.7.2 crypto-js: ~4.1.1 fast-glob: ^3.2.5 @@ -359,6 +361,7 @@ importers: dependencies: axios: 0.21.4 client-oauth2: 4.3.3 + concat-stream: 2.0.0 cron: 1.7.2 crypto-js: 4.1.1 fast-glob: 3.2.12 @@ -376,6 +379,7 @@ importers: request-promise-native: 1.0.9_request@2.88.2 uuid: 8.3.2 devDependencies: + '@types/concat-stream': 2.0.0 '@types/cron': 1.7.3 '@types/crypto-js': 4.1.1 '@types/express': 4.17.14 @@ -5554,6 +5558,12 @@ packages: '@types/express': 4.17.14 dev: true + /@types/concat-stream/2.0.0: + resolution: {integrity: sha512-t3YCerNM7NTVjLuICZo5gYAXYoDvpuuTceCcFQWcDQz26kxUR5uIWolxbIR5jRNIXpMqhOpW/b8imCR1LEmuJw==} + dependencies: + '@types/node': 16.11.65 + dev: true + /@types/connect-history-api-fallback/1.3.5: resolution: {integrity: sha512-h8QJa8xSb1WD4fpKBDcATDNGXghFj6/3GRWG6dhmRcu0RX1Ubasur2Uvx5aeEwlf0MwblEC2bMzzMQntxnw/Cw==} dependencies: