fix(Google Drive Node): Fix file upload for streams (#11698)

This commit is contained in:
Elias Meire 2024-11-13 11:44:16 +01:00 committed by GitHub
parent a412ab7ebf
commit 770230fbfe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 122 additions and 12 deletions

View file

@ -6,7 +6,7 @@ import * as upload from '../../../../v2/actions/file/upload.operation';
import * as transport from '../../../../v2/transport';
import * as utils from '../../../../v2/helpers/utils';
import { createMockExecuteFunction, driveNode } from '../helpers';
import { createMockExecuteFunction, createTestStream, driveNode } from '../helpers';
jest.mock('../../../../v2/transport', () => {
const originalModule = jest.requireActual('../../../../v2/transport');
@ -30,7 +30,7 @@ jest.mock('../../../../v2/helpers/utils', () => {
getItemBinaryData: jest.fn(async function () {
return {
contentLength: '123',
fileContent: 'Hello Drive!',
fileContent: Buffer.from('Hello Drive!'),
originalFilename: 'original.txt',
mimeType: 'text/plain',
};
@ -43,13 +43,17 @@ describe('test GoogleDriveV2: file upload', () => {
nock.disableNetConnect();
});
beforeEach(() => {
jest.clearAllMocks();
});
afterAll(() => {
nock.restore();
jest.unmock('../../../../v2/transport');
jest.unmock('../../../../v2/helpers/utils');
});
it('should be called with', async () => {
it('should upload buffers', async () => {
const nodeParameters = {
name: 'newFile.txt',
folderId: {
@ -73,10 +77,10 @@ describe('test GoogleDriveV2: file upload', () => {
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'POST',
'/upload/drive/v3/files',
expect.any(Buffer),
{ uploadType: 'media' },
undefined,
{ uploadType: 'resumable' },
undefined,
{ returnFullResponse: true },
{ headers: { 'Content-Length': '123', 'Content-Type': 'text/plain' } },
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'PATCH',
@ -94,4 +98,60 @@ describe('test GoogleDriveV2: file upload', () => {
expect(utils.getItemBinaryData).toBeCalledTimes(1);
expect(utils.getItemBinaryData).toHaveBeenCalled();
});
// Verifies the resumable-upload path: a 7MB streamed file must be re-chunked
// into 2MB pieces (Google Drive requires resumable chunks to be multiples of
// 256kB), each sent as its own PUT via this.helpers.httpRequest.
it('should stream large files in 2MB chunks', async () => {
const nodeParameters = {
name: 'newFile.jpg',
folderId: {
__rl: true,
value: 'folderIDxxxxxx',
mode: 'list',
cachedResultName: 'testFolder 3',
cachedResultUrl: 'https://drive.google.com/drive/folders/folderIDxxxxxx',
},
options: {
simplifyOutput: true,
},
};
const fakeExecuteFunction = createMockExecuteFunction(nodeParameters, driveNode);
// Spy on the raw HTTP helper to count the chunked PUT requests directly
const httpRequestSpy = jest.spyOn(fakeExecuteFunction.helpers, 'httpRequest');
const fileSize = 7 * 1024 * 1024; // 7MB
// Override the shared mock so the binary data is a stream, not a Buffer
jest.mocked(utils.getItemBinaryData).mockResolvedValue({
mimeType: 'image/jpg',
originalFilename: 'test.jpg',
contentLength: fileSize,
fileContent: createTestStream(fileSize),
});
await upload.execute.call(fakeExecuteFunction, 0);
// 4 chunks: 7MB = 3x2MB + 1x1MB
expect(httpRequestSpy).toHaveBeenCalledTimes(4);
expect(httpRequestSpy).toHaveBeenCalledWith(
expect.objectContaining({ body: expect.any(Buffer) }),
);
// Two googleApiRequest calls: POST to start the resumable session, PATCH for metadata
expect(transport.googleApiRequest).toBeCalledTimes(2);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'POST',
'/upload/drive/v3/files',
undefined,
{ uploadType: 'resumable' },
undefined,
{ returnFullResponse: true },
);
expect(transport.googleApiRequest).toHaveBeenCalledWith(
'PATCH',
// NOTE(review): the id segment is literally 'undefined' — presumably the
// mocked upload response carries no file id; confirm against the transport mock
'/drive/v3/files/undefined',
{ mimeType: 'image/jpg', name: 'newFile.jpg', originalFilename: 'test.jpg' },
{
addParents: 'folderIDxxxxxx',
supportsAllDrives: true,
corpora: 'allDrives',
includeItemsFromAllDrives: true,
spaces: 'appDataFolder, drive',
},
);
});
});

View file

@ -2,6 +2,7 @@ import type { IDataObject, IExecuteFunctions, IGetNodeParameterOptions, INode }
import { get } from 'lodash';
import { constructExecutionMetaData, returnJsonArray } from 'n8n-core';
import { Readable } from 'stream';
export const driveNode: INode = {
id: '11',
@ -40,3 +41,25 @@ export const createMockExecuteFunction = (
} as unknown as IExecuteFunctions;
return fakeExecuteFunction;
};
/**
 * Builds a readable stream that emits exactly `byteSize` bytes of dummy data
 * (the byte 'A' repeated), pushed in 64kB chunks to mimic the default NodeJS
 * highWaterMark. Used to simulate large binary files in upload tests.
 */
export function createTestStream(byteSize: number) {
	const CHUNK_SIZE = 64 * 1024; // 64kB chunks (default NodeJS highWaterMark)
	let produced = 0;

	return new Readable({
		read() {
			// All requested bytes emitted — signal end-of-stream
			if (produced >= byteSize) {
				this.push(null);
				return;
			}
			const size = Math.min(CHUNK_SIZE, byteSize - produced);
			produced += size;
			this.push(Buffer.alloc(size, 'A')); // test data is just a run of "A"
		},
	});
}

View file

@ -12,6 +12,7 @@ import {
setFileProperties,
setUpdateCommonParams,
setParentFolder,
processInChunks,
} from '../../helpers/utils';
import { updateDisplayOptions } from '@utils/utilities';
@ -129,16 +130,17 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode
const uploadUrl = resumableUpload.headers.location;
let offset = 0;
for await (const chunk of fileContent) {
const nextOffset = offset + Number(chunk.length);
// 2MB chunks, needs to be a multiple of 256kB for Google Drive API
const chunkSizeBytes = 2048 * 1024;
await processInChunks(fileContent, chunkSizeBytes, async (chunk, offset) => {
try {
const response = await this.helpers.httpRequest({
method: 'PUT',
url: uploadUrl,
headers: {
'Content-Length': chunk.length,
'Content-Range': `bytes ${offset}-${nextOffset - 1}/${contentLength}`,
'Content-Range': `bytes ${offset}-${offset + chunk.byteLength - 1}/${contentLength}`,
},
body: chunk,
});
@ -146,8 +148,7 @@ export async function execute(this: IExecuteFunctions, i: number): Promise<INode
} catch (error) {
if (error.response?.status !== 308) throw error;
}
offset = nextOffset;
}
});
}
const options = this.getNodeParameter('options', i, {});

View file

@ -131,3 +131,29 @@ export function setParentFolder(
return 'root';
}
}
/**
 * Reads a stream and invokes `process` sequentially with fixed-size chunks of
 * `chunkSize` bytes. Incoming stream chunks of arbitrary size (e.g. 64kB file
 * reads) are re-assembled so that every callback receives exactly `chunkSize`
 * bytes — except the final chunk, which may be smaller.
 *
 * @param stream - source of the data; may emit chunks of any size
 * @param chunkSize - exact byte size of each chunk passed to `process` (last one may be smaller)
 * @param process - awaited for each chunk together with its byte offset within the stream
 */
export async function processInChunks(
	stream: Readable,
	chunkSize: number,
	process: (chunk: Buffer, offset: number) => void | Promise<void>,
) {
	let leftover = Buffer.alloc(0);
	let offset = 0;

	for await (const data of stream) {
		// Streams emit strings when an encoding is set; normalize to Buffer
		const incoming = Buffer.isBuffer(data) ? data : Buffer.from(data);
		// Only concatenate when a partial chunk is left over from the previous
		// iteration. Unconditionally concatenating on every incoming chunk
		// recopies the whole accumulator each time (accidentally O(n²)).
		let buffer = leftover.length === 0 ? incoming : Buffer.concat([leftover, incoming]);

		while (buffer.length >= chunkSize) {
			await process(buffer.subarray(0, chunkSize), offset);
			buffer = buffer.subarray(chunkSize);
			offset += chunkSize;
		}
		leftover = buffer;
	}

	// Flush the remainder, which is smaller than chunkSize (skip if empty)
	if (leftover.length > 0) {
		await process(leftover, offset);
	}
}