feat(RabbitMQ Node): Add mode for acknowledging and deleting from queue later in workflow (#6225)

* Add later in workflow mode

* Add new operation

* Acknowledge message in next node

* Add response and emit for responsePromiseHook

* Remove double success message, close channel correctly

* Answser messages correctly

* Remove option from delete operation

* move operation name to camelCase

* Fix versioning

* To remove: add action item in v1

* Add notice for delete from queue

* Correctly only execute only the delete operation

* Refactor delete from queue operator and add return last items

---------

Co-authored-by: Marcus <marcus@n8n.io>
This commit is contained in:
agobrech 2023-05-22 14:37:09 +02:00 committed by GitHub
parent 2d13b3f43f
commit f5950b201c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 110 additions and 17 deletions

View file

@ -23,7 +23,7 @@ export class RabbitMQ implements INodeType {
// eslint-disable-next-line n8n-nodes-base/node-class-description-icon-not-svg
icon: 'file:rabbitmq.png',
group: ['transform'],
version: 1,
version: [1, 1.1],
description: 'Sends messages to a RabbitMQ topic',
defaults: {
name: 'RabbitMQ',
@ -43,18 +43,71 @@ export class RabbitMQ implements INodeType {
name: 'operation',
type: 'hidden',
noDataExpression: true,
default: 'send_message',
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1],
},
},
// To remove when action view is fixed
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'send_message',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName: 'Operation',
name: 'operation',
type: 'options',
noDataExpression: true,
default: 'sendMessage',
displayOptions: {
show: {
'@version': [1.1],
},
},
options: [
{
name: 'Send a Message to RabbitMQ',
value: 'sendMessage',
action: 'Send a Message to RabbitMQ',
},
{
name: 'Delete From Queue',
value: 'deleteMessage',
action: 'Delete From Queue',
},
],
},
{
displayName:
'Will delete an item from the queue triggered earlier in the workflow by a RabbitMQ Trigger node',
name: 'deleteMessage',
type: 'notice',
default: '',
displayOptions: {
show: {
operation: ['deleteMessage'],
},
},
},
{
displayName: 'Mode',
name: 'mode',
type: 'options',
displayOptions: {
hide: {
operation: ['deleteMessage'],
},
},
options: [
{
name: 'Queue',
@ -82,6 +135,9 @@ export class RabbitMQ implements INodeType {
show: {
mode: ['queue'],
},
hide: {
operation: ['deleteMessage'],
},
},
default: '',
placeholder: 'queue-name',
@ -161,6 +217,11 @@ export class RabbitMQ implements INodeType {
displayName: 'Send Input Data',
name: 'sendInputData',
type: 'boolean',
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
default: true,
description: 'Whether to send the the data the node receives as JSON',
},
@ -181,6 +242,11 @@ export class RabbitMQ implements INodeType {
name: 'options',
type: 'collection',
default: {},
displayOptions: {
show: {
operation: ['sendMessage'],
},
},
placeholder: 'Add Option',
options: [
{
@ -341,10 +407,13 @@ export class RabbitMQ implements INodeType {
let channel, options: IDataObject;
try {
const items = this.getInputData();
const mode = this.getNodeParameter('mode', 0) as string;
const operation = this.getNodeParameter('operation', 0);
if (operation === 'deleteMessage') {
this.sendResponse(items[0].json);
return await this.prepareOutputData(items);
}
const mode = (this.getNodeParameter('mode', 0) as string) || 'queue';
const returnItems: INodeExecutionData[] = [];
if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string;
@ -355,7 +424,6 @@ export class RabbitMQ implements INodeType {
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
let message: string;
const queuePromises = [];
for (let i = 0; i < items.length; i++) {
if (sendInputData) {
@ -378,7 +446,6 @@ export class RabbitMQ implements INodeType {
);
headers = additionalHeaders;
}
queuePromises.push(channel.sendToQueue(queue, Buffer.from(message), { headers }));
}

View file

@ -2,6 +2,7 @@
import type {
IDataObject,
IDeferredPromise,
IExecuteResponsePromiseData,
INodeExecutionData,
INodeProperties,
INodeType,
@ -45,7 +46,6 @@ export class RabbitMQTrigger implements INodeType {
placeholder: 'queue-name',
description: 'The name of the queue to read from',
},
{
displayName: 'Options',
name: 'options',
@ -81,6 +81,11 @@ export class RabbitMQTrigger implements INodeType {
value: 'immediately',
description: 'As soon as the message got received',
},
{
name: 'Specified Later in Workflow',
value: 'laterMessageNode',
description: 'Using a RabbitMQ node to remove the item from the queue',
},
],
default: 'immediately',
description: 'When to acknowledge the message',
@ -139,6 +144,18 @@ export class RabbitMQTrigger implements INodeType {
return 0;
}) as INodeProperties[],
},
{
displayName:
"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation",
name: 'laterMessageNode',
type: 'notice',
displayOptions: {
show: {
'/options.acknowledge': ['laterMessageNode'],
},
},
default: '',
},
],
};
@ -201,7 +218,6 @@ export class RabbitMQTrigger implements INodeType {
const item: INodeExecutionData = {
json: {},
};
if (options.contentIsBinary === true) {
item.binary = {
data: await this.helpers.prepareBinaryData(message.content),
@ -222,13 +238,20 @@ export class RabbitMQTrigger implements INodeType {
}
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
if (acknowledgeMode !== 'immediately') {
let responsePromiseHook: IDeferredPromise<IExecuteResponsePromiseData> | undefined =
undefined;
if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') {
responsePromise = await this.helpers.createDeferredPromise();
} else if (acknowledgeMode === 'laterMessageNode') {
responsePromiseHook =
await this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
}
if (responsePromiseHook) {
this.emit([[item]], responsePromiseHook, undefined);
} else {
this.emit([[item]], undefined, responsePromise);
if (responsePromise) {
}
if (responsePromise && acknowledgeMode !== 'laterMessageNode') {
// Acknowledge message after the execution finished
await responsePromise.promise().then(async (data: IRun) => {
if (data.data.resultData.error) {
@ -239,7 +262,11 @@ export class RabbitMQTrigger implements INodeType {
return;
}
}
channel.ack(message);
messageTracker.answered(message);
});
} else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') {
await responsePromiseHook.promise().then(() => {
channel.ack(message);
messageTracker.answered(message);
});
@ -266,7 +293,6 @@ export class RabbitMQTrigger implements INodeType {
});
consumerTag = consumerInfo.consumerTag;
};
await startConsumer();
// The "closeFunction" function gets called by n8n whenever