mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-12 13:27:31 -08:00
fix(FTP Node): Continue of fail looping support with paired item (#8659)
This commit is contained in:
parent
bee17dd6cc
commit
3279762221
|
@ -18,7 +18,7 @@ import type {
|
|||
INodeTypeDescription,
|
||||
JsonObject,
|
||||
} from 'n8n-workflow';
|
||||
import { formatPrivateKey } from '@utils/utilities';
|
||||
import { formatPrivateKey, generatePairedItemData } from '@utils/utilities';
|
||||
|
||||
interface ReturnFtpItem {
|
||||
type: string;
|
||||
|
@ -515,287 +515,312 @@ export class Ftp implements INodeType {
|
|||
}
|
||||
let ftp: ftpClient;
|
||||
let sftp: sftpClient;
|
||||
|
||||
try {
|
||||
if (protocol === 'sftp') {
|
||||
sftp = new sftpClient();
|
||||
if (credentials.privateKey) {
|
||||
await sftp.connect({
|
||||
host: credentials.host as string,
|
||||
port: credentials.port as number,
|
||||
username: credentials.username as string,
|
||||
password: (credentials.password as string) || undefined,
|
||||
privateKey: formatPrivateKey(credentials.privateKey as string),
|
||||
passphrase: credentials.passphrase as string | undefined,
|
||||
});
|
||||
try {
|
||||
if (protocol === 'sftp') {
|
||||
sftp = new sftpClient();
|
||||
if (credentials.privateKey) {
|
||||
await sftp.connect({
|
||||
host: credentials.host as string,
|
||||
port: credentials.port as number,
|
||||
username: credentials.username as string,
|
||||
password: (credentials.password as string) || undefined,
|
||||
privateKey: formatPrivateKey(credentials.privateKey as string),
|
||||
passphrase: credentials.passphrase as string | undefined,
|
||||
});
|
||||
} else {
|
||||
await sftp.connect({
|
||||
host: credentials.host as string,
|
||||
port: credentials.port as number,
|
||||
username: credentials.username as string,
|
||||
password: credentials.password as string,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
await sftp.connect({
|
||||
ftp = new ftpClient();
|
||||
await ftp.connect({
|
||||
host: credentials.host as string,
|
||||
port: credentials.port as number,
|
||||
username: credentials.username as string,
|
||||
user: credentials.username as string,
|
||||
password: credentials.password as string,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
ftp = new ftpClient();
|
||||
await ftp.connect({
|
||||
host: credentials.host as string,
|
||||
port: credentials.port as number,
|
||||
user: credentials.username as string,
|
||||
password: credentials.password as string,
|
||||
});
|
||||
} catch (error) {
|
||||
if (this.continueOnFail()) {
|
||||
const pairedItem = generatePairedItemData(items.length);
|
||||
|
||||
return [[{ json: { error: error.message }, pairedItem }]];
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
for (let i = 0; i < items.length; i++) {
|
||||
const newItem: INodeExecutionData = {
|
||||
json: items[i].json,
|
||||
binary: {},
|
||||
pairedItem: items[i].pairedItem,
|
||||
};
|
||||
try {
|
||||
const newItem: INodeExecutionData = {
|
||||
json: items[i].json,
|
||||
binary: {},
|
||||
pairedItem: items[i].pairedItem,
|
||||
};
|
||||
|
||||
if (items[i].binary !== undefined && newItem.binary) {
|
||||
// Create a shallow copy of the binary data so that the old
|
||||
// data references which do not get changed still stay behind
|
||||
// but the incoming data does not get changed.
|
||||
Object.assign(newItem.binary, items[i].binary);
|
||||
}
|
||||
|
||||
items[i] = newItem;
|
||||
|
||||
if (protocol === 'sftp') {
|
||||
if (operation === 'list') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
|
||||
const recursive = this.getNodeParameter('recursive', i) as boolean;
|
||||
|
||||
let responseData: sftpClient.FileInfo[];
|
||||
if (recursive) {
|
||||
responseData = await callRecursiveList(path, sftp!, normalizeSFtpItem);
|
||||
} else {
|
||||
responseData = await sftp!.list(path);
|
||||
responseData.forEach((item) => normalizeSFtpItem(item, path));
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(responseData as unknown as IDataObject[]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
if (items[i].binary !== undefined && newItem.binary) {
|
||||
// Create a shallow copy of the binary data so that the old
|
||||
// data references which do not get changed still stay behind
|
||||
// but the incoming data does not get changed.
|
||||
Object.assign(newItem.binary, items[i].binary);
|
||||
}
|
||||
|
||||
if (operation === 'delete') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const options = this.getNodeParameter('options', i);
|
||||
items[i] = newItem;
|
||||
|
||||
if (options.folder === true) {
|
||||
await sftp!.rmdir(path, !!options.recursive);
|
||||
} else {
|
||||
await sftp!.delete(path);
|
||||
}
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
if (protocol === 'sftp') {
|
||||
if (operation === 'list') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
|
||||
if (operation === 'rename') {
|
||||
const oldPath = this.getNodeParameter('oldPath', i) as string;
|
||||
const { createDirectories = false } = this.getNodeParameter('options', i) as {
|
||||
createDirectories: boolean;
|
||||
};
|
||||
const newPath = this.getNodeParameter('newPath', i) as string;
|
||||
const recursive = this.getNodeParameter('recursive', i) as boolean;
|
||||
|
||||
if (createDirectories) {
|
||||
await recursivelyCreateSftpDirs(sftp!, newPath);
|
||||
}
|
||||
|
||||
await sftp!.rename(oldPath, newPath);
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'download') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const binaryFile = await tmpFile({ prefix: 'n8n-sftp-' });
|
||||
try {
|
||||
await sftp!.get(path, createWriteStream(binaryFile.path));
|
||||
|
||||
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
||||
const remoteFilePath = this.getNodeParameter('path', i) as string;
|
||||
|
||||
items[i].binary![dataPropertyNameDownload] = await this.nodeHelpers.copyBinaryFile(
|
||||
binaryFile.path,
|
||||
basename(remoteFilePath),
|
||||
);
|
||||
let responseData: sftpClient.FileInfo[];
|
||||
if (recursive) {
|
||||
responseData = await callRecursiveList(path, sftp!, normalizeSFtpItem);
|
||||
} else {
|
||||
responseData = await sftp!.list(path);
|
||||
responseData.forEach((item) => normalizeSFtpItem(item, path));
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
this.helpers.returnJsonArray(responseData as unknown as IDataObject[]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
} finally {
|
||||
await binaryFile.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
if (operation === 'upload') {
|
||||
const remotePath = this.getNodeParameter('path', i) as string;
|
||||
await recursivelyCreateSftpDirs(sftp!, remotePath);
|
||||
if (operation === 'delete') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const options = this.getNodeParameter('options', i);
|
||||
|
||||
if (this.getNodeParameter('binaryData', i)) {
|
||||
const binaryPropertyName = this.getNodeParameter('binaryPropertyName', i);
|
||||
const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName);
|
||||
|
||||
let uploadData: Buffer | Readable;
|
||||
if (binaryData.id) {
|
||||
uploadData = await this.helpers.getBinaryStream(binaryData.id);
|
||||
if (options.folder === true) {
|
||||
await sftp!.rmdir(path, !!options.recursive);
|
||||
} else {
|
||||
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
|
||||
await sftp!.delete(path);
|
||||
}
|
||||
await sftp!.put(uploadData, remotePath);
|
||||
} else {
|
||||
// Is text file
|
||||
const buffer = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8');
|
||||
await sftp!.put(buffer, remotePath);
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
}
|
||||
|
||||
if (protocol === 'ftp') {
|
||||
if (operation === 'list') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
|
||||
const recursive = this.getNodeParameter('recursive', i) as boolean;
|
||||
|
||||
let responseData;
|
||||
if (recursive) {
|
||||
responseData = await callRecursiveList(path, ftp!, normalizeFtpItem);
|
||||
} else {
|
||||
responseData = await ftp!.list(path);
|
||||
responseData.forEach((item) =>
|
||||
normalizeFtpItem(item as ftpClient.ListingElement, path),
|
||||
);
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(responseData as unknown as IDataObject[]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'delete') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const options = this.getNodeParameter('options', i);
|
||||
|
||||
if (options.folder === true) {
|
||||
await ftp!.rmdir(path, !!options.recursive);
|
||||
} else {
|
||||
await ftp!.delete(path);
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'download') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const binaryFile = await tmpFile({ prefix: 'n8n-sftp-' });
|
||||
try {
|
||||
const stream = await ftp!.get(path);
|
||||
await pipeline(stream, createWriteStream(binaryFile.path));
|
||||
|
||||
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
||||
const remoteFilePath = this.getNodeParameter('path', i) as string;
|
||||
|
||||
items[i].binary![dataPropertyNameDownload] = await this.nodeHelpers.copyBinaryFile(
|
||||
binaryFile.path,
|
||||
basename(remoteFilePath),
|
||||
);
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
} finally {
|
||||
await binaryFile.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
if (operation === 'rename') {
|
||||
const oldPath = this.getNodeParameter('oldPath', i) as string;
|
||||
if (operation === 'rename') {
|
||||
const oldPath = this.getNodeParameter('oldPath', i) as string;
|
||||
const { createDirectories = false } = this.getNodeParameter('options', i) as {
|
||||
createDirectories: boolean;
|
||||
};
|
||||
const newPath = this.getNodeParameter('newPath', i) as string;
|
||||
|
||||
const newPath = this.getNodeParameter('newPath', i) as string;
|
||||
|
||||
await ftp!.rename(oldPath, newPath);
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'upload') {
|
||||
const remotePath = this.getNodeParameter('path', i) as string;
|
||||
const fileName = basename(remotePath);
|
||||
const dirPath = remotePath.replace(fileName, '');
|
||||
|
||||
if (this.getNodeParameter('binaryData', i)) {
|
||||
const binaryPropertyName = this.getNodeParameter('binaryPropertyName', i);
|
||||
const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName);
|
||||
|
||||
let uploadData: Buffer | Readable;
|
||||
if (binaryData.id) {
|
||||
uploadData = await this.helpers.getBinaryStream(binaryData.id);
|
||||
} else {
|
||||
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
|
||||
if (createDirectories) {
|
||||
await recursivelyCreateSftpDirs(sftp!, newPath);
|
||||
}
|
||||
|
||||
await sftp!.rename(oldPath, newPath);
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'download') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const binaryFile = await tmpFile({ prefix: 'n8n-sftp-' });
|
||||
try {
|
||||
await ftp!.put(uploadData, remotePath);
|
||||
} catch (error) {
|
||||
if (error.code === 553) {
|
||||
// Create directory
|
||||
await ftp!.mkdir(dirPath, true);
|
||||
await sftp!.get(path, createWriteStream(binaryFile.path));
|
||||
|
||||
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
||||
const remoteFilePath = this.getNodeParameter('path', i) as string;
|
||||
|
||||
items[i].binary![dataPropertyNameDownload] = await this.nodeHelpers.copyBinaryFile(
|
||||
binaryFile.path,
|
||||
basename(remoteFilePath),
|
||||
);
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
} finally {
|
||||
await binaryFile.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
if (operation === 'upload') {
|
||||
const remotePath = this.getNodeParameter('path', i) as string;
|
||||
await recursivelyCreateSftpDirs(sftp!, remotePath);
|
||||
|
||||
if (this.getNodeParameter('binaryData', i)) {
|
||||
const binaryPropertyName = this.getNodeParameter('binaryPropertyName', i);
|
||||
const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName);
|
||||
|
||||
let uploadData: Buffer | Readable;
|
||||
if (binaryData.id) {
|
||||
uploadData = await this.helpers.getBinaryStream(binaryData.id);
|
||||
} else {
|
||||
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
|
||||
}
|
||||
await sftp!.put(uploadData, remotePath);
|
||||
} else {
|
||||
// Is text file
|
||||
const buffer = Buffer.from(
|
||||
this.getNodeParameter('fileContent', i) as string,
|
||||
'utf8',
|
||||
);
|
||||
await sftp!.put(buffer, remotePath);
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
}
|
||||
|
||||
if (protocol === 'ftp') {
|
||||
if (operation === 'list') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
|
||||
const recursive = this.getNodeParameter('recursive', i) as boolean;
|
||||
|
||||
let responseData;
|
||||
if (recursive) {
|
||||
responseData = await callRecursiveList(path, ftp!, normalizeFtpItem);
|
||||
} else {
|
||||
responseData = await ftp!.list(path);
|
||||
responseData.forEach((item) =>
|
||||
normalizeFtpItem(item as ftpClient.ListingElement, path),
|
||||
);
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(responseData as unknown as IDataObject[]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'delete') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const options = this.getNodeParameter('options', i);
|
||||
|
||||
if (options.folder === true) {
|
||||
await ftp!.rmdir(path, !!options.recursive);
|
||||
} else {
|
||||
await ftp!.delete(path);
|
||||
}
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'download') {
|
||||
const path = this.getNodeParameter('path', i) as string;
|
||||
const binaryFile = await tmpFile({ prefix: 'n8n-sftp-' });
|
||||
try {
|
||||
const stream = await ftp!.get(path);
|
||||
await pipeline(stream, createWriteStream(binaryFile.path));
|
||||
|
||||
const dataPropertyNameDownload = this.getNodeParameter('binaryPropertyName', i);
|
||||
const remoteFilePath = this.getNodeParameter('path', i) as string;
|
||||
|
||||
items[i].binary![dataPropertyNameDownload] = await this.nodeHelpers.copyBinaryFile(
|
||||
binaryFile.path,
|
||||
basename(remoteFilePath),
|
||||
);
|
||||
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
} finally {
|
||||
await binaryFile.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
if (operation === 'rename') {
|
||||
const oldPath = this.getNodeParameter('oldPath', i) as string;
|
||||
|
||||
const newPath = this.getNodeParameter('newPath', i) as string;
|
||||
|
||||
await ftp!.rename(oldPath, newPath);
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
[{ json: { success: true } }],
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
|
||||
if (operation === 'upload') {
|
||||
const remotePath = this.getNodeParameter('path', i) as string;
|
||||
const fileName = basename(remotePath);
|
||||
const dirPath = remotePath.replace(fileName, '');
|
||||
|
||||
if (this.getNodeParameter('binaryData', i)) {
|
||||
const binaryPropertyName = this.getNodeParameter('binaryPropertyName', i);
|
||||
const binaryData = this.helpers.assertBinaryData(i, binaryPropertyName);
|
||||
|
||||
let uploadData: Buffer | Readable;
|
||||
if (binaryData.id) {
|
||||
uploadData = await this.helpers.getBinaryStream(binaryData.id);
|
||||
} else {
|
||||
uploadData = Buffer.from(binaryData.data, BINARY_ENCODING);
|
||||
}
|
||||
|
||||
try {
|
||||
await ftp!.put(uploadData, remotePath);
|
||||
} else {
|
||||
throw new NodeApiError(this.getNode(), error as JsonObject);
|
||||
} catch (error) {
|
||||
if (error.code === 553) {
|
||||
// Create directory
|
||||
await ftp!.mkdir(dirPath, true);
|
||||
await ftp!.put(uploadData, remotePath);
|
||||
} else {
|
||||
throw new NodeApiError(this.getNode(), error as JsonObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Is text file
|
||||
const buffer = Buffer.from(this.getNodeParameter('fileContent', i) as string, 'utf8');
|
||||
try {
|
||||
await ftp!.put(buffer, remotePath);
|
||||
} catch (error) {
|
||||
if (error.code === 553) {
|
||||
// Create directory
|
||||
await ftp!.mkdir(dirPath, true);
|
||||
} else {
|
||||
// Is text file
|
||||
const buffer = Buffer.from(
|
||||
this.getNodeParameter('fileContent', i) as string,
|
||||
'utf8',
|
||||
);
|
||||
try {
|
||||
await ftp!.put(buffer, remotePath);
|
||||
} else {
|
||||
throw new NodeApiError(this.getNode(), error as JsonObject);
|
||||
} catch (error) {
|
||||
if (error.code === 553) {
|
||||
// Create directory
|
||||
await ftp!.mkdir(dirPath, true);
|
||||
await ftp!.put(buffer, remotePath);
|
||||
} else {
|
||||
throw new NodeApiError(this.getNode(), error as JsonObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
const executionData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray(items[i]),
|
||||
{ itemData: { item: i } },
|
||||
);
|
||||
returnItems = returnItems.concat(executionData);
|
||||
}
|
||||
} catch (error) {
|
||||
if (this.continueOnFail()) {
|
||||
returnItems.push({ json: { error: error.message }, pairedItem: { item: i } });
|
||||
continue;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -810,10 +835,6 @@ export class Ftp implements INodeType {
|
|||
} else {
|
||||
await ftp!.end();
|
||||
}
|
||||
if (this.continueOnFail()) {
|
||||
return [[{ json: { error: error.message } }]];
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue