From 770230fbfe0b9e86527254e201c4602fbced94ff Mon Sep 17 00:00:00 2001 From: Elias Meire Date: Wed, 13 Nov 2024 11:44:16 +0100 Subject: [PATCH] fix(Google Drive Node): Fix file upload for streams (#11698) --- .../Drive/test/v2/node/file/upload.test.ts | 72 +++++++++++++++++-- .../Google/Drive/test/v2/node/helpers.ts | 23 ++++++ .../Drive/v2/actions/file/upload.operation.ts | 13 ++-- .../nodes/Google/Drive/v2/helpers/utils.ts | 26 +++++++ 4 files changed, 122 insertions(+), 12 deletions(-) diff --git a/packages/nodes-base/nodes/Google/Drive/test/v2/node/file/upload.test.ts b/packages/nodes-base/nodes/Google/Drive/test/v2/node/file/upload.test.ts index 1eadf887fd..80446db6ce 100644 --- a/packages/nodes-base/nodes/Google/Drive/test/v2/node/file/upload.test.ts +++ b/packages/nodes-base/nodes/Google/Drive/test/v2/node/file/upload.test.ts @@ -6,7 +6,7 @@ import * as upload from '../../../../v2/actions/file/upload.operation'; import * as transport from '../../../../v2/transport'; import * as utils from '../../../../v2/helpers/utils'; -import { createMockExecuteFunction, driveNode } from '../helpers'; +import { createMockExecuteFunction, createTestStream, driveNode } from '../helpers'; jest.mock('../../../../v2/transport', () => { const originalModule = jest.requireActual('../../../../v2/transport'); @@ -30,7 +30,7 @@ jest.mock('../../../../v2/helpers/utils', () => { getItemBinaryData: jest.fn(async function () { return { contentLength: '123', - fileContent: 'Hello Drive!', + fileContent: Buffer.from('Hello Drive!'), originalFilename: 'original.txt', mimeType: 'text/plain', }; @@ -43,13 +43,17 @@ describe('test GoogleDriveV2: file upload', () => { nock.disableNetConnect(); }); + beforeEach(() => { + jest.clearAllMocks(); + }); + afterAll(() => { nock.restore(); jest.unmock('../../../../v2/transport'); jest.unmock('../../../../v2/helpers/utils'); }); - it('should be called with', async () => { + it('should upload buffers', async () => { const nodeParameters = { name: 'newFile.txt', folderId: { @@ -73,10 +77,10 @@ describe('test GoogleDriveV2: file upload', () => { expect(transport.googleApiRequest).toHaveBeenCalledWith( 'POST', '/upload/drive/v3/files', + expect.any(Buffer), + { uploadType: 'media' }, undefined, - { uploadType: 'resumable' }, - undefined, - { returnFullResponse: true }, + { headers: { 'Content-Length': '123', 'Content-Type': 'text/plain' } }, ); expect(transport.googleApiRequest).toHaveBeenCalledWith( 'PATCH', @@ -94,4 +98,60 @@ describe('test GoogleDriveV2: file upload', () => { expect(utils.getItemBinaryData).toBeCalledTimes(1); expect(utils.getItemBinaryData).toHaveBeenCalled(); }); + + it('should stream large files in 2MB chunks', async () => { + const nodeParameters = { + name: 'newFile.jpg', + folderId: { + __rl: true, + value: 'folderIDxxxxxx', + mode: 'list', + cachedResultName: 'testFolder 3', + cachedResultUrl: 'https://drive.google.com/drive/folders/folderIDxxxxxx', + }, + options: { + simplifyOutput: true, + }, + }; + + const fakeExecuteFunction = createMockExecuteFunction(nodeParameters, driveNode); + const httpRequestSpy = jest.spyOn(fakeExecuteFunction.helpers, 'httpRequest'); + + const fileSize = 7 * 1024 * 1024; // 7MB + jest.mocked(utils.getItemBinaryData).mockResolvedValue({ + mimeType: 'image/jpg', + originalFilename: 'test.jpg', + contentLength: fileSize, + fileContent: createTestStream(fileSize), + }); + + await upload.execute.call(fakeExecuteFunction, 0); + + // 4 chunks: 7MB = 3x2MB + 1x1MB + expect(httpRequestSpy).toHaveBeenCalledTimes(4); + expect(httpRequestSpy).toHaveBeenCalledWith( + expect.objectContaining({ body: expect.any(Buffer) }), + ); + expect(transport.googleApiRequest).toBeCalledTimes(2); + expect(transport.googleApiRequest).toHaveBeenCalledWith( + 'POST', + '/upload/drive/v3/files', + undefined, + { uploadType: 'resumable' }, + undefined, + { returnFullResponse: true }, + ); + expect(transport.googleApiRequest).toHaveBeenCalledWith( + 'PATCH', + '/drive/v3/files/undefined', + { mimeType: 'image/jpg', name: 'newFile.jpg', originalFilename: 'test.jpg' }, + { + addParents: 'folderIDxxxxxx', + supportsAllDrives: true, + corpora: 'allDrives', + includeItemsFromAllDrives: true, + spaces: 'appDataFolder, drive', + }, + ); + }); }); diff --git a/packages/nodes-base/nodes/Google/Drive/test/v2/node/helpers.ts b/packages/nodes-base/nodes/Google/Drive/test/v2/node/helpers.ts index b04a33e2e2..522f28afdb 100644 --- a/packages/nodes-base/nodes/Google/Drive/test/v2/node/helpers.ts +++ b/packages/nodes-base/nodes/Google/Drive/test/v2/node/helpers.ts @@ -2,6 +2,7 @@ import type { IDataObject, IExecuteFunctions, IGetNodeParameterOptions, INode } import { get } from 'lodash'; import { constructExecutionMetaData, returnJsonArray } from 'n8n-core'; +import { Readable } from 'stream'; export const driveNode: INode = { id: '11', @@ -40,3 +41,25 @@ export const createMockExecuteFunction = ( } as unknown as IExecuteFunctions; return fakeExecuteFunction; }; + +export function createTestStream(byteSize: number) { + let bytesSent = 0; + const CHUNK_SIZE = 64 * 1024; // 64kB chunks (default NodeJS highWaterMark) + + return new Readable({ + read() { + const remainingBytes = byteSize - bytesSent; + + if (remainingBytes <= 0) { + this.push(null); + return; + } + + const chunkSize = Math.min(CHUNK_SIZE, remainingBytes); + const chunk = Buffer.alloc(chunkSize, 'A'); // Test data just a string of "A" + + bytesSent += chunkSize; + this.push(chunk); + }, + }); +} diff --git a/packages/nodes-base/nodes/Google/Drive/v2/actions/file/upload.operation.ts b/packages/nodes-base/nodes/Google/Drive/v2/actions/file/upload.operation.ts index da4ca39016..f81dc3478e 100644 --- a/packages/nodes-base/nodes/Google/Drive/v2/actions/file/upload.operation.ts +++ b/packages/nodes-base/nodes/Google/Drive/v2/actions/file/upload.operation.ts @@ -12,6 +12,7 @@ import { setFileProperties, setUpdateCommonParams, setParentFolder, + processInChunks, } from '../../helpers/utils'; import { updateDisplayOptions } from '@utils/utilities'; @@ -129,16 +130,17 @@ export async function execute(this: IExecuteFunctions, i: number): Promise { try { const response = await this.helpers.httpRequest({ method: 'PUT', url: uploadUrl, headers: { 'Content-Length': chunk.length, - 'Content-Range': `bytes ${offset}-${nextOffset - 1}/${contentLength}`, + 'Content-Range': `bytes ${offset}-${offset + chunk.byteLength - 1}/${contentLength}`, }, body: chunk, }); @@ -146,8 +148,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise void | Promise, +) { + let buffer = Buffer.alloc(0); + let offset = 0; + + for await (const chunk of stream) { + buffer = Buffer.concat([buffer, chunk]); + + while (buffer.length >= chunkSize) { + const chunkToProcess = buffer.subarray(0, chunkSize); + await process(chunkToProcess, offset); + + buffer = buffer.subarray(chunkSize); + offset += chunkSize; + } + } + + // Process last chunk, could be smaller than chunkSize + if (buffer.length > 0) { + await process(buffer, offset); + } +}