feat(HTTP Request Node): Move from Binary Buffer to Binary streaming (#5610)

*  Change from binary buffer to binary streaming

* Remove console.logs

* Import Readable from the correct lib

* stream response

* parametersToKeyValue doesn't need to be async anymore

* Fix bodyParameter reduce method

* parametersToKeyValue doesn't need to be async anymore

* handle streaming responses

* send `Content-Length` and `Content-Type` on binary requests

* Add new helper function for binary data

* Add binary function to helpers interface

* Fix bug in error handler

* Fix issue with wrongfully assigned headers to body

* Fix test workflow

* Remove console.logs

* Remove unnecsessary type

* Remove concat dependency already imported in workflow package

* Update pnpm-lock file

* Small fixes, asyncronous error message

* reset the lockfile

* Remove buffer check and simplify error handling

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
This commit is contained in:
agobrech 2023-03-09 15:38:54 +01:00 committed by GitHub
parent a9c63f980f
commit ce0d9d2bed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 33 deletions

View file

@ -108,6 +108,7 @@ import type { IResponseError, IWorkflowSettings } from './Interfaces';
import { extractValue } from './ExtractValue'; import { extractValue } from './ExtractValue';
import { getClientCredentialsToken } from './OAuth2Helper'; import { getClientCredentialsToken } from './OAuth2Helper';
import { PLACEHOLDER_EMPTY_EXECUTION_ID } from './Constants'; import { PLACEHOLDER_EMPTY_EXECUTION_ID } from './Constants';
import { binaryToBuffer } from './BinaryDataManager/utils';
axios.defaults.timeout = 300000; axios.defaults.timeout = 300000;
// Prevent axios from adding x-form-www-urlencoded headers by default // Prevent axios from adding x-form-www-urlencoded headers by default
@ -686,9 +687,7 @@ async function proxyRequestToAxios(
if (response) { if (response) {
Logger.debug('Request proxied to Axios failed', { status: response.status }); Logger.debug('Request proxied to Axios failed', { status: response.status });
let responseData = response.data; let responseData = response.data;
if (Buffer.isBuffer(responseData)) { responseData = await binaryToBuffer(responseData);
responseData = responseData.toString('utf-8');
}
error.message = `${response.status as number} - ${JSON.stringify(responseData)}`; error.message = `${response.status as number} - ${JSON.stringify(responseData)}`;
} }
@ -2055,6 +2054,7 @@ const getBinaryHelperFunctions = ({
}: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({ }: IWorkflowExecuteAdditionalData): BinaryHelperFunctions => ({
getBinaryStream, getBinaryStream,
getBinaryMetadata, getBinaryMetadata,
binaryToBuffer,
prepareBinaryData: async (binaryData, filePath, mimeType) => prepareBinaryData: async (binaryData, filePath, mimeType) =>
prepareBinaryData(binaryData, executionId!, filePath, mimeType), prepareBinaryData(binaryData, executionId!, filePath, mimeType),
setBinaryDataBuffer: async (data, binaryData) => setBinaryDataBuffer: async (data, binaryData) =>
@ -2313,6 +2313,7 @@ export function getExecuteFunctions(
return dataProxy.getDataProxy(); return dataProxy.getDataProxy();
}, },
prepareOutputData: NodeHelpers.prepareOutputData, prepareOutputData: NodeHelpers.prepareOutputData,
binaryToBuffer,
async putExecutionToWait(waitTill: Date): Promise<void> { async putExecutionToWait(waitTill: Date): Promise<void> {
runExecutionData.waitTill = waitTill; runExecutionData.waitTill = waitTill;
if (additionalData.setExecutionStatus) { if (additionalData.setExecutionStatus) {

View file

@ -1,3 +1,6 @@
import type { Readable } from 'stream';
import { BINARY_ENCODING } from 'n8n-core';
import type { import type {
IBinaryKeyData, IBinaryKeyData,
IDataObject, IDataObject,
@ -931,7 +934,8 @@ export class HttpRequestV3 implements INodeType {
} catch {} } catch {}
} }
let requestOptions: OptionsWithUri = { type RequestOptions = OptionsWithUri & { useStream?: boolean };
let requestOptions: RequestOptions = {
uri: '', uri: '',
}; };
@ -1060,18 +1064,23 @@ export class HttpRequestV3 implements INodeType {
}); });
} }
const parametersToKeyValue = async ( const parametersToKeyValue = (
acc: Promise<{ [key: string]: any }>, accumulator: { [key: string]: any },
cur: { name: string; value: string; parameterType?: string; inputDataFieldName?: string }, cur: { name: string; value: string; parameterType?: string; inputDataFieldName?: string },
) => { ) => {
const accumulator = await acc;
if (cur.parameterType === 'formBinaryData') { if (cur.parameterType === 'formBinaryData') {
if (!cur.inputDataFieldName) return accumulator; if (!cur.inputDataFieldName) return accumulator;
const binaryData = this.helpers.assertBinaryData(itemIndex, cur.inputDataFieldName); const binaryData = this.helpers.assertBinaryData(itemIndex, cur.inputDataFieldName);
const buffer = await this.helpers.getBinaryDataBuffer(itemIndex, cur.inputDataFieldName); let uploadData: Buffer | Readable;
const itemBinaryData = items[itemIndex].binary![cur.inputDataFieldName];
if (itemBinaryData.id) {
uploadData = this.helpers.getBinaryStream(itemBinaryData.id);
} else {
uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING);
}
accumulator[cur.name] = { accumulator[cur.name] = {
value: buffer, value: uploadData,
options: { options: {
filename: binaryData.fileName, filename: binaryData.fileName,
contentType: binaryData.mimeType, contentType: binaryData.mimeType,
@ -1086,10 +1095,7 @@ export class HttpRequestV3 implements INodeType {
// Get parameters defined in the UI // Get parameters defined in the UI
if (sendBody && bodyParameters) { if (sendBody && bodyParameters) {
if (specifyBody === 'keypair' || bodyContentType === 'multipart-form-data') { if (specifyBody === 'keypair' || bodyContentType === 'multipart-form-data') {
requestOptions.body = await bodyParameters.reduce( requestOptions.body = bodyParameters.reduce(parametersToKeyValue, {});
parametersToKeyValue,
Promise.resolve({}),
);
} else if (specifyBody === 'json') { } else if (specifyBody === 'json') {
// body is specified using JSON // body is specified using JSON
if (typeof jsonBodyParameter !== 'object' && jsonBodyParameter !== null) { if (typeof jsonBodyParameter !== 'object' && jsonBodyParameter !== null) {
@ -1128,11 +1134,26 @@ export class HttpRequestV3 implements INodeType {
'inputDataFieldName', 'inputDataFieldName',
itemIndex, itemIndex,
) as string; ) as string;
this.helpers.assertBinaryData(itemIndex, inputDataFieldName);
requestOptions.body = await this.helpers.getBinaryDataBuffer( let uploadData: Buffer | Readable;
itemIndex, let contentLength: number;
inputDataFieldName,
); const itemBinaryData = this.helpers.assertBinaryData(itemIndex, inputDataFieldName);
if (itemBinaryData.id) {
uploadData = this.helpers.getBinaryStream(itemBinaryData.id);
const metadata = await this.helpers.getBinaryMetadata(itemBinaryData.id);
contentLength = metadata.fileSize;
} else {
uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING);
contentLength = uploadData.length;
}
requestOptions.body = uploadData;
requestOptions.headers = {
...requestOptions.headers,
'Content-Length': contentLength,
'Content-Type': itemBinaryData.mimeType ?? 'application/octet-stream',
};
} else if (bodyContentType === 'raw') { } else if (bodyContentType === 'raw') {
requestOptions.body = body; requestOptions.body = body;
} }
@ -1141,10 +1162,7 @@ export class HttpRequestV3 implements INodeType {
// Get parameters defined in the UI // Get parameters defined in the UI
if (sendQuery && queryParameters) { if (sendQuery && queryParameters) {
if (specifyQuery === 'keypair') { if (specifyQuery === 'keypair') {
requestOptions.qs = await queryParameters.reduce( requestOptions.qs = queryParameters.reduce(parametersToKeyValue, {});
parametersToKeyValue,
Promise.resolve({}),
);
} else if (specifyQuery === 'json') { } else if (specifyQuery === 'json') {
// query is specified using JSON // query is specified using JSON
try { try {
@ -1166,10 +1184,7 @@ export class HttpRequestV3 implements INodeType {
// Get parameters defined in the UI // Get parameters defined in the UI
if (sendHeaders && headerParameters) { if (sendHeaders && headerParameters) {
if (specifyHeaders === 'keypair') { if (specifyHeaders === 'keypair') {
requestOptions.headers = await headerParameters.reduce( requestOptions.headers = headerParameters.reduce(parametersToKeyValue, {});
parametersToKeyValue,
Promise.resolve({}),
);
} else if (specifyHeaders === 'json') { } else if (specifyHeaders === 'json') {
// body is specified using JSON // body is specified using JSON
try { try {
@ -1191,8 +1206,10 @@ export class HttpRequestV3 implements INodeType {
if (autoDetectResponseFormat || responseFormat === 'file') { if (autoDetectResponseFormat || responseFormat === 'file') {
requestOptions.encoding = null; requestOptions.encoding = null;
requestOptions.json = false; requestOptions.json = false;
requestOptions.useStream = true;
} else if (bodyContentType === 'raw') { } else if (bodyContentType === 'raw') {
requestOptions.json = false; requestOptions.json = false;
requestOptions.useStream = true;
} else { } else {
requestOptions.json = true; requestOptions.json = true;
} }
@ -1284,13 +1301,11 @@ export class HttpRequestV3 implements INodeType {
requestPromises.push(requestWithAuthentication); requestPromises.push(requestWithAuthentication);
} }
} }
const promisesResponses = await Promise.allSettled(requestPromises); const promisesResponses = await Promise.allSettled(requestPromises);
let response: any; let response: any;
for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
response = promisesResponses.shift(); response = promisesResponses.shift();
if (response!.status !== 'fulfilled') { if (response!.status !== 'fulfilled') {
if (!this.continueOnFail()) { if (!this.continueOnFail()) {
if (autoDetectResponseFormat && response.reason.error instanceof Buffer) { if (autoDetectResponseFormat && response.reason.error instanceof Buffer) {
@ -1336,7 +1351,10 @@ export class HttpRequestV3 implements INodeType {
0, 0,
false, false,
) as boolean; ) as boolean;
const data = Buffer.from(response.body as Buffer).toString();
const data = await this.helpers
.binaryToBuffer(response.body as Buffer | Readable)
.then((body) => body.toString());
response.body = jsonParse(data, { response.body = jsonParse(data, {
...(neverError ...(neverError
? { fallbackValue: {} } ? { fallbackValue: {} }
@ -1346,7 +1364,9 @@ export class HttpRequestV3 implements INodeType {
responseFormat = 'file'; responseFormat = 'file';
} else { } else {
responseFormat = 'text'; responseFormat = 'text';
const data = Buffer.from(response.body as Buffer).toString(); const data = await this.helpers
.binaryToBuffer(response.body as Buffer | Readable)
.then((body) => body.toString());
response.body = !data ? undefined : data; response.body = !data ? undefined : data;
} }
} }
@ -1395,14 +1415,14 @@ export class HttpRequestV3 implements INodeType {
newItem.json = returnItem; newItem.json = returnItem;
newItem.binary![outputPropertyName] = await this.helpers.prepareBinaryData( newItem.binary![outputPropertyName] = await this.helpers.prepareBinaryData(
response!.body as Buffer, response!.body as Buffer | Readable,
fileName, fileName,
); );
} else { } else {
newItem.json = items[itemIndex].json; newItem.json = items[itemIndex].json;
newItem.binary![outputPropertyName] = await this.helpers.prepareBinaryData( newItem.binary![outputPropertyName] = await this.helpers.prepareBinaryData(
response! as Buffer, response! as Buffer | Readable,
fileName, fileName,
); );
} }

View file

@ -673,7 +673,7 @@ export interface BinaryHelperFunctions {
): Promise<IBinaryData>; ): Promise<IBinaryData>;
setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>; setBinaryDataBuffer(data: IBinaryData, binaryData: Buffer): Promise<IBinaryData>;
copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>; copyBinaryFile(filePath: string, fileName: string, mimeType?: string): Promise<IBinaryData>;
binaryToBuffer(body: Buffer | Readable): Promise<Buffer>;
getBinaryStream(binaryDataId: string, chunkSize?: number): Readable; getBinaryStream(binaryDataId: string, chunkSize?: number): Readable;
getBinaryMetadata(binaryDataId: string): Promise<BinaryMetadata>; getBinaryMetadata(binaryDataId: string): Promise<BinaryMetadata>;
} }