refactor(core)!: Make getBinaryStream async (#7247)

Story: [PAY-846](https://linear.app/n8n/issue/PAY-846) | Related:
https://github.com/n8n-io/n8n/pull/7225

For the S3 backend for external storage of binary data and execution
data, the `getAsStream` method in the binary data manager interface used
by FS and S3 will need to become async. This is a breaking change for
nodes-base.
This commit is contained in:
Iván Ovejero 2023-09-25 16:59:45 +02:00 committed by GitHub
parent 484035eb51
commit 75541e91f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 74 additions and 46 deletions

View file

@ -2,6 +2,23 @@
This list shows all the versions which include breaking changes and how to upgrade. This list shows all the versions which include breaking changes and how to upgrade.
## 1.9.0
### What changed?
In nodes, `this.helpers.getBinaryStream()` is now async.
### When is action necessary?
If your node uses `this.helpers.getBinaryStream()`, add `await` when calling it.
Example:
```typescript
const binaryStream = this.helpers.getBinaryStream(id); // until 1.9.0
const binaryStream = await this.helpers.getBinaryStream(id); // since 1.9.0
```
## 1.5.0 ## 1.5.0
### What changed? ### What changed?

View file

@ -506,7 +506,7 @@ export async function executeWebhook(
responsePromise = await createDeferredPromise<IN8nHttpFullResponse>(); responsePromise = await createDeferredPromise<IN8nHttpFullResponse>();
responsePromise responsePromise
.promise() .promise()
.then((response: IN8nHttpFullResponse) => { .then(async (response: IN8nHttpFullResponse) => {
if (didSendResponse) { if (didSendResponse) {
return; return;
} }
@ -514,10 +514,9 @@ export async function executeWebhook(
const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData; const binaryData = (response.body as IDataObject)?.binaryData as IBinaryData;
if (binaryData?.id) { if (binaryData?.id) {
res.header(response.headers); res.header(response.headers);
const stream = Container.get(BinaryDataService).getAsStream(binaryData.id); const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
void pipeline(stream, res).then(() => await pipeline(stream, res);
responseCallback(null, { noWebhookResponse: true }), responseCallback(null, { noWebhookResponse: true });
);
} else if (Buffer.isBuffer(response.body)) { } else if (Buffer.isBuffer(response.body)) {
res.header(response.headers); res.header(response.headers);
res.end(response.body); res.end(response.body);
@ -734,7 +733,7 @@ export async function executeWebhook(
// Send the webhook response manually // Send the webhook response manually
res.setHeader('Content-Type', binaryData.mimeType); res.setHeader('Content-Type', binaryData.mimeType);
if (binaryData.id) { if (binaryData.id) {
const stream = Container.get(BinaryDataService).getAsStream(binaryData.id); const stream = await Container.get(BinaryDataService).getAsStream(binaryData.id);
await pipeline(stream, res); await pipeline(stream, res);
} else { } else {
res.end(Buffer.from(binaryData.data, BINARY_ENCODING)); res.end(Buffer.from(binaryData.data, BINARY_ENCODING));

View file

@ -89,7 +89,7 @@ export class BinaryDataService {
}); });
} }
getAsStream(binaryDataId: string, chunkSize?: number) { async getAsStream(binaryDataId: string, chunkSize?: number) {
const [mode, fileId] = binaryDataId.split(':'); const [mode, fileId] = binaryDataId.split(':');
return this.getManager(mode).getAsStream(fileId, chunkSize); return this.getManager(mode).getAsStream(fileId, chunkSize);

View file

@ -36,7 +36,7 @@ export class FileSystemManager implements BinaryData.Manager {
} }
} }
getAsStream(fileId: string, chunkSize?: number) { async getAsStream(fileId: string, chunkSize?: number) {
const filePath = this.getPath(fileId); const filePath = this.getPath(fileId);
return createReadStream(filePath, { highWaterMark: chunkSize }); return createReadStream(filePath, { highWaterMark: chunkSize });

View file

@ -29,7 +29,7 @@ export namespace BinaryData {
getPath(fileId: string): string; getPath(fileId: string): string;
getAsBuffer(fileId: string): Promise<Buffer>; getAsBuffer(fileId: string): Promise<Buffer>;
getAsStream(fileId: string, chunkSize?: number): Readable; getAsStream(fileId: string, chunkSize?: number): Promise<Readable>;
getMetadata(fileId: string): Promise<Metadata>; getMetadata(fileId: string): Promise<Metadata>;
// @TODO: Refactor to also use `workflowId` to support full path-like identifier: // @TODO: Refactor to also use `workflowId` to support full path-like identifier:

View file

@ -954,7 +954,7 @@ export async function getBinaryMetadata(binaryDataId: string): Promise<BinaryDat
/** /**
* Returns binary file stream for piping * Returns binary file stream for piping
*/ */
export function getBinaryStream(binaryDataId: string, chunkSize?: number): Readable { export async function getBinaryStream(binaryDataId: string, chunkSize?: number): Promise<Readable> {
return Container.get(BinaryDataService).getAsStream(binaryDataId, chunkSize); return Container.get(BinaryDataService).getAsStream(binaryDataId, chunkSize);
} }

View file

@ -870,7 +870,10 @@ export class AwsS3V2 implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
multipartHeaders['Content-Type'] = binaryPropertyData.mimeType; multipartHeaders['Content-Type'] = binaryPropertyData.mimeType;
if (binaryPropertyData.id) { if (binaryPropertyData.id) {
uploadData = this.helpers.getBinaryStream(binaryPropertyData.id, UPLOAD_CHUNK_SIZE); uploadData = await this.helpers.getBinaryStream(
binaryPropertyData.id,
UPLOAD_CHUNK_SIZE,
);
const createMultiPartUpload = await awsApiRequestREST.call( const createMultiPartUpload = await awsApiRequestREST.call(
this, this,
servicePath, servicePath,

View file

@ -486,7 +486,7 @@ export class Crypto implements INodeType {
const binaryPropertyName = this.getNodeParameter('binaryPropertyName', i); const binaryPropertyName = this.getNodeParameter('binaryPropertyName', i);
const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName); const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName);
if (binaryData.id) { if (binaryData.id) {
const binaryStream = this.helpers.getBinaryStream(binaryData.id); const binaryStream = await this.helpers.getBinaryStream(binaryData.id);
hashOrHmac.setEncoding(encoding); hashOrHmac.setEncoding(encoding);
await pipeline(binaryStream, hashOrHmac); await pipeline(binaryStream, hashOrHmac);
newValue = hashOrHmac.read(); newValue = hashOrHmac.read();

View file

@ -649,7 +649,7 @@ export class Ftp implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
if (binaryData.id) { if (binaryData.id) {
uploadData = this.helpers.getBinaryStream(binaryData.id); uploadData = await this.helpers.getBinaryStream(binaryData.id);
} else { } else {
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING); uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
} }
@ -759,7 +759,7 @@ export class Ftp implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
if (binaryData.id) { if (binaryData.id) {
uploadData = this.helpers.getBinaryStream(binaryData.id); uploadData = await this.helpers.getBinaryStream(binaryData.id);
} else { } else {
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING); uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
} }

View file

@ -160,7 +160,7 @@ export const objectOperations: INodeProperties[] = [
const binaryData = this.helpers.assertBinaryData(binaryPropertyName); const binaryData = this.helpers.assertBinaryData(binaryPropertyName);
if (binaryData.id) { if (binaryData.id) {
content = this.helpers.getBinaryStream(binaryData.id); content = await this.helpers.getBinaryStream(binaryData.id);
const binaryMetadata = await this.helpers.getBinaryMetadata(binaryData.id); const binaryMetadata = await this.helpers.getBinaryMetadata(binaryData.id);
contentType = binaryMetadata.mimeType ?? 'application/octet-stream'; contentType = binaryMetadata.mimeType ?? 'application/octet-stream';
contentLength = binaryMetadata.fileSize; contentLength = binaryMetadata.fileSize;

View file

@ -2442,7 +2442,7 @@ export class GoogleDriveV1 implements INodeType {
const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName); const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName);
if (binaryData.id) { if (binaryData.id) {
// Stream data in 256KB chunks, and upload the via the resumable upload api // Stream data in 256KB chunks, and upload the via the resumable upload api
fileContent = this.helpers.getBinaryStream(binaryData.id, UPLOAD_CHUNK_SIZE); fileContent = await this.helpers.getBinaryStream(binaryData.id, UPLOAD_CHUNK_SIZE);
const metadata = await this.helpers.getBinaryMetadata(binaryData.id); const metadata = await this.helpers.getBinaryMetadata(binaryData.id);
contentLength = metadata.fileSize; contentLength = metadata.fileSize;
originalFilename = metadata.fileName; originalFilename = metadata.fileName;

View file

@ -40,7 +40,7 @@ export async function getItemBinaryData(
if (binaryData.id) { if (binaryData.id) {
// Stream data in 256KB chunks, and upload the via the resumable upload api // Stream data in 256KB chunks, and upload the via the resumable upload api
fileContent = this.helpers.getBinaryStream(binaryData.id, chunkSize); fileContent = await this.helpers.getBinaryStream(binaryData.id, chunkSize);
const metadata = await this.helpers.getBinaryMetadata(binaryData.id); const metadata = await this.helpers.getBinaryMetadata(binaryData.id);
contentLength = metadata.fileSize; contentLength = metadata.fileSize;
originalFilename = metadata.fileName; originalFilename = metadata.fileName;

View file

@ -838,7 +838,7 @@ export class YouTube implements INodeType {
if (binaryData.id) { if (binaryData.id) {
// Stream data in 256KB chunks, and upload the via the resumable upload api // Stream data in 256KB chunks, and upload the via the resumable upload api
fileContent = this.helpers.getBinaryStream(binaryData.id, UPLOAD_CHUNK_SIZE); fileContent = await this.helpers.getBinaryStream(binaryData.id, UPLOAD_CHUNK_SIZE);
const metadata = await this.helpers.getBinaryMetadata(binaryData.id); const metadata = await this.helpers.getBinaryMetadata(binaryData.id);
contentLength = metadata.fileSize; contentLength = metadata.fileSize;
mimeType = metadata.mimeType ?? binaryData.mimeType; mimeType = metadata.mimeType ?? binaryData.mimeType;

View file

@ -138,21 +138,31 @@ export const binaryContentTypes = [
export type BodyParametersReducer = ( export type BodyParametersReducer = (
acc: IDataObject, acc: IDataObject,
cur: { name: string; value: string }, cur: { name: string; value: string },
) => IDataObject; ) => Promise<IDataObject>;
export const prepareRequestBody = ( export async function reduceAsync<T, R>(
arr: T[],
reducer: (acc: Awaited<Promise<R>>, cur: T) => Promise<R>,
init: Promise<R> = Promise.resolve({} as R),
): Promise<R> {
return arr.reduce(async (promiseAcc, item) => {
return reducer(await promiseAcc, item);
}, init);
}
export const prepareRequestBody = async (
parameters: BodyParameter[], parameters: BodyParameter[],
bodyType: string, bodyType: string,
version: number, version: number,
defaultReducer: BodyParametersReducer, defaultReducer: BodyParametersReducer,
) => { ) => {
if (bodyType === 'json' && version >= 4) { if (bodyType === 'json' && version >= 4) {
return parameters.reduce((acc, entry) => { return parameters.reduce(async (acc, entry) => {
const value = entry.value; const result = await acc;
set(acc, entry.name, value); set(result, entry.name, entry.value);
return acc; return result;
}, {} as IDataObject); }, Promise.resolve({}));
} else { } else {
return parameters.reduce(defaultReducer, {}); return reduceAsync(parameters, defaultReducer);
} }
}; };

View file

@ -31,6 +31,7 @@ import {
binaryContentTypes, binaryContentTypes,
getOAuth2AdditionalParameters, getOAuth2AdditionalParameters,
prepareRequestBody, prepareRequestBody,
reduceAsync,
replaceNullValues, replaceNullValues,
sanitizeUiMessage, sanitizeUiMessage,
} from '../GenericFunctions'; } from '../GenericFunctions';
@ -1161,7 +1162,7 @@ export class HttpRequestV3 implements INodeType {
}); });
} }
const parametersToKeyValue = ( const parametersToKeyValue = async (
accumulator: { [key: string]: any }, accumulator: { [key: string]: any },
cur: { name: string; value: string; parameterType?: string; inputDataFieldName?: string }, cur: { name: string; value: string; parameterType?: string; inputDataFieldName?: string },
) => { ) => {
@ -1171,7 +1172,7 @@ export class HttpRequestV3 implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
const itemBinaryData = items[itemIndex].binary![cur.inputDataFieldName]; const itemBinaryData = items[itemIndex].binary![cur.inputDataFieldName];
if (itemBinaryData.id) { if (itemBinaryData.id) {
uploadData = this.helpers.getBinaryStream(itemBinaryData.id); uploadData = await this.helpers.getBinaryStream(itemBinaryData.id);
} else { } else {
uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING); uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING);
} }
@ -1192,7 +1193,7 @@ export class HttpRequestV3 implements INodeType {
// Get parameters defined in the UI // Get parameters defined in the UI
if (sendBody && bodyParameters) { if (sendBody && bodyParameters) {
if (specifyBody === 'keypair' || bodyContentType === 'multipart-form-data') { if (specifyBody === 'keypair' || bodyContentType === 'multipart-form-data') {
requestOptions.body = prepareRequestBody( requestOptions.body = await prepareRequestBody(
bodyParameters, bodyParameters,
bodyContentType, bodyContentType,
nodeVersion, nodeVersion,
@ -1243,7 +1244,7 @@ export class HttpRequestV3 implements INodeType {
const itemBinaryData = this.helpers.assertBinaryData(itemIndex, inputDataFieldName); const itemBinaryData = this.helpers.assertBinaryData(itemIndex, inputDataFieldName);
if (itemBinaryData.id) { if (itemBinaryData.id) {
uploadData = this.helpers.getBinaryStream(itemBinaryData.id); uploadData = await this.helpers.getBinaryStream(itemBinaryData.id);
const metadata = await this.helpers.getBinaryMetadata(itemBinaryData.id); const metadata = await this.helpers.getBinaryMetadata(itemBinaryData.id);
contentLength = metadata.fileSize; contentLength = metadata.fileSize;
} else { } else {
@ -1264,7 +1265,7 @@ export class HttpRequestV3 implements INodeType {
// Get parameters defined in the UI // Get parameters defined in the UI
if (sendQuery && queryParameters) { if (sendQuery && queryParameters) {
if (specifyQuery === 'keypair') { if (specifyQuery === 'keypair') {
requestOptions.qs = queryParameters.reduce(parametersToKeyValue, {}); requestOptions.qs = await reduceAsync(queryParameters, parametersToKeyValue);
} else if (specifyQuery === 'json') { } else if (specifyQuery === 'json') {
// query is specified using JSON // query is specified using JSON
try { try {
@ -1287,7 +1288,7 @@ export class HttpRequestV3 implements INodeType {
if (sendHeaders && headerParameters) { if (sendHeaders && headerParameters) {
let additionalHeaders: IDataObject = {}; let additionalHeaders: IDataObject = {};
if (specifyHeaders === 'keypair') { if (specifyHeaders === 'keypair') {
additionalHeaders = headerParameters.reduce(parametersToKeyValue, {}); additionalHeaders = await reduceAsync(headerParameters, parametersToKeyValue);
} else if (specifyHeaders === 'json') { } else if (specifyHeaders === 'json') {
// body is specified using JSON // body is specified using JSON
try { try {

View file

@ -2,7 +2,7 @@ import { prepareRequestBody } from '../../GenericFunctions';
import type { BodyParameter, BodyParametersReducer } from '../../GenericFunctions'; import type { BodyParameter, BodyParametersReducer } from '../../GenericFunctions';
describe('HTTP Node Utils, prepareRequestBody', () => { describe('HTTP Node Utils, prepareRequestBody', () => {
it('should call default reducer', () => { it('should call default reducer', async () => {
const bodyParameters: BodyParameter[] = [ const bodyParameters: BodyParameter[] = [
{ {
name: 'foo.bar', name: 'foo.bar',
@ -11,15 +11,13 @@ describe('HTTP Node Utils, prepareRequestBody', () => {
]; ];
const defaultReducer: BodyParametersReducer = jest.fn(); const defaultReducer: BodyParametersReducer = jest.fn();
prepareRequestBody(bodyParameters, 'json', 3, defaultReducer); await prepareRequestBody(bodyParameters, 'json', 3, defaultReducer);
expect(defaultReducer).toBeCalledTimes(1); expect(defaultReducer).toBeCalledTimes(1);
expect(defaultReducer).toBeCalledWith({}, { name: 'foo.bar', value: 'baz' }, 0, [ expect(defaultReducer).toBeCalledWith({}, { name: 'foo.bar', value: 'baz' });
{ name: 'foo.bar', value: 'baz' },
]);
}); });
it('should call process dot notations', () => { it('should call process dot notations', async () => {
const bodyParameters: BodyParameter[] = [ const bodyParameters: BodyParameter[] = [
{ {
name: 'foo.bar.spam', name: 'foo.bar.spam',
@ -28,7 +26,7 @@ describe('HTTP Node Utils, prepareRequestBody', () => {
]; ];
const defaultReducer: BodyParametersReducer = jest.fn(); const defaultReducer: BodyParametersReducer = jest.fn();
const result = prepareRequestBody(bodyParameters, 'json', 4, defaultReducer); const result = await prepareRequestBody(bodyParameters, 'json', 4, defaultReducer);
expect(defaultReducer).toBeCalledTimes(0); expect(defaultReducer).toBeCalledTimes(0);
expect(result).toBeDefined(); expect(result).toBeDefined();

View file

@ -1058,7 +1058,7 @@ export class Jira implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
if (binaryData.id) { if (binaryData.id) {
uploadData = this.helpers.getBinaryStream(binaryData.id); uploadData = await this.helpers.getBinaryStream(binaryData.id);
} else { } else {
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING); uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
} }

View file

@ -1059,7 +1059,7 @@ export class SlackV2 implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
if (binaryData.id) { if (binaryData.id) {
uploadData = this.helpers.getBinaryStream(binaryData.id); uploadData = await this.helpers.getBinaryStream(binaryData.id);
} else { } else {
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING); uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
} }

View file

@ -92,7 +92,7 @@ export class SpreadsheetFileV2 implements INodeType {
}, },
}); });
if (binaryData.id) { if (binaryData.id) {
const stream = this.helpers.getBinaryStream(binaryData.id); const stream = await this.helpers.getBinaryStream(binaryData.id);
await pipeline(stream, parser); await pipeline(stream, parser);
} else { } else {
parser.write(binaryData.data, BINARY_ENCODING); parser.write(binaryData.data, BINARY_ENCODING);

View file

@ -441,7 +441,7 @@ export class Ssh implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
if (binaryData.id) { if (binaryData.id) {
uploadData = this.helpers.getBinaryStream(binaryData.id); uploadData = await this.helpers.getBinaryStream(binaryData.id);
} else { } else {
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING); uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
} }

View file

@ -2005,7 +2005,7 @@ export class Telegram implements INodeType {
let uploadData: Buffer | Readable; let uploadData: Buffer | Readable;
if (itemBinaryData.id) { if (itemBinaryData.id) {
uploadData = this.helpers.getBinaryStream(itemBinaryData.id); uploadData = await this.helpers.getBinaryStream(itemBinaryData.id);
} else { } else {
uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING); uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING);
} }

View file

@ -91,7 +91,7 @@ export class WriteBinaryFile implements INodeType {
let fileContent: Buffer | Readable; let fileContent: Buffer | Readable;
if (binaryData.id) { if (binaryData.id) {
fileContent = this.helpers.getBinaryStream(binaryData.id); fileContent = await this.helpers.getBinaryStream(binaryData.id);
} else { } else {
fileContent = Buffer.from(binaryData.data, BINARY_ENCODING); fileContent = Buffer.from(binaryData.data, BINARY_ENCODING);
} }

View file

@ -687,7 +687,7 @@ export interface BinaryHelperFunctions {
copyBinaryFile(): Promise<never>; copyBinaryFile(): Promise<never>;
binaryToBuffer(body: Buffer | Readable): Promise<Buffer>; binaryToBuffer(body: Buffer | Readable): Promise<Buffer>;
getBinaryPath(binaryDataId: string): string; getBinaryPath(binaryDataId: string): string;
getBinaryStream(binaryDataId: string, chunkSize?: number): Readable; getBinaryStream(binaryDataId: string, chunkSize?: number): Promise<Readable>;
getBinaryMetadata(binaryDataId: string): Promise<{ getBinaryMetadata(binaryDataId: string): Promise<{
fileName?: string; fileName?: string;
mimeType?: string; mimeType?: string;