refactor(Google Drive Node): Use node streams for uploading and downloading files (#5017)

* use streams to upload files to google drive

* use streams to download files from google drive

* use resumable uploads api for google drive

* avoid dangling promises, and reduce memory usage in error logging
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-01-04 12:29:56 +01:00 committed by GitHub
parent 8b19fdd5f0
commit 54126b2c87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 163 deletions

View file

@ -1,10 +1,11 @@
import { createReadStream } from 'fs';
import fs from 'fs/promises';
import { jsonParse } from 'n8n-workflow';
import path from 'path';
import { v4 as uuid } from 'uuid';
import type { Readable } from 'stream';
import { BinaryMetadata, jsonParse } from 'n8n-workflow';
import { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
const PREFIX_METAFILE = 'binarymeta';
const PREFIX_PERSISTED_METAFILE = 'persistedmeta';
@ -74,6 +75,10 @@ export class BinaryDataFileSystem implements IBinaryDataManager {
return binaryDataId;
}
/**
 * Opens a readable stream over the stored binary file for the given identifier.
 * An optional chunkSize tunes the stream's highWaterMark (bytes per read).
 */
getBinaryStream(identifier: string, chunkSize?: number): Readable {
	const binaryPath = this.getBinaryPath(identifier);
	return createReadStream(binaryPath, { highWaterMark: chunkSize });
}
async retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer> {
return this.retrieveFromLocalStorage(identifier);
}

View file

@ -1,10 +1,10 @@
import concatStream from 'concat-stream';
import { readFile, stat } from 'fs/promises';
import type { IBinaryData, INodeExecutionData } from 'n8n-workflow';
import type { BinaryMetadata, IBinaryData, INodeExecutionData } from 'n8n-workflow';
import prettyBytes from 'pretty-bytes';
import type { Readable } from 'stream';
import { BINARY_ENCODING } from '../Constants';
import type { BinaryMetadata, IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import type { IBinaryDataConfig, IBinaryDataManager } from '../Interfaces';
import { BinaryDataFileSystem } from './FileSystem';
export class BinaryDataManager {
@ -88,6 +88,7 @@ export class BinaryDataManager {
const manager = this.managers[this.binaryDataMode];
if (manager) {
const identifier = await manager.storeBinaryData(input, executionId);
// Add data manager reference id.
binaryData.id = this.generateBinaryId(identifier);
@ -115,6 +116,15 @@ export class BinaryDataManager {
return binaryData;
}
/**
 * Resolves the storage mode encoded in the combined identifier and delegates
 * stream creation to the matching binary-data manager.
 * Throws when the encoded storage mode has no registered manager.
 */
getBinaryStream(identifier: string, chunkSize?: number): Readable {
	const { mode, id } = this.splitBinaryModeFileId(identifier);
	const manager = this.managers[mode];
	if (!manager) {
		throw new Error('Storage mode used to store binary data not available');
	}
	return manager.getBinaryStream(id, chunkSize);
}
async retrieveBinaryData(binaryData: IBinaryData): Promise<Buffer> {
if (binaryData.id) {
return this.retrieveBinaryDataByIdentifier(binaryData.id);

View file

@ -10,6 +10,7 @@ import type {
IPollFunctions as IPollFunctionsBase,
ITriggerFunctions as ITriggerFunctionsBase,
IWebhookFunctions as IWebhookFunctionsBase,
BinaryMetadata,
} from 'n8n-workflow';
// TODO: remove these after removing `n8n-core` dependency from `nodes-bases`
@ -56,12 +57,6 @@ export interface IBinaryDataConfig {
persistedBinaryDataTTL: number;
}
// Metadata describing a stored binary file; fileSize is in bytes and is
// always known, while fileName/mimeType may be absent for anonymous data.
export interface BinaryMetadata {
fileName?: string;
mimeType?: string;
fileSize: number;
}
export interface IBinaryDataManager {
init(startPurger: boolean): Promise<void>;
getFileSize(filePath: string): Promise<number>;
@ -71,6 +66,7 @@ export interface IBinaryDataManager {
storeBinaryData(binaryData: Buffer | Readable, executionId: string): Promise<string>;
retrieveBinaryDataByIdentifier(identifier: string): Promise<Buffer>;
getBinaryPath(identifier: string): string;
getBinaryStream(identifier: string, chunkSize?: number): Readable;
markDataForDeletionByExecutionId(executionId: string): Promise<void>;
deleteMarkedFiles(): Promise<unknown>;
deleteBinaryDataByIdentifier(identifier: string): Promise<void>;

View file

@ -66,6 +66,7 @@ import {
IPollFunctions,
ITriggerFunctions,
IWebhookFunctions,
BinaryMetadata,
} from 'n8n-workflow';
import { Agent } from 'https';
@ -463,7 +464,9 @@ async function parseRequestObject(requestObject: IDataObject) {
}
}
if (requestObject.encoding === null) {
if (requestObject.useStream) {
axiosConfig.responseType = 'stream';
} else if (requestObject.encoding === null) {
// When downloading files, return an arrayBuffer.
axiosConfig.responseType = 'arraybuffer';
}
@ -519,7 +522,7 @@ function digestAuthAxiosConfig(
const realm: string = authDetails
.find((el: any) => el[0].toLowerCase().indexOf('realm') > -1)[1]
.replace(/"/g, '');
// If authDeatials does not have opaque, we should not add it to authorization.
// If authDetails does not have opaque, we should not add it to authorization.
const opaqueKV = authDetails.find((el: any) => el[0].toLowerCase().indexOf('opaque') > -1);
const opaque: string = opaqueKV ? opaqueKV[1].replace(/"/g, '') : undefined;
const nonce: string = authDetails
@ -576,7 +579,7 @@ async function proxyRequestToAxios(
maxBodyLength: Infinity,
maxContentLength: Infinity,
};
let axiosPromise: AxiosPromise;
type ConfigObject = {
auth?: { sendImmediately: boolean };
resolveWithFullResponse?: boolean;
@ -602,107 +605,102 @@ async function proxyRequestToAxios(
// }
);
let requestFn: () => AxiosPromise;
if (configObject.auth?.sendImmediately === false) {
// for digest-auth
const { auth } = axiosConfig;
delete axiosConfig.auth;
// eslint-disable-next-line no-async-promise-executor
axiosPromise = new Promise(async (resolve, reject) => {
requestFn = async () => {
try {
const result = await axios(axiosConfig);
resolve(result);
} catch (resp: any) {
if (
resp.response === undefined ||
resp.response.status !== 401 ||
!resp.response.headers['www-authenticate']?.includes('nonce')
) {
reject(resp);
return await axios(axiosConfig);
} catch (error) {
const { response } = error;
if (response?.status !== 401 || !response.headers['www-authenticate']?.includes('nonce')) {
throw error;
}
axiosConfig = digestAuthAxiosConfig(axiosConfig, resp.response, auth);
resolve(axios(axiosConfig));
const { auth } = axiosConfig;
delete axiosConfig.auth;
axiosConfig = digestAuthAxiosConfig(axiosConfig, response, auth);
return await axios(axiosConfig);
}
});
};
} else {
axiosPromise = axios(axiosConfig);
requestFn = async () => axios(axiosConfig);
}
return new Promise((resolve, reject) => {
axiosPromise
.then(async (response) => {
if (configObject.resolveWithFullResponse === true) {
let body = response.data;
if (response.data === '') {
if (axiosConfig.responseType === 'arraybuffer') {
body = Buffer.alloc(0);
} else {
body = undefined;
}
}
await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]);
resolve({
body,
headers: response.headers,
statusCode: response.status,
statusMessage: response.statusText,
request: response.request,
});
try {
const response = await requestFn();
if (configObject.resolveWithFullResponse === true) {
let body = response.data;
if (response.data === '') {
if (axiosConfig.responseType === 'arraybuffer') {
body = Buffer.alloc(0);
} else {
let body = response.data;
if (response.data === '') {
if (axiosConfig.responseType === 'arraybuffer') {
body = Buffer.alloc(0);
} else {
body = undefined;
}
}
await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]);
resolve(body);
body = undefined;
}
})
.catch((error) => {
if (configObject.simple === false && error.response) {
if (configObject.resolveWithFullResponse) {
resolve({
body: error.response.data,
headers: error.response.headers,
statusCode: error.response.status,
statusMessage: error.response.statusText,
});
} else {
resolve(error.response.data);
}
return;
}
await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]);
return {
body,
headers: response.headers,
statusCode: response.status,
statusMessage: response.statusText,
request: response.request,
};
} else {
let body = response.data;
if (response.data === '') {
if (axiosConfig.responseType === 'arraybuffer') {
body = Buffer.alloc(0);
} else {
body = undefined;
}
}
await additionalData.hooks?.executeHookFunctions('nodeFetchedData', [workflow.id, node]);
return body;
}
} catch (error) {
const { request, response, isAxiosError, toJSON, config, ...errorData } = error;
if (configObject.simple === false && response) {
if (configObject.resolveWithFullResponse) {
return {
body: response.data,
headers: response.headers,
statusCode: response.status,
statusMessage: response.statusText,
};
} else {
return response.data;
}
}
Logger.debug('Request proxied to Axios failed', { error });
// Axios hydrates the original error with more data. We extract them.
// https://github.com/axios/axios/blob/master/lib/core/enhanceError.js
// Note: `code` is ignored as it's an expected part of the errorData.
if (response) {
Logger.debug('Request proxied to Axios failed', { status: response.status });
let responseData = response.data;
if (Buffer.isBuffer(responseData)) {
responseData = responseData.toString('utf-8');
}
error.message = `${response.status as number} - ${JSON.stringify(responseData)}`;
}
// Axios hydrates the original error with more data. We extract them.
// https://github.com/axios/axios/blob/master/lib/core/enhanceError.js
// Note: `code` is ignored as it's an expected part of the errorData.
const { request, response, isAxiosError, toJSON, config, ...errorData } = error;
if (response) {
error.message = `${response.status as number} - ${JSON.stringify(response.data)}`;
}
error.cause = errorData;
error.error = error.response?.data || errorData;
error.statusCode = error.response?.status;
error.options = config || {};
error.cause = errorData;
error.error = error.response?.data || errorData;
error.statusCode = error.response?.status;
error.options = config || {};
// Remove not needed data and so also remove circular references
error.request = undefined;
error.config = undefined;
error.options.adapter = undefined;
error.options.httpsAgent = undefined;
error.options.paramsSerializer = undefined;
error.options.transformRequest = undefined;
error.options.transformResponse = undefined;
error.options.validateStatus = undefined;
// Remove not needed data and so also remove circular references
error.request = undefined;
error.config = undefined;
error.options.adapter = undefined;
error.options.httpsAgent = undefined;
error.options.paramsSerializer = undefined;
error.options.transformRequest = undefined;
error.options.transformResponse = undefined;
error.options.validateStatus = undefined;
reject(error);
});
});
throw error;
}
}
function isIterator(obj: unknown): boolean {
@ -823,9 +821,22 @@ async function httpRequest(
return result.data;
}
/**
 * Fetches the stored metadata (fileName, mimeType, fileSize) for a binary
 * data record, delegating to the singleton BinaryDataManager.
 */
export async function getBinaryMetadata(binaryDataId: string): Promise<BinaryMetadata> {
	const manager = BinaryDataManager.getInstance();
	return manager.getBinaryMetadata(binaryDataId);
}
/**
 * Returns a readable stream over a stored binary file, suitable for piping;
 * chunkSize (bytes) is forwarded as the stream's read granularity.
 */
export function getBinaryStream(binaryDataId: string, chunkSize?: number): Readable {
	const manager = BinaryDataManager.getInstance();
	return manager.getBinaryStream(binaryDataId, chunkSize);
}
/**
* Returns binary data buffer for given item index and property name.
*
*/
export async function getBinaryDataBuffer(
inputData: ITaskDataConnections,
@ -1989,6 +2000,8 @@ const getRequestHelperFunctions = (
const getBinaryHelperFunctions = ({
executionId,
}: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({
getBinaryStream,
getBinaryMetadata,
prepareBinaryData: async (binaryData, filePath, mimeType) =>
prepareBinaryData(binaryData, executionId!, filePath, mimeType),
setBinaryDataBuffer: async (data, binaryData) =>

View file

@ -1,4 +1,4 @@
import { IExecuteFunctions } from 'n8n-core';
import { BINARY_ENCODING, IExecuteFunctions } from 'n8n-core';
import {
IDataObject,
@ -13,6 +13,9 @@ import {
import { googleApiRequest, googleApiRequestAllItems } from './GenericFunctions';
import { v4 as uuid } from 'uuid';
import type { Readable } from 'stream';
const UPLOAD_CHUNK_SIZE = 256 * 1024;
interface GoogleDriveFilesItem {
id: string;
@ -2306,6 +2309,7 @@ export class GoogleDrive implements INodeType {
const downloadOptions = this.getNodeParameter('options', i);
const requestOptions = {
useStream: true,
resolveWithFullResponse: true,
encoding: null,
json: false,
@ -2316,7 +2320,7 @@ export class GoogleDrive implements INodeType {
'GET',
`/drive/v3/files/${fileId}`,
{},
{ fields: 'mimeType', supportsTeamDrives: true },
{ fields: 'mimeType,name', supportsTeamDrives: true },
);
let response;
@ -2370,15 +2374,8 @@ export class GoogleDrive implements INodeType {
);
}
let mimeType: string | undefined;
let fileName: string | undefined = undefined;
if (response.headers['content-type']) {
mimeType = response.headers['content-type'];
}
if (downloadOptions.fileName) {
fileName = downloadOptions.fileName as string;
}
const mimeType = file.mimeType ?? response.headers['content-type'] ?? undefined;
const fileName = downloadOptions.fileName ?? file.name ?? undefined;
const newItem: INodeExecutionData = {
json: items[i].json,
@ -2400,10 +2397,8 @@ export class GoogleDrive implements INodeType {
i,
) as string;
const data = Buffer.from(response.body as string);
items[i].binary![dataPropertyNameDownload] = await this.helpers.prepareBinaryData(
data as unknown as Buffer,
response.body as unknown as Readable,
fileName,
mimeType,
);
@ -2511,9 +2506,11 @@ export class GoogleDrive implements INodeType {
// ----------------------------------
const resolveData = this.getNodeParameter('resolveData', 0);
let mimeType = 'text/plain';
let body;
let contentLength: number;
let fileContent: Buffer | Readable;
let originalFilename: string | undefined;
let mimeType = 'text/plain';
if (this.getNodeParameter('binaryData', i)) {
// Is binary file to upload
const item = items[i];
@ -2526,7 +2523,8 @@ export class GoogleDrive implements INodeType {
const propertyNameUpload = this.getNodeParameter('binaryPropertyName', i) as string;
if (item.binary[propertyNameUpload] === undefined) {
const binary = item.binary[propertyNameUpload];
if (binary === undefined) {
throw new NodeOperationError(
this.getNode(),
`No binary data property "${propertyNameUpload}" does not exists on item!`,
@ -2534,48 +2532,86 @@ export class GoogleDrive implements INodeType {
);
}
if (item.binary[propertyNameUpload].mimeType) {
mimeType = item.binary[propertyNameUpload].mimeType;
if (binary.id) {
// Stream data in 256KB chunks, and upload them via the resumable upload API
fileContent = this.helpers.getBinaryStream(binary.id, UPLOAD_CHUNK_SIZE);
const metadata = await this.helpers.getBinaryMetadata(binary.id);
contentLength = metadata.fileSize;
originalFilename = metadata.fileName;
if (metadata.mimeType) mimeType = binary.mimeType;
} else {
fileContent = Buffer.from(binary.data, BINARY_ENCODING);
contentLength = fileContent.length;
originalFilename = binary.fileName;
mimeType = binary.mimeType;
}
if (item.binary[propertyNameUpload].fileName) {
originalFilename = item.binary[propertyNameUpload].fileName;
}
body = await this.helpers.getBinaryDataBuffer(i, propertyNameUpload);
} else {
// Is text file
body = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8');
fileContent = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8');
contentLength = fileContent.byteLength;
}
const name = this.getNodeParameter('name', i) as string;
const parents = this.getNodeParameter('parents', i) as string[];
let qs: IDataObject = {
fields: queryFields,
uploadType: 'media',
};
let uploadId;
if (Buffer.isBuffer(fileContent)) {
const response = await googleApiRequest.call(
this,
'POST',
'/upload/drive/v3/files',
fileContent,
{
fields: queryFields,
uploadType: 'media',
},
undefined,
{
headers: {
'Content-Type': mimeType,
'Content-Length': contentLength,
},
encoding: null,
json: false,
},
);
uploadId = JSON.parse(response).id;
} else {
const resumableUpload = await googleApiRequest.call(
this,
'POST',
'/upload/drive/v3/files',
undefined,
{ uploadType: 'resumable' },
undefined,
{
resolveWithFullResponse: true,
},
);
const uploadUrl = resumableUpload.headers.location;
const requestOptions = {
headers: {
'Content-Type': mimeType,
'Content-Length': body.byteLength,
},
encoding: null,
json: false,
};
let offset = 0;
for await (const chunk of fileContent) {
const nextOffset = offset + chunk.length;
try {
const response = await this.helpers.httpRequest({
method: 'PUT',
url: uploadUrl,
headers: {
'Content-Length': chunk.length,
'Content-Range': `bytes ${offset}-${nextOffset - 1}/${contentLength}`,
},
body: chunk,
});
uploadId = response.id;
} catch (error) {
if (error.response?.status !== 308) throw error;
}
offset = nextOffset;
}
}
let response = await googleApiRequest.call(
this,
'POST',
'/upload/drive/v3/files',
body,
qs,
undefined,
requestOptions,
);
body = {
const requestBody = {
mimeType,
name,
originalFilename,
@ -2588,7 +2624,7 @@ export class GoogleDrive implements INodeType {
) as IDataObject[];
if (properties.length) {
Object.assign(body, {
Object.assign(requestBody, {
properties: properties.reduce(
(obj, value) => Object.assign(obj, { [`${value.key}`]: value.value }),
{},
@ -2603,7 +2639,7 @@ export class GoogleDrive implements INodeType {
) as IDataObject[];
if (properties.length) {
Object.assign(body, {
Object.assign(requestBody, {
appProperties: appProperties.reduce(
(obj, value) => Object.assign(obj, { [`${value.key}`]: value.value }),
{},
@ -2611,18 +2647,16 @@ export class GoogleDrive implements INodeType {
});
}
qs = {
addParents: parents.join(','),
// When set to true shared drives can be used.
supportsAllDrives: true,
};
response = await googleApiRequest.call(
let response = await googleApiRequest.call(
this,
'PATCH',
`/drive/v3/files/${JSON.parse(response).id}`,
body,
qs,
`/drive/v3/files/${uploadId}`,
requestBody,
{
addParents: parents.join(','),
// When set to true shared drives can be used.
supportsAllDrives: true,
},
);
if (resolveData) {

View file

@ -40,10 +40,16 @@ export interface IBinaryData {
fileName?: string;
directory?: string;
fileExtension?: string;
fileSize?: string;
fileSize?: string; // TODO: change this to number and store the actual value
id?: string;
}
// Metadata for a stored binary file. fileSize is the size in bytes and is
// required; fileName and mimeType are optional since not every upload
// provides them.
export interface BinaryMetadata {
fileName?: string;
mimeType?: string;
fileSize: number;
}
// All properties in this interface except for
// "includeCredentialsOnRefreshOnBody" will get
// removed once we add the OAuth2 hooks to the
@ -641,6 +647,9 @@ export interface BinaryHelperFunctions {
): Promise<IBinaryData>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>;
getBinaryStream(binaryDataId: string, chunkSize?: number): Readable;
getBinaryMetadata(binaryDataId: string): Promise<BinaryMetadata>;
}
export interface RequestHelperFunctions {
@ -721,7 +730,6 @@ export type IExecuteFunctions = ExecuteFunctions.GetNodeParameterFn &
inputData: INodeExecutionData[],
options: { itemData: IPairedItemData | IPairedItemData[] },
): NodeExecutionWithMetadata[];
getBinaryDataBuffer(itemIndex: number, propertyName: string): Promise<Buffer>;
};
};