diff --git a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts index 13659704a1..527b9dd55d 100644 --- a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts +++ b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts @@ -6,7 +6,7 @@ import { const amqplib = require('amqplib'); -export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise { // tslint:disable-line:no-any +export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunctions, options: IDataObject): Promise { // tslint:disable-line:no-any const credentials = this.getCredentials('rabbitmq') as IDataObject; const credentialKeys = [ @@ -16,6 +16,7 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction 'password', 'vhost', ]; + const credentialData: IDataObject = {}; credentialKeys.forEach(key => { credentialData[key] = credentials[key] === '' ? undefined : credentials[key]; @@ -51,12 +52,36 @@ export async function rabbitmqConnect(this: IExecuteFunctions | ITriggerFunction options.arguments = additionalArguments; } - await channel.assertQueue(queue, options); resolve(channel); } catch (error) { reject(error); } }); - +} + +export async function rabbitmqConnectQueue(this: IExecuteFunctions | ITriggerFunctions, queue: string, options: IDataObject): Promise { // tslint:disable-line:no-any + const channel = await rabbitmqConnect.call(this, options); + + return new Promise(async (resolve, reject) => { + try { + await channel.assertQueue(queue, options); + resolve(channel); + } catch (error) { + reject(error); + } + }); +} + +export async function rabbitmqConnectExchange(this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, options: IDataObject): Promise { // tslint:disable-line:no-any + const channel = await rabbitmqConnect.call(this, options); + + return new Promise(async (resolve, reject) => { + try { + await channel.assertExchange(exchange, type, options); + resolve(channel); + } catch (error) { + reject(error); + } + }); } diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts index 9103fa5027..e5d8443738 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -10,11 +10,8 @@ import { } from 'n8n-workflow'; import { - rabbitDefaultOptions, -} from './DefaultOptions'; - -import { - rabbitmqConnect, + rabbitmqConnectExchange, + rabbitmqConnectQueue, } from './GenericFunctions'; export class RabbitMQ implements INodeType { @@ -38,20 +35,126 @@ export class RabbitMQ implements INodeType { }, ], properties: [ + { + displayName: 'Mode', + name: 'mode', + type: 'options', + options: [ + { + name: 'Queue', + value: 'queue', + description: 'Publish data to queue.', + }, + { + name: 'Exchange', + value: 'exchange', + description: 'Publish data to exchange.', + }, + ], + default: 'queue', + description: 'To where data should be moved.', + }, + + // ---------------------------------- + // Queue + // ---------------------------------- { displayName: 'Queue / Topic', name: 'queue', type: 'string', + displayOptions: { + show: { + mode: [ + 'queue', + ], + }, + }, default: '', placeholder: 'queue-name', description: 'Name of the queue to publish to.', }, + + // ---------------------------------- + // Exchange + // ---------------------------------- + + { + displayName: 'Exchange', + name: 'exchange', + type: 'string', + displayOptions: { + show: { + mode: [ + 'exchange', + ], + }, + }, + default: '', + placeholder: 'exchange-name', + description: 'Name of the exchange to publish to.', + }, + { + displayName: 'Type', + name: 'exchangeType', + type: 'options', + displayOptions: { + show: { + mode: [ + 'exchange', + ], + }, + }, + options: [ + { + name: 'Direct', + value: 'direct', + description: 'Direct exchange type.', + }, + { + name: 'Topic', + value: 'topic', + description: 'Topic exchange type.', + }, + { + name: 'Headers', + value: 'headers', + description: 'Headers exchange type.', + }, + { + name: 'Fanout', + value: 'fanout', + description: 'Fanout exchange type.', + }, + ], + default: 'fanout', + description: 'Type of exchange.', + }, + { + displayName: 'Routing key', + name: 'routingKey', + type: 'string', + displayOptions: { + show: { + mode: [ + 'exchange', + ], + }, + }, + default: '', + placeholder: 'routing-key', + description: 'The routing key for the message.', + }, + + // ---------------------------------- + // Default + // ---------------------------------- + { displayName: 'Send Input Data', name: 'sendInputData', type: 'boolean', default: true, - description: 'Send the the data the node receives as JSON to Kafka.', + description: 'Send the the data the node receives as JSON.', }, { displayName: 'Message', @@ -73,7 +176,81 @@ export class RabbitMQ implements INodeType { type: 'collection', default: {}, placeholder: 'Add Option', - options: rabbitDefaultOptions, + options: [ + { + displayName: 'Arguments', + name: 'arguments', + placeholder: 'Add Argument', + description: 'Arguments to add.', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + default: {}, + options: [ + { + name: 'argument', + displayName: 'Argument', + values: [ + { + displayName: 'Key', + name: 'key', + type: 'string', + default: '', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + }, + ], + }, + ], + }, + { + displayName: 'Auto Delete', + name: 'autoDelete', + type: 'boolean', + default: false, + description: 'The queue will be deleted when the number of consumers drops to zero .', + }, + { + displayName: 'Durable', + name: 'durable', + type: 'boolean', + default: true, + description: 'The queue will survive broker restarts.', + }, + { + displayName: 'Exclusive', + name: 'exclusive', + type: 'boolean', + displayOptions: { + show: { + '/mode': [ + 'queue', + ], + }, + }, + default: false, + description: 'Scopes the queue to the connection.', + }, + { + displayName: 'Alternate Exchange', + name: 'alternateExchange', + type: 'string', + displayOptions: { + show: { + '/mode': [ + 'exchange', + ], + }, + }, + default: '', + description: 'An exchange to send messages to if this exchange can’t route them to any queues', + }, + ], }, ], }; @@ -82,62 +259,122 @@ export class RabbitMQ implements INodeType { let channel; try { const items = this.getInputData(); - - const queue = this.getNodeParameter('queue', 0) as string; - - const options = this.getNodeParameter('options', 0, {}) as IDataObject; - - channel = await rabbitmqConnect.call(this, queue, options); - - const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; - - let message: string; - - const queuePromises = []; - for (let i = 0; i < items.length; i++) { - if (sendInputData === true) { - message = JSON.stringify(items[i].json); - } else { - message = this.getNodeParameter('message', i) as string; - } - - queuePromises.push(channel.sendToQueue(queue, Buffer.from(message))); - } - - // @ts-ignore - const promisesResponses = await Promise.allSettled(queuePromises); + const mode = this.getNodeParameter('mode', 0) as string; const returnItems: INodeExecutionData[] = []; - promisesResponses.forEach((response: IDataObject) => { - if (response!.status !== 'fulfilled') { + if (mode === 'queue') { + const queue = this.getNodeParameter('queue', 0) as string; - if (this.continueOnFail() !== true) { - throw new Error(response!.reason as string); + const options = this.getNodeParameter('options', 0, {}) as IDataObject; + + channel = await rabbitmqConnectQueue.call(this, queue, options); + + const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + + let message: string; + + const queuePromises = []; + for (let i = 0; i < items.length; i++) { + if (sendInputData === true) { + message = JSON.stringify(items[i].json); } else { - // Return the actual reason as error - returnItems.push( - { - json: { - error: response.reason, - }, - }, - ); - return; + message = this.getNodeParameter('message', i) as string; } + + queuePromises.push(channel.sendToQueue(queue, Buffer.from(message))); } - returnItems.push({ - json: { - success: response.value, - }, - }); - }); + // @ts-ignore + const promisesResponses = await Promise.allSettled(queuePromises); - await channel.close(); + promisesResponses.forEach((response: IDataObject) => { + if (response!.status !== 'fulfilled') { + + if (this.continueOnFail() !== true) { + throw new Error(response!.reason as string); + } else { + // Return the actual reason as error + returnItems.push( + { + json: { + error: response.reason, + }, + }, + ); + return; + } + } + + returnItems.push({ + json: { + success: response.value, + }, + }); + }); + + await channel.close(); + } + else if (mode === 'exchange') { + const exchange = this.getNodeParameter('exchange', 0) as string; + const type = this.getNodeParameter('exchangeType', 0) as string; + const routingKey = this.getNodeParameter('routingKey', 0) as string; + + const options = this.getNodeParameter('options', 0, {}) as IDataObject; + + channel = await rabbitmqConnectExchange.call(this, exchange, type, options); + + const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + + let message: string; + + const exchangePromises = []; + for (let i = 0; i < items.length; i++) { + if (sendInputData === true) { + message = JSON.stringify(items[i].json); + } else { + message = this.getNodeParameter('message', i) as string; + } + + exchangePromises.push(channel.publish(exchange, routingKey, Buffer.from(message))); + } + + // @ts-ignore + const promisesResponses = await Promise.allSettled(exchangePromises); + + promisesResponses.forEach((response: IDataObject) => { + if (response!.status !== 'fulfilled') { + + if (this.continueOnFail() !== true) { + throw new Error(response!.reason as string); + } else { + // Return the actual reason as error + returnItems.push( + { + json: { + error: response.reason, + }, + }, + ); + return; + } + } + + returnItems.push({ + json: { + success: response.value, + }, + }); + }); + + await channel.close(); + } else { + throw new Error(`The operation "${mode}" is not known!`); + } return this.prepareOutputData(returnItems); - } catch (error) { + } + catch (error) { if (channel) { await channel.close(); } diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index 67deb8d882..446bd82352 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -13,7 +13,7 @@ import { } from './DefaultOptions'; import { - rabbitmqConnect, + rabbitmqConnectQueue, } from './GenericFunctions'; export class RabbitMQTrigger implements INodeType { @@ -103,7 +103,7 @@ export class RabbitMQTrigger implements INodeType { const queue = this.getNodeParameter('queue') as string; const options = this.getNodeParameter('options', {}) as IDataObject; - const channel = await rabbitmqConnect.call(this, queue, options); + const channel = await rabbitmqConnectQueue.call(this, queue, options); const self = this;