From f5950b201c6ff412b9a304052f05eb2c3b8a7c51 Mon Sep 17 00:00:00 2001 From: agobrech <45268029+agobrech@users.noreply.github.com> Date: Mon, 22 May 2023 14:37:09 +0200 Subject: [PATCH] feat(RabbitMQ Node): Add mode for acknowledging and deleting from queue later in workflow (#6225) * Add later in workflow mode * Add new operation * Acknowledge message in next node * Add response and emit for responsePromiseHook * Remove double success message, close channel correctly * Answser messages correctly * Remove option from delete operation * move operation name to camelCase * Fix versioning * To remove: add action item in v1 * Add notice for delete from queue * Correctly only execute only the delete operation * Refactor delete from queue operator and add return last items --------- Co-authored-by: Marcus --- .../nodes/RabbitMQ/RabbitMQ.node.ts | 83 +++++++++++++++++-- .../nodes/RabbitMQ/RabbitMQTrigger.node.ts | 44 ++++++++-- 2 files changed, 110 insertions(+), 17 deletions(-) diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts index b2e47208ef..a0bdd74d05 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -23,7 +23,7 @@ export class RabbitMQ implements INodeType { // eslint-disable-next-line n8n-nodes-base/node-class-description-icon-not-svg icon: 'file:rabbitmq.png', group: ['transform'], - version: 1, + version: [1, 1.1], description: 'Sends messages to a RabbitMQ topic', defaults: { name: 'RabbitMQ', @@ -43,18 +43,71 @@ export class RabbitMQ implements INodeType { name: 'operation', type: 'hidden', noDataExpression: true, - default: 'send_message', + default: 'sendMessage', + displayOptions: { + show: { + '@version': [1], + }, + }, + // To remove when action view is fixed options: [ { name: 'Send a Message to RabbitMQ', - value: 'send_message', + value: 'sendMessage', + action: 'Send a Message to RabbitMQ', + }, + { + name: 'Delete From Queue', + value: 'deleteMessage', + action: 'Delete From Queue', }, ], }, + { + displayName: 'Operation', + name: 'operation', + type: 'options', + noDataExpression: true, + default: 'sendMessage', + displayOptions: { + show: { + '@version': [1.1], + }, + }, + options: [ + { + name: 'Send a Message to RabbitMQ', + value: 'sendMessage', + action: 'Send a Message to RabbitMQ', + }, + { + name: 'Delete From Queue', + value: 'deleteMessage', + action: 'Delete From Queue', + }, + ], + }, + { + displayName: + 'Will delete an item from the queue triggered earlier in the workflow by a RabbitMQ Trigger node', + name: 'deleteMessage', + type: 'notice', + default: '', + displayOptions: { + show: { + operation: ['deleteMessage'], + }, + }, + }, { displayName: 'Mode', name: 'mode', type: 'options', + displayOptions: { + hide: { + operation: ['deleteMessage'], + }, + }, options: [ { name: 'Queue', @@ -82,6 +135,9 @@ export class RabbitMQ implements INodeType { show: { mode: ['queue'], }, + hide: { + operation: ['deleteMessage'], + }, }, default: '', placeholder: 'queue-name', @@ -161,6 +217,11 @@ export class RabbitMQ implements INodeType { displayName: 'Send Input Data', name: 'sendInputData', type: 'boolean', + displayOptions: { + show: { + operation: ['sendMessage'], + }, + }, default: true, description: 'Whether to send the the data the node receives as JSON', }, @@ -181,6 +242,11 @@ export class RabbitMQ implements INodeType { name: 'options', type: 'collection', default: {}, + displayOptions: { + show: { + operation: ['sendMessage'], + }, + }, placeholder: 'Add Option', options: [ { @@ -341,10 +407,13 @@ export class RabbitMQ implements INodeType { let channel, options: IDataObject; try { const items = this.getInputData(); - const mode = this.getNodeParameter('mode', 0) as string; - + const operation = this.getNodeParameter('operation', 0); + if (operation === 'deleteMessage') { + this.sendResponse(items[0].json); + return await this.prepareOutputData(items); + } + const mode = (this.getNodeParameter('mode', 0) as string) || 'queue'; const returnItems: INodeExecutionData[] = []; - if (mode === 'queue') { const queue = this.getNodeParameter('queue', 0) as string; @@ -355,7 +424,6 @@ export class RabbitMQ implements INodeType { const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; let message: string; - const queuePromises = []; for (let i = 0; i < items.length; i++) { if (sendInputData) { @@ -378,7 +446,6 @@ export class RabbitMQ implements INodeType { ); headers = additionalHeaders; } - queuePromises.push(channel.sendToQueue(queue, Buffer.from(message), { headers })); } diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index 3c7541a1b4..7c20a0c65b 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -2,6 +2,7 @@ import type { IDataObject, IDeferredPromise, + IExecuteResponsePromiseData, INodeExecutionData, INodeProperties, INodeType, @@ -45,7 +46,6 @@ export class RabbitMQTrigger implements INodeType { placeholder: 'queue-name', description: 'The name of the queue to read from', }, - { displayName: 'Options', name: 'options', @@ -81,6 +81,11 @@ export class RabbitMQTrigger implements INodeType { value: 'immediately', description: 'As soon as the message got received', }, + { + name: 'Specified Later in Workflow', + value: 'laterMessageNode', + description: 'Using a RabbitMQ node to remove the item from the queue', + }, ], default: 'immediately', description: 'When to acknowledge the message', @@ -139,6 +144,18 @@ export class RabbitMQTrigger implements INodeType { return 0; }) as INodeProperties[], }, + { + displayName: + "To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation", + name: 'laterMessageNode', + type: 'notice', + displayOptions: { + show: { + '/options.acknowledge': ['laterMessageNode'], + }, + }, + default: '', + }, ], }; @@ -201,7 +218,6 @@ export class RabbitMQTrigger implements INodeType { const item: INodeExecutionData = { json: {}, }; - if (options.contentIsBinary === true) { item.binary = { data: await this.helpers.prepareBinaryData(message.content), @@ -222,13 +238,20 @@ export class RabbitMQTrigger implements INodeType { } let responsePromise: IDeferredPromise | undefined = undefined; - if (acknowledgeMode !== 'immediately') { + let responsePromiseHook: IDeferredPromise | undefined = + undefined; + if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { responsePromise = await this.helpers.createDeferredPromise(); + } else if (acknowledgeMode === 'laterMessageNode') { + responsePromiseHook = + await this.helpers.createDeferredPromise(); } - - this.emit([[item]], undefined, responsePromise); - - if (responsePromise) { + if (responsePromiseHook) { + this.emit([[item]], responsePromiseHook, undefined); + } else { + this.emit([[item]], undefined, responsePromise); + } + if (responsePromise && acknowledgeMode !== 'laterMessageNode') { // Acknowledge message after the execution finished await responsePromise.promise().then(async (data: IRun) => { if (data.data.resultData.error) { @@ -239,7 +262,11 @@ export class RabbitMQTrigger implements INodeType { return; } } - + channel.ack(message); + messageTracker.answered(message); + }); + } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { + await responsePromiseHook.promise().then(() => { channel.ack(message); messageTracker.answered(message); }); @@ -266,7 +293,6 @@ export class RabbitMQTrigger implements INodeType { }); consumerTag = consumerInfo.consumerTag; }; - await startConsumer(); // The "closeFunction" function gets called by n8n whenever