/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import type { Message } from 'amqplib';
import type {
	IDeferredPromise,
	IExecuteResponsePromiseData,
	INodeProperties,
	INodeType,
	INodeTypeDescription,
	IRun,
	ITriggerFunctions,
	ITriggerResponse,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';

import { rabbitDefaultOptions } from './DefaultOptions';
import { MessageTracker, rabbitmqConnectQueue, parseMessage } from './GenericFunctions';
import type { TriggerOptions } from './types';

/**
 * Turns any thrown value into text that is safe to embed in a log line.
 * Values carrying a `message` property (such as `Error` instances) yield that
 * message; anything else is stringified as a whole.
 */
function describeError(error: unknown): string {
	if (typeof error === 'object' && error !== null && 'message' in error) {
		return String((error as { message: unknown }).message);
	}
	return String(error);
}

/**
 * Trigger node that consumes messages from a RabbitMQ queue and emits one
 * workflow item per message. When the message gets acknowledged is controlled
 * by the "Delete From Queue When" option.
 */
export class RabbitMQTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'RabbitMQ Trigger',
		name: 'rabbitmqTrigger',
		icon: 'file:rabbitmq.svg',
		group: ['trigger'],
		version: 1,
		description: 'Listens to RabbitMQ messages',
		eventTriggerDescription: '',
		defaults: {
			name: 'RabbitMQ Trigger',
		},
		triggerPanel: {
			header: '',
			executionsHelp: {
				inactive:
					"While building your workflow, click the 'listen' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.\n\nOnce you're happy with your workflow, activate it. Then every time a change is detected, the workflow will execute. These executions will show up in the executions list, but not in the editor.",
				active:
					"While building your workflow, click the 'listen' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.\n\nYour workflow will also execute automatically, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the executions list, but not in the editor.",
			},
			activationHint:
				"Once you’ve finished building your workflow, activate it to have it also listen continuously (you just won’t see those executions here).",
		},
		inputs: [],
		outputs: [NodeConnectionType.Main],
		credentials: [
			{
				name: 'rabbitmq',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Queue / Topic',
				name: 'queue',
				type: 'string',
				default: '',
				placeholder: 'queue-name',
				description: 'The name of the queue to read from',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				default: {},
				placeholder: 'Add option',
				// Node-specific options are merged with the shared RabbitMQ options and
				// then ordered by display name (case-insensitive).
				options: [
					{
						displayName: 'Content Is Binary',
						name: 'contentIsBinary',
						type: 'boolean',
						default: false,
						description: 'Whether to save the content as binary',
					},
					{
						displayName: 'Delete From Queue When',
						name: 'acknowledge',
						type: 'options',
						options: [
							{
								name: 'Execution Finishes',
								value: 'executionFinishes',
								description:
									'After the workflow execution finished. No matter if the execution was successful or not.',
							},
							{
								name: 'Execution Finishes Successfully',
								value: 'executionFinishesSuccessfully',
								description: 'After the workflow execution finished successfully',
							},
							{
								name: 'Immediately',
								value: 'immediately',
								description: 'As soon as the message got received',
							},
							{
								name: 'Specified Later in Workflow',
								value: 'laterMessageNode',
								description: 'Using a RabbitMQ node to remove the item from the queue',
							},
						],
						default: 'immediately',
						description: 'When to acknowledge the message',
					},
					{
						displayName: 'JSON Parse Body',
						name: 'jsonParseBody',
						type: 'boolean',
						displayOptions: {
							hide: {
								contentIsBinary: [true],
							},
						},
						default: false,
						description: 'Whether to parse the body to an object',
					},
					{
						displayName: 'Only Content',
						name: 'onlyContent',
						type: 'boolean',
						displayOptions: {
							hide: {
								contentIsBinary: [true],
							},
						},
						default: false,
						description: 'Whether to return only the content property',
					},
					{
						displayName: 'Parallel Message Processing Limit',
						name: 'parallelMessages',
						type: 'number',
						default: -1,
						displayOptions: {
							hide: {
								acknowledge: ['immediately'],
							},
						},
						description: 'Max number of executions at a time. Use -1 for no limit.',
					},
					{
						displayName: 'Binding',
						name: 'binding',
						placeholder: 'Add Binding',
						description: 'Add binding to queue',
						type: 'fixedCollection',
						typeOptions: {
							multipleValues: true,
						},
						default: {},
						options: [
							{
								name: 'bindings',
								displayName: 'Binding',
								values: [
									{
										displayName: 'Exchange',
										name: 'exchange',
										type: 'string',
										default: '',
										placeholder: 'exchange',
									},
									{
										displayName: 'RoutingKey',
										name: 'routingKey',
										type: 'string',
										default: '',
										placeholder: 'routing-key',
									},
								],
							},
						],
					},
					...rabbitDefaultOptions,
				].sort((a, b) => {
					const nameA = (a as INodeProperties).displayName.toLowerCase();
					const nameB = (b as INodeProperties).displayName.toLowerCase();
					if (nameA < nameB) {
						return -1;
					}
					if (nameA > nameB) {
						return 1;
					}
					return 0;
				}) as INodeProperties[],
			},
			{
				displayName:
					"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation",
				name: 'laterMessageNode',
				type: 'notice',
				displayOptions: {
					show: {
						'/options.acknowledge': ['laterMessageNode'],
					},
				},
				default: '',
			},
		],
	};

	/**
	 * Connects to the configured queue and starts delivering messages to the workflow.
	 *
	 * In manual mode a `manualTriggerFunction` is returned that fetches (or waits
	 * for) a message, acknowledges it and emits it. Otherwise a consumer is
	 * registered right away and each message is acknowledged according to the
	 * effective acknowledge mode.
	 *
	 * @returns The `closeFunction` used for cleanup and, in manual mode, the
	 * `manualTriggerFunction`.
	 * @throws NodeOperationError When, outside manual mode, the parallel message
	 * limit is NaN, zero, or below -1.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const queue = this.getNodeParameter('queue') as string;
		const options = this.getNodeParameter('options', {}) as TriggerOptions;
		const isManual = this.getMode() === 'manual';

		// Validate before opening the connection so that an invalid setting does
		// not leave an open channel behind. Manual mode never uses this limit.
		const parallelMessages = options.parallelMessages ?? -1;
		if (
			!isManual &&
			(isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1)
		) {
			throw new NodeOperationError(
				this.getNode(),
				'Parallel message processing limit must be a number greater than zero (or -1 for no limit)',
			);
		}

		const channel = await rabbitmqConnectQueue.call(this, queue, options);

		if (isManual) {
			const manualTriggerFunction = async () => {
				// Do only catch a single message when executing manually, else messages will leak
				await channel.prefetch(1);

				const processMessage = async (message: Message | null) => {
					if (message !== null) {
						const item = await parseMessage(message, options, this.helpers);
						channel.ack(message);
						this.emit([[item]]);
					} else {
						this.emitError(new Error('Connection got closed unexpectedly'));
					}
				};

				// Prefer a message that is already waiting; otherwise subscribe and wait.
				const existingMessage = await channel.get(queue);
				if (existingMessage) await processMessage(existingMessage);
				else await channel.consume(queue, processMessage);
			};

			const closeFunction = async () => {
				try {
					await channel.close();
				} finally {
					// Close the connection even when closing the channel failed,
					// otherwise the connection would stay open.
					await channel.connection.close();
				}
			};

			return {
				closeFunction,
				manualTriggerFunction,
			};
		}

		// If parallel message limit is set, then the default mode is "executionFinishes"
		// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
		// can not be supported in this case.
		const requestedMode = options.acknowledge ?? 'immediately';
		const acknowledgeMode =
			parallelMessages !== -1 && requestedMode === 'immediately'
				? 'executionFinishes'
				: requestedMode;

		const messageTracker = new MessageTracker();
		let closeGotCalled = false;

		if (parallelMessages !== -1) {
			await channel.prefetch(parallelMessages);
		}

		channel.on('close', () => {
			// Only a close that was not initiated through closeFunction is an error.
			if (!closeGotCalled) {
				this.emitError(new Error('Connection got closed unexpectedly'));
			}
		});

		const consumerInfo = await channel.consume(queue, async (message) => {
			if (message === null) {
				return;
			}

			try {
				if (acknowledgeMode === 'immediately') {
					// Acknowledge message directly
					const item = await parseMessage(message, options, this.helpers);
					this.emit([[item]], undefined, undefined);
					channel.ack(message);
					return;
				}

				messageTracker.received(message);
				const item = await parseMessage(message, options, this.helpers);

				if (acknowledgeMode === 'laterMessageNode') {
					// Acknowledge once the response promise handed to emit() resolves.
					const responsePromiseHook =
						this.helpers.createDeferredPromise<IExecuteResponsePromiseData>();
					this.emit([[item]], responsePromiseHook, undefined);
					await responsePromiseHook.promise;
					channel.ack(message);
					messageTracker.answered(message);
					return;
				}

				// Acknowledge message after the execution finished
				const responsePromise = this.helpers.createDeferredPromise<IRun>();
				this.emit([[item]], undefined, responsePromise);
				const run = await responsePromise.promise;

				const executionFailed = Boolean(run.data.resultData.error);
				if (executionFailed && acknowledgeMode === 'executionFinishesSuccessfully') {
					channel.nack(message);
				} else {
					channel.ack(message);
				}
				messageTracker.answered(message);
			} catch (error) {
				const workflow = this.getWorkflow();
				const node = this.getNode();
				// NOTE(review): the message is dropped from the tracker here but is
				// neither acked nor nacked — confirm that leaving it unacknowledged
				// on the channel is intended.
				if (acknowledgeMode !== 'immediately') {
					messageTracker.answered(message);
				}

				this.logger.error(
					`There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${describeError(error)}"`,
					{
						node: node.name,
						workflowId: workflow.id,
					},
				);
			}
		});
		const consumerTag = consumerInfo.consumerTag;

		// The "closeFunction" function gets called by n8n whenever
		// the workflow gets deactivated and can so clean up.
		const closeFunction = async () => {
			closeGotCalled = true;
			try {
				return await messageTracker.closeChannel(channel, consumerTag);
			} catch (error) {
				const workflow = this.getWorkflow();
				const node = this.getNode();

				this.logger.error(
					`There was a problem closing the RabbitMQ Trigger node connection "${node.name}" in workflow "${workflow.id}": "${describeError(error)}"`,
					{
						node: node.name,
						workflowId: workflow.id,
					},
				);
			}
		};

		return {
			closeFunction,
		};
	}
}