feat(Respond to Webhook Node): Move from Binary Buffer to Binary streaming (#5613)

* replace binary buffer with binary streaming

* Add binary assertion and remove duplicate code

* handle streams correctly

* fix binary response in `own` mode

* fix stream response missing headers

---------

Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <netroy@users.noreply.github.com>
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
Co-authored-by: Marcus <marcus@n8n.io>
This commit is contained in:
agobrech 2023-05-17 10:06:24 +02:00 committed by GitHub
parent 77ac953eaf
commit 8ae2d801d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 13 deletions

View file

@ -15,10 +15,13 @@
/* eslint-disable prefer-destructuring */ /* eslint-disable prefer-destructuring */
import type express from 'express'; import type express from 'express';
import get from 'lodash.get'; import get from 'lodash.get';
import stream from 'stream';
import { promisify } from 'util';
import { BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core'; import { BinaryDataManager, NodeExecuteFunctions, eventEmitter } from 'n8n-core';
import type { import type {
IBinaryData,
IBinaryKeyData, IBinaryKeyData,
IDataObject, IDataObject,
IDeferredPromise, IDeferredPromise,
@ -59,6 +62,8 @@ import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper'; import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi'; import { Container } from 'typedi';
const pipeline = promisify(stream.pipeline);
export const WEBHOOK_METHODS = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT']; export const WEBHOOK_METHODS = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT'];
/** /**
@ -418,13 +423,17 @@ export async function executeWebhook(
return; return;
} }
if (Buffer.isBuffer(response.body)) { const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) {
res.header(response.headers);
const stream = NodeExecuteFunctions.getBinaryStream(binaryData.id);
void pipeline(stream, res).then(() =>
responseCallback(null, { noWebhookResponse: true }),
);
} else if (Buffer.isBuffer(response.body)) {
res.header(response.headers); res.header(response.headers);
res.end(response.body); res.end(response.body);
responseCallback(null, { noWebhookResponse: true });
responseCallback(null, {
noWebhookResponse: true,
});
} else { } else {
// TODO: This probably needs some more changes depending on the options on the // TODO: This probably needs some more changes depending on the options on the
// Webhook Response node // Webhook Response node

View file

@ -1,3 +1,4 @@
import type { Readable } from 'stream';
import type { import type {
IDataObject, IDataObject,
IExecuteFunctions, IExecuteFunctions,
@ -7,7 +8,7 @@ import type {
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { jsonParse, NodeOperationError } from 'n8n-workflow'; import { jsonParse, BINARY_ENCODING, NodeOperationError } from 'n8n-workflow';
export class RespondToWebhook implements INodeType { export class RespondToWebhook implements INodeType {
description: INodeTypeDescription = { description: INodeTypeDescription = {
@ -201,7 +202,7 @@ export class RespondToWebhook implements INodeType {
} }
} }
let responseBody: IN8nHttpResponse; let responseBody: IN8nHttpResponse | Readable;
if (respondWith === 'json') { if (respondWith === 'json') {
const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string; const responseBodyParameter = this.getNodeParameter('responseBody', 0) as string;
if (responseBodyParameter) { if (responseBodyParameter) {
@ -235,15 +236,16 @@ export class RespondToWebhook implements INodeType {
} }
const binaryData = this.helpers.assertBinaryData(0, responseBinaryPropertyName); const binaryData = this.helpers.assertBinaryData(0, responseBinaryPropertyName);
const binaryDataBuffer = await this.helpers.getBinaryDataBuffer( if (binaryData.id) {
0, responseBody = { binaryData };
responseBinaryPropertyName, } else {
); responseBody = Buffer.from(binaryData.data, BINARY_ENCODING);
headers['content-length'] = (responseBody as Buffer).length;
}
if (!headers['content-type']) { if (!headers['content-type']) {
headers['content-type'] = binaryData.mimeType; headers['content-type'] = binaryData.mimeType;
} }
responseBody = binaryDataBuffer;
} else if (respondWith !== 'noData') { } else if (respondWith !== 'noData') {
throw new NodeOperationError( throw new NodeOperationError(
this.getNode(), this.getNode(),

View file

@ -527,7 +527,7 @@ export interface IHttpRequestOptions {
export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null; export type IN8nHttpResponse = IDataObject | Buffer | GenericValue | GenericValue[] | null;
export interface IN8nHttpFullResponse { export interface IN8nHttpFullResponse {
body: IN8nHttpResponse; body: IN8nHttpResponse | Readable;
headers: IDataObject; headers: IDataObject;
statusCode: number; statusCode: number;
statusMessage?: string; statusMessage?: string;