2022-07-04 02:12:08 -07:00
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
2024-09-04 23:11:38 -07:00
import type { Message } from 'amqplib' ;
2023-01-27 03:22:44 -08:00
import type {
2023-03-22 06:04:15 -07:00
IDeferredPromise ,
2023-05-22 05:37:09 -07:00
IExecuteResponsePromiseData ,
2020-12-22 23:05:02 -08:00
INodeProperties ,
INodeType ,
INodeTypeDescription ,
2022-05-30 03:16:44 -07:00
IRun ,
2020-12-22 23:05:02 -08:00
ITriggerFunctions ,
ITriggerResponse ,
} from 'n8n-workflow' ;
2024-08-29 06:55:53 -07:00
import { NodeConnectionType , NodeOperationError } from 'n8n-workflow' ;
2020-12-22 23:05:02 -08:00
2022-08-17 08:50:24 -07:00
import { rabbitDefaultOptions } from './DefaultOptions' ;
2020-12-22 23:05:02 -08:00
2024-09-04 23:11:38 -07:00
import { MessageTracker , rabbitmqConnectQueue , parseMessage } from './GenericFunctions' ;
import type { TriggerOptions } from './types' ;
2020-12-22 23:05:02 -08:00
export class RabbitMQTrigger implements INodeType {
description : INodeTypeDescription = {
displayName : 'RabbitMQ Trigger' ,
name : 'rabbitmqTrigger' ,
2024-06-26 00:35:59 -07:00
icon : 'file:rabbitmq.svg' ,
2020-12-22 23:05:02 -08:00
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to RabbitMQ messages' ,
2023-06-23 03:29:24 -07:00
eventTriggerDescription : '' ,
2020-12-22 23:05:02 -08:00
defaults : {
2021-11-30 15:18:57 -08:00
name : 'RabbitMQ Trigger' ,
2020-12-22 23:05:02 -08:00
} ,
2023-06-23 03:29:24 -07:00
triggerPanel : {
header : '' ,
executionsHelp : {
inactive :
"<b>While building your workflow</b>, click the 'listen' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Once you're happy with your workflow</b>, <a data-key='activate'>activate</a> it. Then every time a change is detected, the workflow will execute. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor." ,
active :
"<b>While building your workflow</b>, click the 'listen' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Your workflow will also execute automatically</b>, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor." ,
} ,
activationHint :
"Once you’ ve finished building your workflow, <a data-key='activate'>activate</a> it to have it also listen continuously (you just won’ t see those executions here)." ,
} ,
2020-12-22 23:05:02 -08:00
inputs : [ ] ,
2024-08-29 06:55:53 -07:00
outputs : [ NodeConnectionType . Main ] ,
2020-12-22 23:05:02 -08:00
credentials : [
{
name : 'rabbitmq' ,
required : true ,
} ,
] ,
properties : [
{
displayName : 'Queue / Topic' ,
name : 'queue' ,
type : 'string' ,
default : '' ,
placeholder : 'queue-name' ,
2022-05-30 03:16:44 -07:00
description : 'The name of the queue to read from' ,
2020-12-22 23:05:02 -08:00
} ,
{
displayName : 'Options' ,
name : 'options' ,
type : 'collection' ,
default : { } ,
2024-07-29 05:27:23 -07:00
placeholder : 'Add option' ,
2020-12-22 23:05:02 -08:00
options : [
{
2022-06-03 10:23:49 -07:00
displayName : 'Content Is Binary' ,
2020-12-22 23:05:02 -08:00
name : 'contentIsBinary' ,
type : 'boolean' ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to save the content as binary' ,
2020-12-22 23:05:02 -08:00
} ,
2022-05-30 03:16:44 -07:00
{
2022-06-03 10:23:49 -07:00
displayName : 'Delete From Queue When' ,
2022-05-30 03:16:44 -07:00
name : 'acknowledge' ,
type : 'options' ,
options : [
{
2022-06-03 10:23:49 -07:00
name : 'Execution Finishes' ,
2022-05-30 03:16:44 -07:00
value : 'executionFinishes' ,
2022-08-17 08:50:24 -07:00
description :
'After the workflow execution finished. No matter if the execution was successful or not.' ,
2022-05-30 03:16:44 -07:00
} ,
{
2022-06-03 10:23:49 -07:00
name : 'Execution Finishes Successfully' ,
2022-05-30 03:16:44 -07:00
value : 'executionFinishesSuccessfully' ,
description : 'After the workflow execution finished successfully' ,
} ,
{
name : 'Immediately' ,
value : 'immediately' ,
description : 'As soon as the message got received' ,
} ,
2023-05-22 05:37:09 -07:00
{
name : 'Specified Later in Workflow' ,
value : 'laterMessageNode' ,
description : 'Using a RabbitMQ node to remove the item from the queue' ,
} ,
2022-05-30 03:16:44 -07:00
] ,
default : 'immediately' ,
description : 'When to acknowledge the message' ,
} ,
2020-12-22 23:05:02 -08:00
{
displayName : 'JSON Parse Body' ,
name : 'jsonParseBody' ,
type : 'boolean' ,
displayOptions : {
hide : {
2022-08-17 08:50:24 -07:00
contentIsBinary : [ true ] ,
2020-12-22 23:05:02 -08:00
} ,
} ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to parse the body to an object' ,
2020-12-22 23:05:02 -08:00
} ,
{
displayName : 'Only Content' ,
name : 'onlyContent' ,
type : 'boolean' ,
displayOptions : {
hide : {
2022-08-17 08:50:24 -07:00
contentIsBinary : [ true ] ,
2020-12-22 23:05:02 -08:00
} ,
} ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to return only the content property' ,
2020-12-22 23:05:02 -08:00
} ,
2023-08-01 06:32:33 -07:00
2022-05-30 03:16:44 -07:00
{
2022-06-03 10:23:49 -07:00
displayName : 'Parallel Message Processing Limit' ,
2022-05-30 03:16:44 -07:00
name : 'parallelMessages' ,
type : 'number' ,
default : - 1 ,
displayOptions : {
hide : {
2022-08-17 08:50:24 -07:00
acknowledge : [ 'immediately' ] ,
2022-05-30 03:16:44 -07:00
} ,
} ,
description : 'Max number of executions at a time. Use -1 for no limit.' ,
} ,
2023-11-15 03:02:54 -08:00
{
displayName : 'Binding' ,
name : 'binding' ,
placeholder : 'Add Binding' ,
description : 'Add binding to queu' ,
type : 'fixedCollection' ,
typeOptions : {
multipleValues : true ,
} ,
default : { } ,
options : [
{
name : 'bindings' ,
displayName : 'Binding' ,
values : [
{
displayName : 'Exchange' ,
name : 'exchange' ,
type : 'string' ,
default : '' ,
placeholder : 'exchange' ,
} ,
{
displayName : 'RoutingKey' ,
name : 'routingKey' ,
type : 'string' ,
default : '' ,
placeholder : 'routing-key' ,
} ,
] ,
} ,
] ,
} ,
2020-12-22 23:05:02 -08:00
. . . rabbitDefaultOptions ,
] . sort ( ( a , b ) = > {
2022-08-17 08:50:24 -07:00
if (
( a as INodeProperties ) . displayName . toLowerCase ( ) <
( b as INodeProperties ) . displayName . toLowerCase ( )
) {
return - 1 ;
}
if (
( a as INodeProperties ) . displayName . toLowerCase ( ) >
( b as INodeProperties ) . displayName . toLowerCase ( )
) {
return 1 ;
}
2020-12-22 23:05:02 -08:00
return 0 ;
} ) as INodeProperties [ ] ,
} ,
2023-05-22 05:37:09 -07:00
{
displayName :
"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation" ,
name : 'laterMessageNode' ,
type : 'notice' ,
displayOptions : {
show : {
'/options.acknowledge' : [ 'laterMessageNode' ] ,
} ,
} ,
default : '' ,
} ,
2020-12-22 23:05:02 -08:00
] ,
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
const queue = this . getNodeParameter ( 'queue' ) as string ;
2024-09-04 23:11:38 -07:00
const options = this . getNodeParameter ( 'options' , { } ) as TriggerOptions ;
2021-01-12 23:57:06 -08:00
const channel = await rabbitmqConnectQueue . call ( this , queue , options ) ;
2020-12-22 23:05:02 -08:00
2024-09-04 23:11:38 -07:00
if ( this . getMode ( ) === 'manual' ) {
const manualTriggerFunction = async ( ) = > {
// Do only catch a single message when executing manually, else messages will leak
await channel . prefetch ( 1 ) ;
const processMessage = async ( message : Message | null ) = > {
if ( message !== null ) {
const item = await parseMessage ( message , options , this . helpers ) ;
channel . ack ( message ) ;
this . emit ( [ [ item ] ] ) ;
} else {
this . emitError ( new Error ( 'Connection got closed unexpectedly' ) ) ;
}
} ;
const existingMessage = await channel . get ( queue ) ;
if ( existingMessage ) await processMessage ( existingMessage ) ;
else await channel . consume ( queue , processMessage ) ;
} ;
2022-05-30 03:16:44 -07:00
2024-09-04 23:11:38 -07:00
const closeFunction = async ( ) = > {
await channel . close ( ) ;
await channel . connection . close ( ) ;
return ;
} ;
return {
closeFunction ,
manualTriggerFunction ,
} ;
}
const parallelMessages = options . parallelMessages ? ? - 1 ;
if ( isNaN ( parallelMessages ) || parallelMessages === 0 || parallelMessages < - 1 ) {
2022-08-17 08:50:24 -07:00
throw new NodeOperationError (
this . getNode ( ) ,
2024-09-04 23:11:38 -07:00
'Parallel message processing limit must be a number greater than zero (or -1 for no limit)' ,
2022-08-17 08:50:24 -07:00
) ;
2022-05-30 03:16:44 -07:00
}
2024-09-04 23:11:38 -07:00
let acknowledgeMode = options . acknowledge ? ? 'immediately' ;
2022-05-30 03:16:44 -07:00
if ( parallelMessages !== - 1 && acknowledgeMode === 'immediately' ) {
// If parallel message limit is set, then the default mode is "executionFinishes"
// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
// can not be supported in this case.
acknowledgeMode = 'executionFinishes' ;
}
const messageTracker = new MessageTracker ( ) ;
2022-09-29 02:50:18 -07:00
let closeGotCalled = false ;
2022-05-30 03:16:44 -07:00
2024-09-04 23:11:38 -07:00
if ( parallelMessages !== - 1 ) {
await channel . prefetch ( parallelMessages ) ;
}
2022-05-30 03:16:44 -07:00
2024-09-04 23:11:38 -07:00
channel . on ( 'close' , ( ) = > {
if ( ! closeGotCalled ) {
this . emitError ( new Error ( 'Connection got closed unexpectedly' ) ) ;
}
} ) ;
2021-04-08 14:34:10 -07:00
2024-09-04 23:11:38 -07:00
const consumerInfo = await channel . consume ( queue , async ( message ) = > {
if ( message !== null ) {
try {
if ( acknowledgeMode !== 'immediately' ) {
messageTracker . received ( message ) ;
}
2022-05-30 03:16:44 -07:00
2024-09-04 23:11:38 -07:00
const item = await parseMessage ( message , options , this . helpers ) ;
2022-05-30 03:16:44 -07:00
2024-09-04 23:11:38 -07:00
let responsePromise : IDeferredPromise < IRun > | undefined = undefined ;
let responsePromiseHook : IDeferredPromise < IExecuteResponsePromiseData > | undefined =
undefined ;
if ( acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode' ) {
responsePromise = await this . helpers . createDeferredPromise ( ) ;
} else if ( acknowledgeMode === 'laterMessageNode' ) {
responsePromiseHook =
await this . helpers . createDeferredPromise < IExecuteResponsePromiseData > ( ) ;
}
if ( responsePromiseHook ) {
this . emit ( [ [ item ] ] , responsePromiseHook , undefined ) ;
} else {
this . emit ( [ [ item ] ] , undefined , responsePromise ) ;
}
if ( responsePromise && acknowledgeMode !== 'laterMessageNode' ) {
// Acknowledge message after the execution finished
await responsePromise . promise ( ) . then ( async ( data : IRun ) = > {
if ( data . data . resultData . error ) {
// The execution did fail
if ( acknowledgeMode === 'executionFinishesSuccessfully' ) {
channel . nack ( message ) ;
messageTracker . answered ( message ) ;
return ;
2022-05-30 03:16:44 -07:00
}
2024-09-04 23:11:38 -07:00
}
2022-05-30 03:16:44 -07:00
channel . ack ( message ) ;
messageTracker . answered ( message ) ;
2024-09-04 23:11:38 -07:00
} ) ;
} else if ( responsePromiseHook && acknowledgeMode === 'laterMessageNode' ) {
await responsePromiseHook . promise ( ) . then ( ( ) = > {
channel . ack ( message ) ;
messageTracker . answered ( message ) ;
} ) ;
} else {
// Acknowledge message directly
channel . ack ( message ) ;
}
} catch ( error ) {
const workflow = this . getWorkflow ( ) ;
const node = this . getNode ( ) ;
if ( acknowledgeMode !== 'immediately' ) {
messageTracker . answered ( message ) ;
2022-05-30 03:16:44 -07:00
}
2024-09-04 23:11:38 -07:00
this . logger . error (
` There was a problem with the RabbitMQ Trigger node " ${ node . name } " in workflow " ${ workflow . id } ": " ${ error . message } " ` ,
{
node : node.name ,
workflowId : workflow.id ,
} ,
) ;
2020-12-22 23:05:02 -08:00
}
2024-09-04 23:11:38 -07:00
}
} ) ;
const consumerTag = consumerInfo . consumerTag ;
2020-12-22 23:05:02 -08:00
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
2023-01-13 09:11:56 -08:00
const closeFunction = async ( ) = > {
2022-09-29 02:50:18 -07:00
closeGotCalled = true ;
2022-05-30 03:16:44 -07:00
try {
2022-12-02 12:54:28 -08:00
return await messageTracker . closeChannel ( channel , consumerTag ) ;
2022-08-17 08:50:24 -07:00
} catch ( error ) {
2023-01-13 09:11:56 -08:00
const workflow = this . getWorkflow ( ) ;
const node = this . getNode ( ) ;
2023-03-22 06:04:15 -07:00
this . logger . error (
2022-08-17 08:50:24 -07:00
` There was a problem closing the RabbitMQ Trigger node connection " ${ node . name } " in workflow " ${ workflow . id } ": " ${ error . message } " ` ,
2022-05-30 03:16:44 -07:00
{
node : node.name ,
workflowId : workflow.id ,
} ,
) ;
}
2023-01-13 09:11:56 -08:00
} ;
2020-12-22 23:05:02 -08:00
return {
closeFunction ,
} ;
}
}