refactor(Read Binary File Node): Use node streams for to reduce memory usage (#5069)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-01-02 17:07:10 +01:00 committed by GitHub
parent a455cce7e6
commit 8bee04cd2a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 55 additions and 25 deletions

View file

@ -34,6 +34,7 @@
"bin" "bin"
], ],
"devDependencies": { "devDependencies": {
"@types/concat-stream": "^2.0.0",
"@types/cron": "~1.7.1", "@types/cron": "~1.7.1",
"@types/crypto-js": "^4.0.1", "@types/crypto-js": "^4.0.1",
"@types/express": "^4.17.6", "@types/express": "^4.17.6",
@ -45,6 +46,7 @@
"dependencies": { "dependencies": {
"axios": "^0.21.1", "axios": "^0.21.1",
"client-oauth2": "^4.2.5", "client-oauth2": "^4.2.5",
"concat-stream": "^2.0.0",
"cron": "~1.7.2", "cron": "~1.7.2",
"crypto-js": "~4.1.1", "crypto-js": "~4.1.1",
"fast-glob": "^3.2.5", "fast-glob": "^3.2.5",

View file

@ -2,6 +2,7 @@ import fs from 'fs/promises';
import { jsonParse } from 'n8n-workflow'; import { jsonParse } from 'n8n-workflow';
import path from 'path'; import path from 'path';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
import type { Readable } from 'stream';
import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
@ -66,10 +67,10 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' })); return jsonParse(await fs.readFile(this.getMetadataPath(identifier), { encoding: 'utf-8' }));
} }
async storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise<string> { async storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise<string> {
const binaryDataId = this.generateFileName(executionId); const binaryDataId = this.generateFileName(executionId);
await this.addBinaryIdToPersistMeta(executionId, binaryDataId); await this.addBinaryIdToPersistMeta(executionId, binaryDataId);
await this.saveToLocalStorage(binaryBuffer, binaryDataId); await this.saveToLocalStorage(binaryData, binaryDataId);
return binaryDataId; return binaryDataId;
} }
@ -234,8 +235,8 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
await fs.cp(source, this.getBinaryPath(identifier)); await fs.cp(source, this.getBinaryPath(identifier));
} }
private async saveToLocalStorage(data: Buffer, identifier: string) { private async saveToLocalStorage(binaryData: Buffer | Readable, identifier: string) {
await fs.writeFile(this.getBinaryPath(identifier), data); await fs.writeFile(this.getBinaryPath(identifier), binaryData);
} }
private async retrieveFromLocalStorage(identifier: string): Promise<Buffer> { private async retrieveFromLocalStorage(identifier: string): Promise<Buffer> {

View file

@ -1,9 +1,11 @@
import prettyBytes from 'pretty-bytes'; import concatStream from 'concat-stream';
import { readFile, stat } from 'fs/promises';
import type { IBinaryData, INodeExecutionData } from 'n8n-workflow'; import type { IBinaryData, INodeExecutionData } from 'n8n-workflow';
import prettyBytes from 'pretty-bytes';
import type { Readable } from 'stream';
import { BINARY_ENCODING } from '../Constants'; import { BINARY_ENCODING } from '../Constants';
import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import { BinaryDataFileSystem } from './FileSystem'; import { BinaryDataFileSystem } from './FileSystem';
import { readFile, stat } from 'fs/promises';
export class BinaryDataManager { export class BinaryDataManager {
static instance: BinaryDataManager | undefined; static instance: BinaryDataManager | undefined;
@ -79,29 +81,35 @@ export class BinaryDataManager {
async storeBinaryData( async storeBinaryData(
binaryData: IBinaryData, binaryData: IBinaryData,
binaryBuffer: Buffer, input: Buffer | Readable,
executionId: string, executionId: string,
): Promise<IBinaryData> { ): Promise<IBinaryData> {
binaryData.fileSize = prettyBytes(binaryBuffer.length);
// If a manager handles this binary, return the binary data with its reference id. // If a manager handles this binary, return the binary data with its reference id.
const manager = this.managers[this.binaryDataMode]; const manager = this.managers[this.binaryDataMode];
if (manager) { if (manager) {
const identifier = await manager.storeBinaryData(binaryBuffer, executionId); const identifier = await manager.storeBinaryData(input, executionId);
// Add data manager reference id. // Add data manager reference id.
binaryData.id = this.generateBinaryId(identifier); binaryData.id = this.generateBinaryId(identifier);
// Prevent preserving data in memory if handled by a data manager. // Prevent preserving data in memory if handled by a data manager.
binaryData.data = this.binaryDataMode; binaryData.data = this.binaryDataMode;
const fileSize = await manager.getFileSize(identifier);
binaryData.fileSize = prettyBytes(fileSize);
await manager.storeBinaryMetadata(identifier, { await manager.storeBinaryMetadata(identifier, {
fileName: binaryData.fileName, fileName: binaryData.fileName,
mimeType: binaryData.mimeType, mimeType: binaryData.mimeType,
fileSize: binaryBuffer.length, fileSize,
}); });
} else { } else {
// Else fallback to storing this data in memory. // Else fallback to storing this data in memory.
binaryData.data = binaryBuffer.toString(BINARY_ENCODING); const buffer = await new Promise<Buffer>((resolve) => {
if (Buffer.isBuffer(input)) resolve(input);
else input.pipe(concatStream(resolve));
});
binaryData.data = buffer.toString(BINARY_ENCODING);
binaryData.fileSize = prettyBytes(buffer.length);
} }
return binaryData; return binaryData;

View file

@ -1,4 +1,5 @@
import { import type { Readable } from 'stream';
import type {
IPollResponse, IPollResponse,
ITriggerResponse, ITriggerResponse,
IWorkflowSettings as IWorkflowSettingsWorkflow, IWorkflowSettings as IWorkflowSettingsWorkflow,
@ -67,7 +68,7 @@ export interface IBinaryDataManager {
copyBinaryFile(filePath: string, executionId: string): Promise<string>; copyBinaryFile(filePath: string, executionId: string): Promise<string>;
storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise<void>; storeBinaryMetadata(identifier: string, metadata: BinaryMetadata): Promise<void>;
getBinaryMetadata(identifier: string): Promise<BinaryMetadata>; getBinaryMetadata(identifier: string): Promise<BinaryMetadata>;
storeBinaryData(binaryBuffer: Buffer, executionId: string): Promise<string>; storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise<string>;
retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer>; retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer>;
getBinaryPath(identifier: string): string; getBinaryPath(identifier: string): string;
markDataForDeletionByExecutionId(executionId: string): Promise<void>; markDataForDeletionByExecutionId(executionId: string): Promise<void>;

View file

@ -91,6 +91,8 @@ import axios, {
Method, Method,
} from 'axios'; } from 'axios';
import url, { URL, URLSearchParams } from 'url'; import url, { URL, URLSearchParams } from 'url';
import type { Readable } from 'stream';
import { BinaryDataManager } from './BinaryDataManager'; import { BinaryDataManager } from './BinaryDataManager';
import type { IResponseError, IWorkflowSettings } from './Interfaces'; import type { IResponseError, IWorkflowSettings } from './Interfaces';
import { extractValue } from './ExtractValue'; import { extractValue } from './ExtractValue';
@ -840,12 +842,12 @@ export async function getBinaryDataBuffer(
* *
* @export * @export
* @param {IBinaryData} data * @param {IBinaryData} data
* @param {Buffer} binaryData * @param {Buffer | Readable} binaryData
* @returns {Promise<IBinaryData>} * @returns {Promise<IBinaryData>}
*/ */
export async function setBinaryDataBuffer( export async function setBinaryDataBuffer(
data: IBinaryData, data: IBinaryData,
binaryData: Buffer, binaryData: Buffer | Readable,
executionId: string, executionId: string,
): Promise<IBinaryData> { ): Promise<IBinaryData> {
return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId); return BinaryDataManager.getInstance().storeBinaryData(data, binaryData, executionId);
@ -907,7 +909,7 @@ export async function copyBinaryFile(
* base64 and adds metadata. * base64 and adds metadata.
*/ */
async function prepareBinaryData( async function prepareBinaryData(
binaryData: Buffer, binaryData: Buffer | Readable,
executionId: string, executionId: string,
filePath?: string, filePath?: string,
mimeType?: string, mimeType?: string,
@ -924,7 +926,8 @@ async function prepareBinaryData(
} }
} }
if (!mimeType) { // TODO: detect filetype from streams
if (!mimeType && Buffer.isBuffer(binaryData)) {
// Use buffer to guess mime type // Use buffer to guess mime type
const fileTypeData = await FileType.fromBuffer(binaryData); const fileTypeData = await FileType.fromBuffer(binaryData);
if (fileTypeData) { if (fileTypeData) {

View file

@ -6,7 +6,7 @@ import {
NodeOperationError, NodeOperationError,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { readFile as fsReadFile } from 'fs/promises'; import { createReadStream } from 'fs';
export class ReadBinaryFile implements INodeType { export class ReadBinaryFile implements INodeType {
description: INodeTypeDescription = { description: INodeTypeDescription = {
@ -58,7 +58,7 @@ export class ReadBinaryFile implements INodeType {
let data; let data;
try { try {
data = await fsReadFile(filePath); data = createReadStream(filePath);
} catch (error) { } catch (error) {
if (error.code === 'ENOENT') { if (error.code === 'ENOENT') {
throw new NodeOperationError( throw new NodeOperationError(

View file

@ -1,8 +1,8 @@
import { IExecuteFunctions } from 'n8n-core'; import { IExecuteFunctions } from 'n8n-core';
import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow'; import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow';
import glob from 'fast-glob'; import glob from 'fast-glob';
import { createReadStream } from 'fs';
import { readFile as fsReadFile } from 'fs/promises'; import type { Readable } from 'stream';
export class ReadBinaryFiles implements INodeType { export class ReadBinaryFiles implements INodeType {
description: INodeTypeDescription = { description: INodeTypeDescription = {
@ -47,9 +47,9 @@ export class ReadBinaryFiles implements INodeType {
const items: INodeExecutionData[] = []; const items: INodeExecutionData[] = [];
let item: INodeExecutionData; let item: INodeExecutionData;
let data: Buffer; let data: Readable;
for (const filePath of files) { for (const filePath of files) {
data = await fsReadFile(filePath); data = createReadStream(filePath);
item = { item = {
binary: { binary: {

View file

@ -3,6 +3,7 @@
import type * as express from 'express'; import type * as express from 'express';
import type * as FormData from 'form-data'; import type * as FormData from 'form-data';
import type { IncomingHttpHeaders } from 'http'; import type { IncomingHttpHeaders } from 'http';
import type { Readable } from 'stream';
import type { URLSearchParams } from 'url'; import type { URLSearchParams } from 'url';
import type { OptionsWithUri, OptionsWithUrl } from 'request'; import type { OptionsWithUri, OptionsWithUrl } from 'request';
import type { RequestPromiseOptions, RequestPromiseAPI } from 'request-promise-native'; import type { RequestPromiseOptions, RequestPromiseAPI } from 'request-promise-native';
@ -633,7 +634,11 @@ export interface JsonHelperFunctions {
} }
export interface BinaryHelperFunctions { export interface BinaryHelperFunctions {
prepareBinaryData(binaryData: Buffer, filePath?: string, mimeType?: string): Promise<IBinaryData>; prepareBinaryData(
binaryData: Buffer | Readable,
filePath?: string,
mimeType?: string,
): Promise<IBinaryData>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>; setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>; copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>;
} }

View file

@ -331,6 +331,7 @@ importers:
packages/core: packages/core:
specifiers: specifiers:
'@types/concat-stream': ^2.0.0
'@types/cron': ~1.7.1 '@types/cron': ~1.7.1
'@types/crypto-js': ^4.0.1 '@types/crypto-js': ^4.0.1
'@types/express': ^4.17.6 '@types/express': ^4.17.6
@ -340,6 +341,7 @@ importers:
'@types/uuid': ^8.3.2 '@types/uuid': ^8.3.2
axios: ^0.21.1 axios: ^0.21.1
client-oauth2: ^4.2.5 client-oauth2: ^4.2.5
concat-stream: ^2.0.0
cron: ~1.7.2 cron: ~1.7.2
crypto-js: ~4.1.1 crypto-js: ~4.1.1
fast-glob: ^3.2.5 fast-glob: ^3.2.5
@ -359,6 +361,7 @@ importers:
dependencies: dependencies:
axios: 0.21.4 axios: 0.21.4
client-oauth2: 4.3.3 client-oauth2: 4.3.3
concat-stream: 2.0.0
cron: 1.7.2 cron: 1.7.2
crypto-js: 4.1.1 crypto-js: 4.1.1
fast-glob: 3.2.12 fast-glob: 3.2.12
@ -376,6 +379,7 @@ importers:
request-promise-native: 1.0.9_request@2.88.2 request-promise-native: 1.0.9_request@2.88.2
uuid: 8.3.2 uuid: 8.3.2
devDependencies: devDependencies:
'@types/concat-stream': 2.0.0
'@types/cron': 1.7.3 '@types/cron': 1.7.3
'@types/crypto-js': 4.1.1 '@types/crypto-js': 4.1.1
'@types/express': 4.17.14 '@types/express': 4.17.14
@ -5554,6 +5558,12 @@ packages:
'@types/express': 4.17.14 '@types/express': 4.17.14
dev: true dev: true
/@types/concat-stream/2.0.0:
resolution: {integrity: sha512-t3YCerNM7NTVjLuICZo5gYAXYoDvpuuTceCcFQWcDQz26kxUR5uIWolxbIR5jRNIXpMqhOpW/b8imCR1LEmuJw==}
dependencies:
'@types/node': 16.11.65
dev: true
/@types/connect-history-api-fallback/1.3.5: /@types/connect-history-api-fallback/1.3.5:
resolution: {integrity: sha512-h8QJa8xSb1WD4fpKBDcATDNGXghFj6/3GRWG6dhmRcu0RX1Ubasur2Uvx5aeEwlf0MwblEC2bMzzMQntxnw/Cw==} resolution: {integrity: sha512-h8QJa8xSb1WD4fpKBDcATDNGXghFj6/3GRWG6dhmRcu0RX1Ubasur2Uvx5aeEwlf0MwblEC2bMzzMQntxnw/Cw==}
dependencies: dependencies: