From ce0d9d2bede7d87b97e18c45b63ea31ecf592255 Mon Sep 17 00:00:00 2001 From: agobrech <45268029+agobrech@users.noreply.github.com> Date: Thu, 9 Mar 2023 15:38:54 +0100 Subject: [PATCH] feat(HTTP Request Node): Move from Binary Buffer to Binary streaming (#5610) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ Change from binary buffer to binary streaming * Remove console.logs * Import Readable from the correct lib * stream response * parametersToKeyValue doesn't need to be async anymore * Fix bodyParameter reduce method * parametersToKeyValue doesn't need to be async anymore * handle streaming responses * send `Content-Length` and `Content-Type` on binary requests * Add new helper function for binary data * Add binary function to helpers interface * Fix bug in error handler * Fix issue with wrongfully assigned headers to body * Fix test workflow * Remove console.logs * Remove unnecsessary type * Remove concat dependency already imported in workflow package * Update pnpm-lock file * Small fixes, asyncronous error message * reset the lockfile * Remove buffer check and simplify error handling --------- Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- packages/core/src/NodeExecuteFunctions.ts | 7 +- .../HttpRequest/V3/HttpRequestV3.node.ts | 78 ++++++++++++------- packages/workflow/src/Interfaces.ts | 2 +- 3 files changed, 54 insertions(+), 33 deletions(-) diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 7d6b4648b2..6186c87336 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -108,6 +108,7 @@ import type { IResponseError, IWorkflowSettings } from './Interfaces'; import { extractValue } from './ExtractValue'; import { getClientCredentialsToken } from './OAuth2Helper'; import { PLACEHOLDER_EMPTY_EXECUTION_ID } from './Constants'; +import { binaryToBuffer } from './BinaryDataManager/utils'; axios.defaults.timeout = 300000; // Prevent axios from adding x-form-www-urlencoded headers by default @@ -686,9 +687,7 @@ async function proxyRequestToAxios( if (response) { Logger.debug('Request proxied to Axios failed', { status: response.status }); let responseData = response.data; - if (Buffer.isBuffer(responseData)) { - responseData = responseData.toString('utf-8'); - } + responseData = await binaryToBuffer(responseData); error.message = `${response.status as number} - ${JSON.stringify(responseData)}`; } @@ -2055,6 +2054,7 @@ const getBinaryHelperFunctions = ({ }: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({ getBinaryStream, getBinaryMetadata, + binaryToBuffer, prepareBinaryData: async (binaryData, filePath, mimeType) => prepareBinaryData(binaryData, executionId!, filePath, mimeType), setBinaryDataBuffer: async (data, binaryData) => @@ -2313,6 +2313,7 @@ export function getExecuteFunctions( return dataProxy.getDataProxy(); }, prepareOutputData: NodeHelpers.prepareOutputData, + binaryToBuffer, async putExecutionToWait(waitTill: Date): Promise { runExecutionData.waitTill = waitTill; if (additionalData.setExecutionStatus) { diff --git a/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts b/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts index 355a98634d..498a1097e6 100644 --- a/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts +++ b/packages/nodes-base/nodes/HttpRequest/V3/HttpRequestV3.node.ts @@ -1,3 +1,6 @@ +import type { Readable } from 'stream'; +import { BINARY_ENCODING } from 'n8n-core'; + import type { IBinaryKeyData, IDataObject, @@ -931,7 +934,8 @@ export class HttpRequestV3 implements INodeType { } catch {} } - let requestOptions: OptionsWithUri = { + type RequestOptions = OptionsWithUri & { useStream?: boolean }; + let requestOptions: RequestOptions = { uri: '', }; @@ -1060,18 +1064,23 @@ export class HttpRequestV3 implements INodeType { }); } - const parametersToKeyValue = async ( - acc: Promise<{ [key: string]: any }>, + const parametersToKeyValue = ( + accumulator: { [key: string]: any }, cur: { name: string; value: string; parameterType?: string; inputDataFieldName?: string }, ) => { - const accumulator = await acc; if (cur.parameterType === 'formBinaryData') { if (!cur.inputDataFieldName) return accumulator; const binaryData = this.helpers.assertBinaryData(itemIndex, cur.inputDataFieldName); - const buffer = await this.helpers.getBinaryDataBuffer(itemIndex, cur.inputDataFieldName); + let uploadData: Buffer | Readable; + const itemBinaryData = items[itemIndex].binary![cur.inputDataFieldName]; + if (itemBinaryData.id) { + uploadData = this.helpers.getBinaryStream(itemBinaryData.id); + } else { + uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING); + } accumulator[cur.name] = { - value: buffer, + value: uploadData, options: { filename: binaryData.fileName, contentType: binaryData.mimeType, @@ -1086,10 +1095,7 @@ export class HttpRequestV3 implements INodeType { // Get parameters defined in the UI if (sendBody && bodyParameters) { if (specifyBody === 'keypair' || bodyContentType === 'multipart-form-data') { - requestOptions.body = await bodyParameters.reduce( - parametersToKeyValue, - Promise.resolve({}), - ); + requestOptions.body = bodyParameters.reduce(parametersToKeyValue, {}); } else if (specifyBody === 'json') { // body is specified using JSON if (typeof jsonBodyParameter !== 'object' && jsonBodyParameter !== null) { @@ -1128,11 +1134,26 @@ export class HttpRequestV3 implements INodeType { 'inputDataFieldName', itemIndex, ) as string; - this.helpers.assertBinaryData(itemIndex, inputDataFieldName); - requestOptions.body = await this.helpers.getBinaryDataBuffer( - itemIndex, - inputDataFieldName, - ); + + let uploadData: Buffer | Readable; + let contentLength: number; + + const itemBinaryData = this.helpers.assertBinaryData(itemIndex, inputDataFieldName); + + if (itemBinaryData.id) { + uploadData = this.helpers.getBinaryStream(itemBinaryData.id); + const metadata = await this.helpers.getBinaryMetadata(itemBinaryData.id); + contentLength = metadata.fileSize; + } else { + uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING); + contentLength = uploadData.length; + } + requestOptions.body = uploadData; + requestOptions.headers = { + ...requestOptions.headers, + 'Content-Length': contentLength, + 'Content-Type': itemBinaryData.mimeType ?? 'application/octet-stream', + }; } else if (bodyContentType === 'raw') { requestOptions.body = body; } @@ -1141,10 +1162,7 @@ export class HttpRequestV3 implements INodeType { // Get parameters defined in the UI if (sendQuery && queryParameters) { if (specifyQuery === 'keypair') { - requestOptions.qs = await queryParameters.reduce( - parametersToKeyValue, - Promise.resolve({}), - ); + requestOptions.qs = queryParameters.reduce(parametersToKeyValue, {}); } else if (specifyQuery === 'json') { // query is specified using JSON try { @@ -1166,10 +1184,7 @@ export class HttpRequestV3 implements INodeType { // Get parameters defined in the UI if (sendHeaders && headerParameters) { if (specifyHeaders === 'keypair') { - requestOptions.headers = await headerParameters.reduce( - parametersToKeyValue, - Promise.resolve({}), - ); + requestOptions.headers = headerParameters.reduce(parametersToKeyValue, {}); } else if (specifyHeaders === 'json') { // body is specified using JSON try { @@ -1191,8 +1206,10 @@ export class HttpRequestV3 implements INodeType { if (autoDetectResponseFormat || responseFormat === 'file') { requestOptions.encoding = null; requestOptions.json = false; + requestOptions.useStream = true; } else if (bodyContentType === 'raw') { requestOptions.json = false; + requestOptions.useStream = true; } else { requestOptions.json = true; } @@ -1284,13 +1301,11 @@ export class HttpRequestV3 implements INodeType { requestPromises.push(requestWithAuthentication); } } - const promisesResponses = await Promise.allSettled(requestPromises); let response: any; for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { response = promisesResponses.shift(); - if (response!.status !== 'fulfilled') { if (!this.continueOnFail()) { if (autoDetectResponseFormat && response.reason.error instanceof Buffer) { @@ -1336,7 +1351,10 @@ export class HttpRequestV3 implements INodeType { 0, false, ) as boolean; - const data = Buffer.from(response.body as Buffer).toString(); + + const data = await this.helpers + .binaryToBuffer(response.body as Buffer | Readable) + .then((body) => body.toString()); response.body = jsonParse(data, { ...(neverError ? { fallbackValue: {} } @@ -1346,7 +1364,9 @@ export class HttpRequestV3 implements INodeType { responseFormat = 'file'; } else { responseFormat = 'text'; - const data = Buffer.from(response.body as Buffer).toString(); + const data = await this.helpers + .binaryToBuffer(response.body as Buffer | Readable) + .then((body) => body.toString()); response.body = !data ? undefined : data; } } @@ -1395,14 +1415,14 @@ export class HttpRequestV3 implements INodeType { newItem.json = returnItem; newItem.binary![outputPropertyName] = await this.helpers.prepareBinaryData( - response!.body as Buffer, + response!.body as Buffer | Readable, fileName, ); } else { newItem.json = items[itemIndex].json; newItem.binary![outputPropertyName] = await this.helpers.prepareBinaryData( - response! as Buffer, + response! as Buffer | Readable, fileName, ); } diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 0e317075d3..9b8a256bf9 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -673,7 +673,7 @@ export interface BinaryHelperFunctions { ): Promise; setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise; copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise; - + binaryToBuffer(body: Buffer | Readable): Promise; getBinaryStream(binaryDataId: string, chunkSize?: number): Readable; getBinaryMetadata(binaryDataId: string): Promise; }