diff --git a/packages/nodes-base/credentials/RabbitMQ.credentials.ts b/packages/nodes-base/credentials/RabbitMQ.credentials.ts new file mode 100644 index 0000000000..046650f58a --- /dev/null +++ b/packages/nodes-base/credentials/RabbitMQ.credentials.ts @@ -0,0 +1,157 @@ +import { + ICredentialType, + NodePropertyTypes, +} from 'n8n-workflow'; + +export class RabbitMQ implements ICredentialType { + name = 'rabbitmq'; + displayName = 'RabbitMQ'; + documentationUrl = 'rabbitmq'; + properties = [ + { + displayName: 'Hostname', + name: 'hostname', + type: 'string' as NodePropertyTypes, + default: '', + placeholder: 'localhost', + }, + { + displayName: 'Port', + name: 'port', + type: 'number' as NodePropertyTypes, + default: 5672, + }, + { + displayName: 'User', + name: 'username', + type: 'string' as NodePropertyTypes, + default: '', + placeholder: 'guest', + }, + { + displayName: 'Password', + name: 'password', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + default: '', + placeholder: 'guest', + }, + { + displayName: 'Vhost', + name: 'vhost', + type: 'string' as NodePropertyTypes, + default: '/', + }, + { + displayName: 'SSL', + name: 'ssl', + type: 'boolean' as NodePropertyTypes, + default: false, + }, + { + displayName: 'Client Certificate', + name: 'cert', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + displayOptions: { + show: { + ssl: [ + true, + ], + }, + }, + default: '', + description: 'SSL Client Certificate to use.', + }, + { + displayName: 'Client Key', + name: 'key', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + displayOptions: { + show: { + ssl: [ + true, + ], + }, + }, + default: '', + description: 'SSL Client Key to use.', + }, + { + displayName: 'Passphrase', + name: 'passphrase', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + displayOptions: { + show: { + ssl: [ + true, + ], + }, + }, + default: '', + description: 'SSL passphrase to use.', + }, + { + displayName: 'CA Certificates', + name: 'ca', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + // typeOptions: { + // multipleValues: true, + // multipleValueButtonText: 'Add Certificate', + // }, + displayOptions: { + show: { + ssl: [ + true, + ], + }, + }, + default: '', + description: 'SSL CA Certificates to use.', + }, + // { + // displayName: 'Client ID', + // name: 'clientId', + // type: 'string' as NodePropertyTypes, + // default: '', + // placeholder: 'my-app', + // }, + // { + // displayName: 'Brokers', + // name: 'brokers', + // type: 'string' as NodePropertyTypes, + // default: '', + // placeholder: 'kafka1:9092,kafka2:9092', + // }, + // { + // displayName: 'Username', + // name: 'username', + // type: 'string' as NodePropertyTypes, + // default: '', + // description: 'Optional username if authenticated is required.', + // }, + // { + // displayName: 'Password', + // name: 'password', + // type: 'string' as NodePropertyTypes, + // typeOptions: { + // password: true, + // }, + // default: '', + // description: 'Optional password if authenticated is required.', + // }, + ]; +} diff --git a/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts b/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts new file mode 100644 index 0000000000..10d295dc03 --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/DefaultOptions.ts @@ -0,0 +1,60 @@ +import { + INodeProperties, + INodePropertyCollection, + INodePropertyOptions, +} from 'n8n-workflow'; + +export const rabbitDefaultOptions: Array = [ + { + displayName: 'Arguments', + name: 'arguments', + placeholder: 'Add Argument', + description: 'Arguments to add.', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + default: {}, + options: [ + { + name: 'argument', + displayName: 'Argument', + values: [ + { + displayName: 'Key', + name: 'key', + type: 'string', + default: '', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], + }, + { + displayName: 'Auto Delete', + name: 'autoDelete', + type: 'boolean', + default: false, + description: 'The queue will be deleted when the number of consumers drops to zero .', + }, + { + displayName: 'Durable', + name: 'durable', + type: 'boolean', + default: true, + description: 'The queue will survive broker restarts.', + }, + { + displayName: 'Exclusive', + name: 'exclusive', + type: 'boolean', + default: false, + description: 'Scopes the queue to the connection.', + }, +]; diff --git a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts new file mode 100644 index 0000000000..13659704a1 --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts @@ -0,0 +1,62 @@ +import { + IDataObject, + IExecuteFunctions, + ITriggerFunctions, +} from 'n8n-workflow'; + +const amqplib = require('amqplib'); + +export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise { // tslint:disable-line:no-any + const credentials = this.getCredentials('rabbitmq') as IDataObject; + + const credentialKeys = [ + 'hostname', + 'port', + 'username', + 'password', + 'vhost', + ]; + const credentialData: IDataObject = {}; + credentialKeys.forEach(key => { + credentialData[key] = credentials[key] === '' ? undefined : credentials[key]; + }); + + const optsData: IDataObject = {}; + if (credentials.ssl === true) { + credentialData.protocol = 'amqps'; + + optsData.cert = credentials.cert === '' ? undefined : Buffer.from(credentials.cert as string); + optsData.key = credentials.key === '' ? undefined : Buffer.from(credentials.key as string); + optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase; + optsData.ca = credentials.ca === '' ? undefined : [Buffer.from(credentials.ca as string)]; + optsData.credentials = amqplib.credentials.external(); + } + + + return new Promise(async (resolve, reject) => { + try { + const connection = await amqplib.connect(credentialData, optsData); + + connection.on('error', (error: Error) => { + reject(error); + }); + + const channel = await connection.createChannel().catch(console.warn); + + if (options.arguments && ((options.arguments as IDataObject).argument! as IDataObject[]).length) { + const additionalArguments: IDataObject = {}; + ((options.arguments as IDataObject).argument as IDataObject[]).forEach((argument: IDataObject) => { + additionalArguments[argument.key as string] = argument.value; + }); + options.arguments = additionalArguments; + } + + await channel.assertQueue(queue, options); + + resolve(channel); + } catch (error) { + reject(error); + } + }); + +} diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts new file mode 100644 index 0000000000..9103fa5027 --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -0,0 +1,147 @@ +import { + IExecuteFunctions, +} from 'n8n-core'; + +import { + IDataObject, + INodeExecutionData, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +import { + rabbitDefaultOptions, +} from './DefaultOptions'; + +import { + rabbitmqConnect, +} from './GenericFunctions'; + +export class RabbitMQ implements INodeType { + description: INodeTypeDescription = { + displayName: 'RabbitMQ', + name: 'rabbitmq', + icon: 'file:rabbitmq.png', + group: ['transform'], + version: 1, + description: 'Sends messages to a RabbitMQ topic', + defaults: { + name: 'RabbitMQ', + color: '#ff6600', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'rabbitmq', + required: true, + }, + ], + properties: [ + { + displayName: 'Queue / Topic', + name: 'queue', + type: 'string', + default: '', + placeholder: 'queue-name', + description: 'Name of the queue to publish to.', + }, + { + displayName: 'Send Input Data', + name: 'sendInputData', + type: 'boolean', + default: true, + description: 'Send the the data the node receives as JSON to Kafka.', + }, + { + displayName: 'Message', + name: 'message', + type: 'string', + displayOptions: { + show: { + sendInputData: [ + false, + ], + }, + }, + default: '', + description: 'The message to be sent.', + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + default: {}, + placeholder: 'Add Option', + options: rabbitDefaultOptions, + }, + ], + }; + + async execute(this: IExecuteFunctions): Promise { + let channel; + try { + const items = this.getInputData(); + + const queue = this.getNodeParameter('queue', 0) as string; + + const options = this.getNodeParameter('options', 0, {}) as IDataObject; + + channel = await rabbitmqConnect.call(this, queue, options); + + const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + + let message: string; + + const queuePromises = []; + for (let i = 0; i < items.length; i++) { + if (sendInputData === true) { + message = JSON.stringify(items[i].json); + } else { + message = this.getNodeParameter('message', i) as string; + } + + queuePromises.push(channel.sendToQueue(queue, Buffer.from(message))); + } + + // @ts-ignore + const promisesResponses = await Promise.allSettled(queuePromises); + + const returnItems: INodeExecutionData[] = []; + + promisesResponses.forEach((response: IDataObject) => { + if (response!.status !== 'fulfilled') { + + if (this.continueOnFail() !== true) { + throw new Error(response!.reason as string); + } else { + // Return the actual reason as error + returnItems.push( + { + json: { + error: response.reason, + }, + }, + ); + return; + } + } + + returnItems.push({ + json: { + success: response.value, + }, + }); + }); + + await channel.close(); + + return this.prepareOutputData(returnItems); + } catch (error) { + if (channel) { + await channel.close(); + } + throw error; + } + } +} diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts new file mode 100644 index 0000000000..67deb8d882 --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -0,0 +1,172 @@ +import { + IDataObject, + INodeExecutionData, + INodeProperties, + INodeType, + INodeTypeDescription, + ITriggerFunctions, + ITriggerResponse, +} from 'n8n-workflow'; + +import { + rabbitDefaultOptions, +} from './DefaultOptions'; + +import { + rabbitmqConnect, +} from './GenericFunctions'; + +export class RabbitMQTrigger implements INodeType { + description: INodeTypeDescription = { + displayName: 'RabbitMQ Trigger', + name: 'rabbitmqTrigger', + icon: 'file:rabbitmq.png', + group: ['trigger'], + version: 1, + description: 'Listens to RabbitMQ messages', + defaults: { + name: 'RabbitMQ', + color: '#ff6600', + }, + inputs: [], + outputs: ['main'], + credentials: [ + { + name: 'rabbitmq', + required: true, + }, + ], + properties: [ + { + displayName: 'Queue / Topic', + name: 'queue', + type: 'string', + default: '', + placeholder: 'queue-name', + description: 'Name of the queue to publish to.', + }, + + { + displayName: 'Options', + name: 'options', + type: 'collection', + default: {}, + placeholder: 'Add Option', + options: [ + { + displayName: 'Content is Binary', + name: 'contentIsBinary', + type: 'boolean', + default: false, + description: 'Saves the content as binary.', + }, + { + displayName: 'JSON Parse Body', + name: 'jsonParseBody', + type: 'boolean', + displayOptions: { + hide: { + contentIsBinary: [ + true, + ], + }, + }, + default: false, + description: 'Parse the body to an object.', + }, + { + displayName: 'Only Content', + name: 'onlyContent', + type: 'boolean', + displayOptions: { + hide: { + contentIsBinary: [ + true, + ], + }, + }, + default: false, + description: 'Returns only the content property.', + }, + ...rabbitDefaultOptions, + ].sort((a, b) => { + if ((a as INodeProperties).displayName.toLowerCase() < (b as INodeProperties).displayName.toLowerCase()) { return -1; } + if ((a as INodeProperties).displayName.toLowerCase() > (b as INodeProperties).displayName.toLowerCase()) { return 1; } + return 0; + }) as INodeProperties[], + }, + ], + }; + + + async trigger(this: ITriggerFunctions): Promise { + const queue = this.getNodeParameter('queue') as string; + const options = this.getNodeParameter('options', {}) as IDataObject; + + const channel = await rabbitmqConnect.call(this, queue, options); + + const self = this; + + const item: INodeExecutionData = { + json: {}, + }; + + const startConsumer = async () => { + await channel.consume(queue, async (message: IDataObject) => { + if (message !== null) { + let content: IDataObject | string = message!.content!.toString(); + + if (options.contentIsBinary === true) { + item.binary = { + data: await this.helpers.prepareBinaryData(message.content), + }; + + item.json = message; + message.content = undefined; + } else { + if (options.jsonParseBody === true) { + content = JSON.parse(content as string); + } + if (options.onlyContent === true) { + item.json = content as IDataObject; + } else { + message.content = content; + item.json = message; + } + } + + self.emit([ + [ + item, + ], + ]); + channel.ack(message); + } + }); + }; + + startConsumer(); + + // The "closeFunction" function gets called by n8n whenever + // the workflow gets deactivated and can so clean up. + async function closeFunction() { + await channel.close(); + } + + // The "manualTriggerFunction" function gets called by n8n + // when a user is in the workflow editor and starts the + // workflow manually. So the function has to make sure that + // the emit() gets called with similar data like when it + // would trigger by itself so that the user knows what data + // to expect. + async function manualTriggerFunction() { + startConsumer(); + } + + return { + closeFunction, + manualTriggerFunction, + }; + } + +} diff --git a/packages/nodes-base/nodes/RabbitMQ/rabbitmq.png b/packages/nodes-base/nodes/RabbitMQ/rabbitmq.png new file mode 100644 index 0000000000..d37a2a0fbe Binary files /dev/null and b/packages/nodes-base/nodes/RabbitMQ/rabbitmq.png differ diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 9751b8aa46..fbf83d2d86 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -171,6 +171,7 @@ "dist/credentials/PushcutApi.credentials.js", "dist/credentials/QuestDb.credentials.js", "dist/credentials/QuickBaseApi.credentials.js", + "dist/credentials/RabbitMQ.credentials.js", "dist/credentials/Redis.credentials.js", "dist/credentials/RocketchatApi.credentials.js", "dist/credentials/RundeckApi.credentials.js", @@ -405,6 +406,8 @@ "dist/nodes/Pushover/Pushover.node.js", "dist/nodes/QuestDb/QuestDb.node.js", "dist/nodes/QuickBase/QuickBase.node.js", + "dist/nodes/RabbitMQ/RabbitMQ.node.js", + "dist/nodes/RabbitMQ/RabbitMQTrigger.node.js", "dist/nodes/ReadBinaryFile.node.js", "dist/nodes/ReadBinaryFiles.node.js", "dist/nodes/ReadPdf.node.js", @@ -513,6 +516,7 @@ "dependencies": { "@types/promise-ftp": "^1.3.4", "@types/snowflake-sdk": "^1.5.1", + "amqplib": "^0.6.0", "aws4": "^1.8.0", "basic-auth": "^2.0.1", "change-case": "^4.1.1",