mirror of
https://github.com/n8n-io/n8n.git
synced 2025-03-05 20:50:17 -08:00
feat(FTP Node): Stream binary data for uploads and downloads (#5296)
This commit is contained in:
parent
c7e9a4375f
commit
448c295314
|
@ -1,4 +1,5 @@
|
||||||
import type { IExecuteFunctions } from 'n8n-core';
|
import type { IExecuteFunctions } from 'n8n-core';
|
||||||
|
import { BINARY_ENCODING } from 'n8n-core';
|
||||||
import type {
|
import type {
|
||||||
ICredentialDataDecryptedObject,
|
ICredentialDataDecryptedObject,
|
||||||
ICredentialsDecrypted,
|
ICredentialsDecrypted,
|
||||||
|
@ -10,7 +11,12 @@ import type {
|
||||||
INodeTypeDescription,
|
INodeTypeDescription,
|
||||||
} from 'n8n-workflow';
|
} from 'n8n-workflow';
|
||||||
import { NodeApiError, NodeOperationError } from 'n8n-workflow';
|
import { NodeApiError, NodeOperationError } from 'n8n-workflow';
|
||||||
|
import { createWriteStream } from 'fs';
|
||||||
import { basename, dirname } from 'path';
|
import { basename, dirname } from 'path';
|
||||||
|
import type { Readable } from 'stream';
|
||||||
|
import { pipeline } from 'stream';
|
||||||
|
import { promisify } from 'util';
|
||||||
|
import { file as tmpFile } from 'tmp-promise';
|
||||||
|
|
||||||
import ftpClient from 'promise-ftp';
|
import ftpClient from 'promise-ftp';
|
||||||
import sftpClient from 'ssh2-sftp-client';
|
import sftpClient from 'ssh2-sftp-client';
|
||||||
|
@ -33,6 +39,8 @@ interface ReturnFtpItem {
|
||||||
path: string;
|
path: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const streamPipeline = promisify(pipeline);
|
||||||
|
|
||||||
async function callRecursiveList(
|
async function callRecursiveList(
|
||||||
path: string,
|
path: string,
|
||||||
client: sftpClient | ftpClient,
|
client: sftpClient | ftpClient,
|
||||||
|
@ -580,18 +588,22 @@ export class Ftp implements INodeType {
|
||||||
|
|
||||||
if (operation === 'download') {
|
if (operation === 'download') {
|
||||||
const path = this.getNodeParameter('path', i) as string;
|
const path = this.getNodeParameter('path', i) as string;
|
||||||
|
const binaryFile = await tmpFile({ prefix: 'n8n-sftp-' });
|
||||||
responseData = await sftp!.get(path);
|
try {
|
||||||
|
await sftp!.get(path, createWriteStream(binaryFile.path));
|
||||||
|
|
||||||
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
||||||
|
|
||||||
const filePathDownload = this.getNodeParameter('path', i) as string;
|
const filePathDownload = this.getNodeParameter('path', i) as string;
|
||||||
items[i].binary![dataPropertyNameDownload] = await this.helpers.prepareBinaryData(
|
|
||||||
responseData as Buffer,
|
items[i].binary![dataPropertyNameDownload] = await this.helpers.copyBinaryFile(
|
||||||
|
binaryFile.path,
|
||||||
filePathDownload,
|
filePathDownload,
|
||||||
);
|
);
|
||||||
|
|
||||||
returnItems.push(items[i]);
|
returnItems.push(items[i]);
|
||||||
|
} finally {
|
||||||
|
await binaryFile.cleanup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (operation === 'upload') {
|
if (operation === 'upload') {
|
||||||
|
@ -609,8 +621,8 @@ export class Ftp implements INodeType {
|
||||||
}
|
}
|
||||||
|
|
||||||
const propertyNameUpload = this.getNodeParameter('binaryPropertyName', i);
|
const propertyNameUpload = this.getNodeParameter('binaryPropertyName', i);
|
||||||
|
const itemBinaryData = item.binary[propertyNameUpload];
|
||||||
if (item.binary[propertyNameUpload] === undefined) {
|
if (itemBinaryData === undefined) {
|
||||||
throw new NodeOperationError(
|
throw new NodeOperationError(
|
||||||
this.getNode(),
|
this.getNode(),
|
||||||
`No binary data property "${propertyNameUpload}" does not exists on item!`,
|
`No binary data property "${propertyNameUpload}" does not exists on item!`,
|
||||||
|
@ -618,8 +630,13 @@ export class Ftp implements INodeType {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const buffer = await this.helpers.getBinaryDataBuffer(i, propertyNameUpload);
|
let uploadData: Buffer | Readable;
|
||||||
await sftp!.put(buffer, remotePath);
|
if (itemBinaryData.id) {
|
||||||
|
uploadData = this.helpers.getBinaryStream(itemBinaryData.id);
|
||||||
|
} else {
|
||||||
|
uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING);
|
||||||
|
}
|
||||||
|
await sftp!.put(uploadData, remotePath);
|
||||||
} else {
|
} else {
|
||||||
// Is text file
|
// Is text file
|
||||||
const buffer = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8');
|
const buffer = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8');
|
||||||
|
@ -669,27 +686,23 @@ export class Ftp implements INodeType {
|
||||||
|
|
||||||
if (operation === 'download') {
|
if (operation === 'download') {
|
||||||
const path = this.getNodeParameter('path', i) as string;
|
const path = this.getNodeParameter('path', i) as string;
|
||||||
|
const binaryFile = await tmpFile({ prefix: 'n8n-sftp-' });
|
||||||
responseData = await ftp!.get(path);
|
try {
|
||||||
|
const stream = await ftp!.get(path);
|
||||||
// Convert readable stream to buffer so that can be displayed properly
|
await streamPipeline(stream, createWriteStream(binaryFile.path));
|
||||||
const chunks = [];
|
|
||||||
for await (const chunk of responseData) {
|
|
||||||
chunks.push(chunk);
|
|
||||||
}
|
|
||||||
|
|
||||||
// @ts-ignore
|
|
||||||
responseData = Buffer.concat(chunks);
|
|
||||||
|
|
||||||
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
||||||
|
|
||||||
const filePathDownload = this.getNodeParameter('path', i) as string;
|
const filePathDownload = this.getNodeParameter('path', i) as string;
|
||||||
items[i].binary![dataPropertyNameDownload] = await this.helpers.prepareBinaryData(
|
|
||||||
responseData,
|
items[i].binary![dataPropertyNameDownload] = await this.helpers.copyBinaryFile(
|
||||||
|
binaryFile.path,
|
||||||
filePathDownload,
|
filePathDownload,
|
||||||
);
|
);
|
||||||
|
|
||||||
returnItems.push(items[i]);
|
returnItems.push(items[i]);
|
||||||
|
} finally {
|
||||||
|
await binaryFile.cleanup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (operation === 'rename') {
|
if (operation === 'rename') {
|
||||||
|
@ -718,8 +731,8 @@ export class Ftp implements INodeType {
|
||||||
}
|
}
|
||||||
|
|
||||||
const propertyNameUpload = this.getNodeParameter('binaryPropertyName', i);
|
const propertyNameUpload = this.getNodeParameter('binaryPropertyName', i);
|
||||||
|
const itemBinaryData = item.binary[propertyNameUpload];
|
||||||
if (item.binary[propertyNameUpload] === undefined) {
|
if (itemBinaryData === undefined) {
|
||||||
throw new NodeOperationError(
|
throw new NodeOperationError(
|
||||||
this.getNode(),
|
this.getNode(),
|
||||||
`No binary data property "${propertyNameUpload}" does not exists on item!`,
|
`No binary data property "${propertyNameUpload}" does not exists on item!`,
|
||||||
|
@ -727,15 +740,20 @@ export class Ftp implements INodeType {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const buffer = await this.helpers.getBinaryDataBuffer(i, propertyNameUpload);
|
let uploadData: Buffer | Readable;
|
||||||
|
if (itemBinaryData.id) {
|
||||||
|
uploadData = this.helpers.getBinaryStream(itemBinaryData.id);
|
||||||
|
} else {
|
||||||
|
uploadData = Buffer.from(itemBinaryData.data, BINARY_ENCODING);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await ftp!.put(buffer, remotePath);
|
await ftp!.put(uploadData, remotePath);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error.code === 553) {
|
if (error.code === 553) {
|
||||||
// Create directory
|
// Create directory
|
||||||
await ftp!.mkdir(dirPath, true);
|
await ftp!.mkdir(dirPath, true);
|
||||||
await ftp!.put(buffer, remotePath);
|
await ftp!.put(uploadData, remotePath);
|
||||||
} else {
|
} else {
|
||||||
throw new NodeApiError(this.getNode(), error);
|
throw new NodeApiError(this.getNode(), error);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue