From 54126b2c87a8d622b69a13db3de1687f05051522 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 4 Jan 2023 12:29:56 +0100 Subject: [PATCH] refactor(Google Drive Node): Use node streams for uploading and downloading files (#5017) * use streams to upload files to google drive * use streams to download files from google drive * use resumable uploads api for google drive * avoid dangling promises, and reduce memory usage in error logging --- .../core/src/BinaryDataManager/FileSystem.ts | 9 +- packages/core/src/BinaryDataManager/index.ts | 14 +- packages/core/src/Interfaces.ts | 8 +- packages/core/src/NodeExecuteFunctions.ts | 193 ++++++++++-------- .../nodes/Google/Drive/GoogleDrive.node.ts | 156 ++++++++------ packages/workflow/src/Interfaces.ts | 12 +- 6 files changed, 229 insertions(+), 163 deletions(-) diff --git a/packages/core/src/BinaryDataManager/FileSystem.ts b/packages/core/src/BinaryDataManager/FileSystem.ts index ad72afdd6f..5aeebec05b 100644 --- a/packages/core/src/BinaryDataManager/FileSystem.ts +++ b/packages/core/src/BinaryDataManager/FileSystem.ts @@ -1,10 +1,11 @@ +import { createReadStream } from 'fs'; import fs from 'fs/promises'; -import { jsonParse } from 'n8n-workflow'; import path from 'path'; import { v4 as uuid } from 'uuid'; import type { Readable } from 'stream'; +import { BinaryMetadata, jsonParse } from 'n8n-workflow'; -import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; +import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; const PREFIX_METAFILE = 'binarymeta'; const PREFIX_PERSISTED_METAFILE = 'persistedmeta'; @@ -74,6 +75,10 @@ export class BinaryDataFileSystem implements IBinaryDataManager { return binaryDataId; } + getBinaryStream(identifier: string, chunkSize?: number): Readable { + return createReadStream(this.getBinaryPath(identifier), { highWaterMark: chunkSize }); + } + async retrieveBinaryDataByIdentifier(identifier: string): Promise { return this.retrieveFromLocalStorage(identifier); } diff --git a/packages/core/src/BinaryDataManager/index.ts b/packages/core/src/BinaryDataManager/index.ts index 14cbb6123e..a422ad4003 100644 --- a/packages/core/src/BinaryDataManager/index.ts +++ b/packages/core/src/BinaryDataManager/index.ts @@ -1,10 +1,10 @@ import concatStream from 'concat-stream'; import { readFile, stat } from 'fs/promises'; -import type { IBinaryData, INodeExecutionData } from 'n8n-workflow'; +import type { BinaryMetadata, IBinaryData, INodeExecutionData } from 'n8n-workflow'; import prettyBytes from 'pretty-bytes'; import type { Readable } from 'stream'; import { BINARY_ENCODING } from '../Constants'; -import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; +import type { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces'; import { BinaryDataFileSystem } from './FileSystem'; export class BinaryDataManager { @@ -88,6 +88,7 @@ export class BinaryDataManager { const manager = this.managers[this.binaryDataMode]; if (manager) { const identifier = await manager.storeBinaryData(input, executionId); + // Add data manager reference id. binaryData.id = this.generateBinaryId(identifier); @@ -115,6 +116,15 @@ export class BinaryDataManager { return binaryData; } + getBinaryStream(identifier: string, chunkSize?: number): Readable { + const { mode, id } = this.splitBinaryModeFileId(identifier); + if (this.managers[mode]) { + return this.managers[mode].getBinaryStream(id, chunkSize); + } + + throw new Error('Storage mode used to store binary data not available'); + } + async retrieveBinaryData(binaryData: IBinaryData): Promise { if (binaryData.id) { return this.retrieveBinaryDataByIdentifier(binaryData.id); diff --git a/packages/core/src/Interfaces.ts b/packages/core/src/Interfaces.ts index a49a8d520b..f9e2aec74c 100644 --- a/packages/core/src/Interfaces.ts +++ b/packages/core/src/Interfaces.ts @@ -10,6 +10,7 @@ import type { IPollFunctions as IPollFunctionsBase, ITriggerFunctions as ITriggerFunctionsBase, IWebhookFunctions as IWebhookFunctionsBase, + BinaryMetadata, } from 'n8n-workflow'; // TODO: remove these after removing `n8n-core` dependency from `nodes-bases` @@ -56,12 +57,6 @@ export interface IBinaryDataConfig { persistedBinaryDataTTL: number; } -export interface BinaryMetadata { - fileName?: string; - mimeType?: string; - fileSize: number; -} - export interface IBinaryDataManager { init(startPurger: boolean): Promise; getFileSize(filePath: string): Promise; @@ -71,6 +66,7 @@ export interface IBinaryDataManager { storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise; retrieveBinaryDataByIdentifier(identifier: string): Promise; getBinaryPath(identifier: string): string; + getBinaryStream(identifier: string, chunkSize?: number): Readable; markDataForDeletionByExecutionId(executionId: string): Promise; deleteMarkedFiles(): Promise; deleteBinaryDataByIdentifier(identifier: string): Promise; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 3552eec10f..a7ff2a6765 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -66,6 +66,7 @@ import { IPollFunctions, ITriggerFunctions, IWebhookFunctions, + BinaryMetadata, } from 'n8n-workflow'; import { Agent } from 'https'; @@ -463,7 +464,9 @@ async function parseRequestObject(requestObject: IDataObject) { } } - if (requestObject.encoding === null) { + if (requestObject.useStream) { + axiosConfig.responseType = 'stream'; + } else if (requestObject.encoding === null) { // When downloading files, return an arrayBuffer. axiosConfig.responseType = 'arraybuffer'; } @@ -519,7 +522,7 @@ function digestAuthAxiosConfig( const realm: string = authDetails .find((el: any) => el[0].toLowerCase().indexOf('realm') > -1)[1] .replace(/"/g, ''); - // If authDeatials does not have opaque, we should not add it to authorization. + // If authDetails does not have opaque, we should not add it to authorization. const opaqueKV = authDetails.find((el: any) => el[0].toLowerCase().indexOf('opaque') > -1); const opaque: string = opaqueKV ? opaqueKV[1].replace(/"/g, '') : undefined; const nonce: string = authDetails @@ -576,7 +579,7 @@ async function proxyRequestToAxios( maxBodyLength: Infinity, maxContentLength: Infinity, }; - let axiosPromise: AxiosPromise; + type ConfigObject = { auth?: { sendImmediately: boolean }; resolveWithFullResponse?: boolean; @@ -602,107 +605,102 @@ async function proxyRequestToAxios( // } ); + let requestFn: () => AxiosPromise; if (configObject.auth?.sendImmediately === false) { // for digest-auth - const { auth } = axiosConfig; - delete axiosConfig.auth; - // eslint-disable-next-line no-async-promise-executor - axiosPromise = new Promise(async (resolve, reject) => { + requestFn = async () => { try { - const result = await axios(axiosConfig); - resolve(result); - } catch (resp: any) { - if ( - resp.response === undefined || - resp.response.status !== 401 || - !resp.response.headers['www-authenticate']?.includes('nonce') - ) { - reject(resp); + return await axios(axiosConfig); + } catch (error) { + const { response } = error; + if (response?.status !== 401 || !response.headers['www-authenticate']?.includes('nonce')) { + throw error; } - axiosConfig = digestAuthAxiosConfig(axiosConfig, resp.response, auth); - resolve(axios(axiosConfig)); + const { auth } = axiosConfig; + delete axiosConfig.auth; + axiosConfig = digestAuthAxiosConfig(axiosConfig, response, auth); + return await axios(axiosConfig); } - }); + }; } else { - axiosPromise = axios(axiosConfig); + requestFn = async () => axios(axiosConfig); } - return new Promise((resolve, reject) => { - axiosPromise - .then(async (response) => { - if (configObject.resolveWithFullResponse === true) { - let body = response.data; - if (response.data === '') { - if (axiosConfig.responseType === 'arraybuffer') { - body = Buffer.alloc(0); - } else { - body = undefined; - } - } - await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]); - resolve({ - body, - headers: response.headers, - statusCode: response.status, - statusMessage: response.statusText, - request: response.request, - }); + try { + const response = await requestFn(); + if (configObject.resolveWithFullResponse === true) { + let body = response.data; + if (response.data === '') { + if (axiosConfig.responseType === 'arraybuffer') { + body = Buffer.alloc(0); } else { - let body = response.data; - if (response.data === '') { - if (axiosConfig.responseType === 'arraybuffer') { - body = Buffer.alloc(0); - } else { - body = undefined; - } - } - await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]); - resolve(body); + body = undefined; } - }) - .catch((error) => { - if (configObject.simple === false && error.response) { - if (configObject.resolveWithFullResponse) { - resolve({ - body: error.response.data, - headers: error.response.headers, - statusCode: error.response.status, - statusMessage: error.response.statusText, - }); - } else { - resolve(error.response.data); - } - return; + } + await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]); + return { + body, + headers: response.headers, + statusCode: response.status, + statusMessage: response.statusText, + request: response.request, + }; + } else { + let body = response.data; + if (response.data === '') { + if (axiosConfig.responseType === 'arraybuffer') { + body = Buffer.alloc(0); + } else { + body = undefined; } + } + await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]); + return body; + } + } catch (error) { + const { request, response, isAxiosError, toJSON, config, ...errorData } = error; + if (configObject.simple === false && response) { + if (configObject.resolveWithFullResponse) { + return { + body: response.data, + headers: response.headers, + statusCode: response.status, + statusMessage: response.statusText, + }; + } else { + return response.data; + } + } - Logger.debug('Request proxied to Axios failed', { error }); + // Axios hydrates the original error with more data. We extract them. + // https://github.com/axios/axios/blob/master/lib/core/enhanceError.js + // Note: `code` is ignored as it's an expected part of the errorData. + if (response) { + Logger.debug('Request proxied to Axios failed', { status: response.status }); + let responseData = response.data; + if (Buffer.isBuffer(responseData)) { + responseData = responseData.toString('utf-8'); + } + error.message = `${response.status as number} - ${JSON.stringify(responseData)}`; + } - // Axios hydrates the original error with more data. We extract them. - // https://github.com/axios/axios/blob/master/lib/core/enhanceError.js - // Note: `code` is ignored as it's an expected part of the errorData. - const { request, response, isAxiosError, toJSON, config, ...errorData } = error; - if (response) { - error.message = `${response.status as number} - ${JSON.stringify(response.data)}`; - } + error.cause = errorData; + error.error = error.response?.data || errorData; + error.statusCode = error.response?.status; + error.options = config || {}; - error.cause = errorData; - error.error = error.response?.data || errorData; - error.statusCode = error.response?.status; - error.options = config || {}; + // Remove not needed data and so also remove circular references + error.request = undefined; + error.config = undefined; + error.options.adapter = undefined; + error.options.httpsAgent = undefined; + error.options.paramsSerializer = undefined; + error.options.transformRequest = undefined; + error.options.transformResponse = undefined; + error.options.validateStatus = undefined; - // Remove not needed data and so also remove circular references - error.request = undefined; - error.config = undefined; - error.options.adapter = undefined; - error.options.httpsAgent = undefined; - error.options.paramsSerializer = undefined; - error.options.transformRequest = undefined; - error.options.transformResponse = undefined; - error.options.validateStatus = undefined; - - reject(error); - }); - }); + throw error; + } } function isIterator(obj: unknown): boolean { @@ -823,9 +821,22 @@ async function httpRequest( return result.data; } +/** + * Returns binary file metadata + */ +export async function getBinaryMetadata(binaryDataId: string): Promise { + return BinaryDataManager.getInstance().getBinaryMetadata(binaryDataId); +} + +/** + * Returns binary file stream for piping + */ +export function getBinaryStream(binaryDataId: string, chunkSize?: number): Readable { + return BinaryDataManager.getInstance().getBinaryStream(binaryDataId, chunkSize); +} + /** * Returns binary data buffer for given item index and property name. - * */ export async function getBinaryDataBuffer( inputData: ITaskDataConnections, @@ -1989,6 +2000,8 @@ const getRequestHelperFunctions = ( const getBinaryHelperFunctions = ({ executionId, }: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({ + getBinaryStream, + getBinaryMetadata, prepareBinaryData: async (binaryData, filePath, mimeType) => prepareBinaryData(binaryData, executionId!, filePath, mimeType), setBinaryDataBuffer: async (data, binaryData) => diff --git a/packages/nodes-base/nodes/Google/Drive/GoogleDrive.node.ts b/packages/nodes-base/nodes/Google/Drive/GoogleDrive.node.ts index ba28329334..bf2d999249 100644 --- a/packages/nodes-base/nodes/Google/Drive/GoogleDrive.node.ts +++ b/packages/nodes-base/nodes/Google/Drive/GoogleDrive.node.ts @@ -1,4 +1,4 @@ -import { IExecuteFunctions } from 'n8n-core'; +import { BINARY_ENCODING, IExecuteFunctions } from 'n8n-core'; import { IDataObject, @@ -13,6 +13,9 @@ import { import { googleApiRequest, googleApiRequestAllItems } from './GenericFunctions'; import { v4 as uuid } from 'uuid'; +import type { Readable } from 'stream'; + +const UPLOAD_CHUNK_SIZE = 256 * 1024; interface GoogleDriveFilesItem { id: string; @@ -2306,6 +2309,7 @@ export class GoogleDrive implements INodeType { const downloadOptions = this.getNodeParameter('options', i); const requestOptions = { + useStream: true, resolveWithFullResponse: true, encoding: null, json: false, @@ -2316,7 +2320,7 @@ export class GoogleDrive implements INodeType { 'GET', `/drive/v3/files/${fileId}`, {}, - { fields: 'mimeType', supportsTeamDrives: true }, + { fields: 'mimeType,name', supportsTeamDrives: true }, ); let response; @@ -2370,15 +2374,8 @@ export class GoogleDrive implements INodeType { ); } - let mimeType: string | undefined; - let fileName: string | undefined = undefined; - if (response.headers['content-type']) { - mimeType = response.headers['content-type']; - } - - if (downloadOptions.fileName) { - fileName = downloadOptions.fileName as string; - } + const mimeType = file.mimeType ?? response.headers['content-type'] ?? undefined; + const fileName = downloadOptions.fileName ?? file.name ?? undefined; const newItem: INodeExecutionData = { json: items[i].json, @@ -2400,10 +2397,8 @@ export class GoogleDrive implements INodeType { i, ) as string; - const data = Buffer.from(response.body as string); - items[i].binary![dataPropertyNameDownload] = await this.helpers.prepareBinaryData( - data as unknown as Buffer, + response.body as unknown as Readable, fileName, mimeType, ); @@ -2511,9 +2506,11 @@ export class GoogleDrive implements INodeType { // ---------------------------------- const resolveData = this.getNodeParameter('resolveData', 0); - let mimeType = 'text/plain'; - let body; + let contentLength: number; + let fileContent: Buffer | Readable; let originalFilename: string | undefined; + let mimeType = 'text/plain'; + if (this.getNodeParameter('binaryData', i)) { // Is binary file to upload const item = items[i]; @@ -2526,7 +2523,8 @@ export class GoogleDrive implements INodeType { const propertyNameUpload = this.getNodeParameter('binaryPropertyName', i) as string; - if (item.binary[propertyNameUpload] === undefined) { + const binary = item.binary[propertyNameUpload]; + if (binary === undefined) { throw new NodeOperationError( this.getNode(), `No binary data property "${propertyNameUpload}" does not exists on item!`, @@ -2534,48 +2532,86 @@ export class GoogleDrive implements INodeType { ); } - if (item.binary[propertyNameUpload].mimeType) { - mimeType = item.binary[propertyNameUpload].mimeType; + if (binary.id) { + // Stream data in 256KB chunks, and upload the via the resumable upload api + fileContent = this.helpers.getBinaryStream(binary.id, UPLOAD_CHUNK_SIZE); + const metadata = await this.helpers.getBinaryMetadata(binary.id); + contentLength = metadata.fileSize; + originalFilename = metadata.fileName; + if (metadata.mimeType) mimeType = binary.mimeType; + } else { + fileContent = Buffer.from(binary.data, BINARY_ENCODING); + contentLength = fileContent.length; + originalFilename = binary.fileName; + mimeType = binary.mimeType; } - - if (item.binary[propertyNameUpload].fileName) { - originalFilename = item.binary[propertyNameUpload].fileName; - } - - body = await this.helpers.getBinaryDataBuffer(i, propertyNameUpload); } else { // Is text file - body = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8'); + fileContent = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8'); + contentLength = fileContent.byteLength; } const name = this.getNodeParameter('name', i) as string; const parents = this.getNodeParameter('parents', i) as string[]; - let qs: IDataObject = { - fields: queryFields, - uploadType: 'media', - }; + let uploadId; + if (Buffer.isBuffer(fileContent)) { + const response = await googleApiRequest.call( + this, + 'POST', + '/upload/drive/v3/files', + fileContent, + { + fields: queryFields, + uploadType: 'media', + }, + undefined, + { + headers: { + 'Content-Type': mimeType, + 'Content-Length': contentLength, + }, + encoding: null, + json: false, + }, + ); + uploadId = JSON.parse(response).id; + } else { + const resumableUpload = await googleApiRequest.call( + this, + 'POST', + '/upload/drive/v3/files', + undefined, + { uploadType: 'resumable' }, + undefined, + { + resolveWithFullResponse: true, + }, + ); + const uploadUrl = resumableUpload.headers.location; - const requestOptions = { - headers: { - 'Content-Type': mimeType, - 'Content-Length': body.byteLength, - }, - encoding: null, - json: false, - }; + let offset = 0; + for await (const chunk of fileContent) { + const nextOffset = offset + chunk.length; + try { + const response = await this.helpers.httpRequest({ + method: 'PUT', + url: uploadUrl, + headers: { + 'Content-Length': chunk.length, + 'Content-Range': `bytes ${offset}-${nextOffset - 1}/${contentLength}`, + }, + body: chunk, + }); + uploadId = response.id; + } catch (error) { + if (error.response?.status !== 308) throw error; + } + offset = nextOffset; + } + } - let response = await googleApiRequest.call( - this, - 'POST', - '/upload/drive/v3/files', - body, - qs, - undefined, - requestOptions, - ); - - body = { + const requestBody = { mimeType, name, originalFilename, @@ -2588,7 +2624,7 @@ export class GoogleDrive implements INodeType { ) as IDataObject[]; if (properties.length) { - Object.assign(body, { + Object.assign(requestBody, { properties: properties.reduce( (obj, value) => Object.assign(obj, { [`${value.key}`]: value.value }), {}, @@ -2603,7 +2639,7 @@ export class GoogleDrive implements INodeType { ) as IDataObject[]; if (properties.length) { - Object.assign(body, { + Object.assign(requestBody, { appProperties: appProperties.reduce( (obj, value) => Object.assign(obj, { [`${value.key}`]: value.value }), {}, @@ -2611,18 +2647,16 @@ export class GoogleDrive implements INodeType { }); } - qs = { - addParents: parents.join(','), - // When set to true shared drives can be used. - supportsAllDrives: true, - }; - - response = await googleApiRequest.call( + let response = await googleApiRequest.call( this, 'PATCH', - `/drive/v3/files/${JSON.parse(response).id}`, - body, - qs, + `/drive/v3/files/${uploadId}`, + requestBody, + { + addParents: parents.join(','), + // When set to true shared drives can be used. + supportsAllDrives: true, + }, ); if (resolveData) { diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index e8b990149c..b0f06bdfda 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -40,10 +40,16 @@ export interface IBinaryData { fileName?: string; directory?: string; fileExtension?: string; - fileSize?: string; + fileSize?: string; // TODO: change this to number and store the actual value id?: string; } +export interface BinaryMetadata { + fileName?: string; + mimeType?: string; + fileSize: number; +} + // All properties in this interface except for // "includeCredentialsOnRefreshOnBody" will get // removed once we add the OAuth2 hooks to the @@ -641,6 +647,9 @@ export interface BinaryHelperFunctions { ): Promise; setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise; copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise; + + getBinaryStream(binaryDataId: string, chunkSize?: number): Readable; + getBinaryMetadata(binaryDataId: string): Promise; } export interface RequestHelperFunctions { @@ -721,7 +730,6 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn & inputData: INodeExecutionData[], options: { itemData: IPairedItemData | IPairedItemData[] }, ): NodeExecutionWithMetadata[]; - getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise; }; };