2022-07-04 02:12:08 -07:00
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
2023-01-27 03:22:44 -08:00
import type {
2020-12-22 23:05:02 -08:00
IDataObject ,
2023-03-22 06:04:15 -07:00
IDeferredPromise ,
2023-05-22 05:37:09 -07:00
IExecuteResponsePromiseData ,
2020-12-22 23:05:02 -08:00
INodeExecutionData ,
INodeProperties ,
INodeType ,
INodeTypeDescription ,
2022-05-30 03:16:44 -07:00
IRun ,
2020-12-22 23:05:02 -08:00
ITriggerFunctions ,
ITriggerResponse ,
} from 'n8n-workflow' ;
2023-03-22 06:04:15 -07:00
import { NodeOperationError } from 'n8n-workflow' ;
2020-12-22 23:05:02 -08:00
2022-08-17 08:50:24 -07:00
import { rabbitDefaultOptions } from './DefaultOptions' ;
2020-12-22 23:05:02 -08:00
2022-08-17 08:50:24 -07:00
import { MessageTracker , rabbitmqConnectQueue } from './GenericFunctions' ;
2020-12-22 23:05:02 -08:00
export class RabbitMQTrigger implements INodeType {
description : INodeTypeDescription = {
displayName : 'RabbitMQ Trigger' ,
name : 'rabbitmqTrigger' ,
2022-06-20 07:54:01 -07:00
// eslint-disable-next-line n8n-nodes-base/node-class-description-icon-not-svg
2020-12-22 23:05:02 -08:00
icon : 'file:rabbitmq.png' ,
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to RabbitMQ messages' ,
2023-06-23 03:29:24 -07:00
eventTriggerDescription : '' ,
2020-12-22 23:05:02 -08:00
defaults : {
2021-11-30 15:18:57 -08:00
name : 'RabbitMQ Trigger' ,
2020-12-22 23:05:02 -08:00
} ,
2023-06-23 03:29:24 -07:00
triggerPanel : {
header : '' ,
executionsHelp : {
inactive :
"<b>While building your workflow</b>, click the 'listen' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Once you're happy with your workflow</b>, <a data-key='activate'>activate</a> it. Then every time a change is detected, the workflow will execute. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor." ,
active :
"<b>While building your workflow</b>, click the 'listen' button, then trigger a Rabbit MQ event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Your workflow will also execute automatically</b>, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor." ,
} ,
activationHint :
"Once you’ ve finished building your workflow, <a data-key='activate'>activate</a> it to have it also listen continuously (you just won’ t see those executions here)." ,
} ,
2020-12-22 23:05:02 -08:00
inputs : [ ] ,
outputs : [ 'main' ] ,
credentials : [
{
name : 'rabbitmq' ,
required : true ,
} ,
] ,
properties : [
{
displayName : 'Queue / Topic' ,
name : 'queue' ,
type : 'string' ,
default : '' ,
placeholder : 'queue-name' ,
2022-05-30 03:16:44 -07:00
description : 'The name of the queue to read from' ,
2020-12-22 23:05:02 -08:00
} ,
{
displayName : 'Options' ,
name : 'options' ,
type : 'collection' ,
default : { } ,
placeholder : 'Add Option' ,
options : [
{
2022-06-03 10:23:49 -07:00
displayName : 'Content Is Binary' ,
2020-12-22 23:05:02 -08:00
name : 'contentIsBinary' ,
type : 'boolean' ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to save the content as binary' ,
2020-12-22 23:05:02 -08:00
} ,
2022-05-30 03:16:44 -07:00
{
2022-06-03 10:23:49 -07:00
displayName : 'Delete From Queue When' ,
2022-05-30 03:16:44 -07:00
name : 'acknowledge' ,
type : 'options' ,
options : [
{
2022-06-03 10:23:49 -07:00
name : 'Execution Finishes' ,
2022-05-30 03:16:44 -07:00
value : 'executionFinishes' ,
2022-08-17 08:50:24 -07:00
description :
'After the workflow execution finished. No matter if the execution was successful or not.' ,
2022-05-30 03:16:44 -07:00
} ,
{
2022-06-03 10:23:49 -07:00
name : 'Execution Finishes Successfully' ,
2022-05-30 03:16:44 -07:00
value : 'executionFinishesSuccessfully' ,
description : 'After the workflow execution finished successfully' ,
} ,
{
name : 'Immediately' ,
value : 'immediately' ,
description : 'As soon as the message got received' ,
} ,
2023-05-22 05:37:09 -07:00
{
name : 'Specified Later in Workflow' ,
value : 'laterMessageNode' ,
description : 'Using a RabbitMQ node to remove the item from the queue' ,
} ,
2022-05-30 03:16:44 -07:00
] ,
default : 'immediately' ,
description : 'When to acknowledge the message' ,
} ,
2020-12-22 23:05:02 -08:00
{
displayName : 'JSON Parse Body' ,
name : 'jsonParseBody' ,
type : 'boolean' ,
displayOptions : {
hide : {
2022-08-17 08:50:24 -07:00
contentIsBinary : [ true ] ,
2020-12-22 23:05:02 -08:00
} ,
} ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to parse the body to an object' ,
2020-12-22 23:05:02 -08:00
} ,
{
displayName : 'Only Content' ,
name : 'onlyContent' ,
type : 'boolean' ,
displayOptions : {
hide : {
2022-08-17 08:50:24 -07:00
contentIsBinary : [ true ] ,
2020-12-22 23:05:02 -08:00
} ,
} ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to return only the content property' ,
2020-12-22 23:05:02 -08:00
} ,
2023-08-01 06:32:33 -07:00
2022-05-30 03:16:44 -07:00
{
2022-06-03 10:23:49 -07:00
displayName : 'Parallel Message Processing Limit' ,
2022-05-30 03:16:44 -07:00
name : 'parallelMessages' ,
type : 'number' ,
default : - 1 ,
displayOptions : {
hide : {
2022-08-17 08:50:24 -07:00
acknowledge : [ 'immediately' ] ,
2022-05-30 03:16:44 -07:00
} ,
} ,
description : 'Max number of executions at a time. Use -1 for no limit.' ,
} ,
2023-11-15 03:02:54 -08:00
{
displayName : 'Binding' ,
name : 'binding' ,
placeholder : 'Add Binding' ,
description : 'Add binding to queu' ,
type : 'fixedCollection' ,
typeOptions : {
multipleValues : true ,
} ,
default : { } ,
options : [
{
name : 'bindings' ,
displayName : 'Binding' ,
values : [
{
displayName : 'Exchange' ,
name : 'exchange' ,
type : 'string' ,
default : '' ,
placeholder : 'exchange' ,
} ,
{
displayName : 'RoutingKey' ,
name : 'routingKey' ,
type : 'string' ,
default : '' ,
placeholder : 'routing-key' ,
} ,
] ,
} ,
] ,
} ,
2020-12-22 23:05:02 -08:00
. . . rabbitDefaultOptions ,
] . sort ( ( a , b ) = > {
2022-08-17 08:50:24 -07:00
if (
( a as INodeProperties ) . displayName . toLowerCase ( ) <
( b as INodeProperties ) . displayName . toLowerCase ( )
) {
return - 1 ;
}
if (
( a as INodeProperties ) . displayName . toLowerCase ( ) >
( b as INodeProperties ) . displayName . toLowerCase ( )
) {
return 1 ;
}
2020-12-22 23:05:02 -08:00
return 0 ;
} ) as INodeProperties [ ] ,
} ,
2023-05-22 05:37:09 -07:00
{
displayName :
"To delete an item from the queue, insert a RabbitMQ node later in the workflow and use the 'Delete from queue' operation" ,
name : 'laterMessageNode' ,
type : 'notice' ,
displayOptions : {
show : {
'/options.acknowledge' : [ 'laterMessageNode' ] ,
} ,
} ,
default : '' ,
} ,
2020-12-22 23:05:02 -08:00
] ,
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
const queue = this . getNodeParameter ( 'queue' ) as string ;
const options = this . getNodeParameter ( 'options' , { } ) as IDataObject ;
2021-01-12 23:57:06 -08:00
const channel = await rabbitmqConnectQueue . call ( this , queue , options ) ;
2020-12-22 23:05:02 -08:00
2022-08-17 08:50:24 -07:00
let parallelMessages =
options . parallelMessages !== undefined && options . parallelMessages !== - 1
? parseInt ( options . parallelMessages as string , 10 )
: - 1 ;
2022-05-30 03:16:44 -07:00
if ( parallelMessages === 0 || parallelMessages < - 1 ) {
2022-08-17 08:50:24 -07:00
throw new NodeOperationError (
this . getNode ( ) ,
'Parallel message processing limit must be greater than zero (or -1 for no limit)' ,
) ;
2022-05-30 03:16:44 -07:00
}
if ( this . getMode ( ) === 'manual' ) {
// Do only catch a single message when executing manually, else messages will leak
parallelMessages = 1 ;
}
let acknowledgeMode = options . acknowledge ? options . acknowledge : 'immediately' ;
if ( parallelMessages !== - 1 && acknowledgeMode === 'immediately' ) {
// If parallel message limit is set, then the default mode is "executionFinishes"
// unless acknowledgeMode got set specifically. Be aware that the mode "immediately"
// can not be supported in this case.
acknowledgeMode = 'executionFinishes' ;
}
const messageTracker = new MessageTracker ( ) ;
let consumerTag : string ;
2022-09-29 02:50:18 -07:00
let closeGotCalled = false ;
2022-05-30 03:16:44 -07:00
2020-12-22 23:05:02 -08:00
const startConsumer = async ( ) = > {
2022-05-30 03:16:44 -07:00
if ( parallelMessages !== - 1 ) {
2022-12-02 12:54:28 -08:00
await channel . prefetch ( parallelMessages ) ;
2022-05-30 03:16:44 -07:00
}
2022-09-29 02:50:18 -07:00
channel . on ( 'close' , ( ) = > {
if ( ! closeGotCalled ) {
2023-01-13 09:11:56 -08:00
this . emitError ( new Error ( 'Connection got closed unexpectedly' ) ) ;
2022-09-29 02:50:18 -07:00
}
} ) ;
2022-05-30 03:16:44 -07:00
const consumerInfo = await channel . consume ( queue , async ( message ) = > {
2020-12-22 23:05:02 -08:00
if ( message !== null ) {
2022-05-30 03:16:44 -07:00
try {
if ( acknowledgeMode !== 'immediately' ) {
messageTracker . received ( message ) ;
}
2022-12-02 12:54:28 -08:00
let content : IDataObject | string = message . content . toString ( ) ;
2021-04-08 14:34:10 -07:00
2022-05-30 03:16:44 -07:00
const item : INodeExecutionData = {
json : { } ,
2020-12-22 23:05:02 -08:00
} ;
2022-05-30 03:16:44 -07:00
if ( options . contentIsBinary === true ) {
item . binary = {
data : await this . helpers . prepareBinaryData ( message . content ) ,
} ;
item . json = message as unknown as IDataObject ;
message . content = undefined as unknown as Buffer ;
} else {
if ( options . jsonParseBody === true ) {
2022-12-02 12:54:28 -08:00
content = JSON . parse ( content ) ;
2022-05-30 03:16:44 -07:00
}
if ( options . onlyContent === true ) {
item . json = content as IDataObject ;
} else {
message . content = content as unknown as Buffer ;
item . json = message as unknown as IDataObject ;
}
}
2023-03-22 06:04:15 -07:00
let responsePromise : IDeferredPromise < IRun > | undefined = undefined ;
2023-05-22 05:37:09 -07:00
let responsePromiseHook : IDeferredPromise < IExecuteResponsePromiseData > | undefined =
undefined ;
if ( acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode' ) {
2023-03-22 06:04:15 -07:00
responsePromise = await this . helpers . createDeferredPromise ( ) ;
2023-05-22 05:37:09 -07:00
} else if ( acknowledgeMode === 'laterMessageNode' ) {
responsePromiseHook =
await this . helpers . createDeferredPromise < IExecuteResponsePromiseData > ( ) ;
2020-12-22 23:05:02 -08:00
}
2023-05-22 05:37:09 -07:00
if ( responsePromiseHook ) {
this . emit ( [ [ item ] ] , responsePromiseHook , undefined ) ;
} else {
this . emit ( [ [ item ] ] , undefined , responsePromise ) ;
}
if ( responsePromise && acknowledgeMode !== 'laterMessageNode' ) {
2022-05-30 03:16:44 -07:00
// Acknowledge message after the execution finished
2022-08-17 08:50:24 -07:00
await responsePromise . promise ( ) . then ( async ( data : IRun ) = > {
2022-05-30 03:16:44 -07:00
if ( data . data . resultData . error ) {
// The execution did fail
if ( acknowledgeMode === 'executionFinishesSuccessfully' ) {
channel . nack ( message ) ;
messageTracker . answered ( message ) ;
return ;
}
}
2023-05-22 05:37:09 -07:00
channel . ack ( message ) ;
messageTracker . answered ( message ) ;
} ) ;
} else if ( responsePromiseHook && acknowledgeMode === 'laterMessageNode' ) {
await responsePromiseHook . promise ( ) . then ( ( ) = > {
2022-05-30 03:16:44 -07:00
channel . ack ( message ) ;
messageTracker . answered ( message ) ;
} ) ;
2020-12-22 23:05:02 -08:00
} else {
2022-05-30 03:16:44 -07:00
// Acknowledge message directly
channel . ack ( message ) ;
2020-12-22 23:05:02 -08:00
}
2022-05-30 03:16:44 -07:00
} catch ( error ) {
const workflow = this . getWorkflow ( ) ;
const node = this . getNode ( ) ;
if ( acknowledgeMode !== 'immediately' ) {
messageTracker . answered ( message ) ;
}
2023-03-22 06:04:15 -07:00
this . logger . error (
2022-08-17 08:50:24 -07:00
` There was a problem with the RabbitMQ Trigger node " ${ node . name } " in workflow " ${ workflow . id } ": " ${ error . message } " ` ,
2022-05-30 03:16:44 -07:00
{
node : node.name ,
workflowId : workflow.id ,
} ,
) ;
}
2020-12-22 23:05:02 -08:00
}
} ) ;
2022-05-30 03:16:44 -07:00
consumerTag = consumerInfo . consumerTag ;
2020-12-22 23:05:02 -08:00
} ;
2022-12-02 12:54:28 -08:00
await startConsumer ( ) ;
2020-12-22 23:05:02 -08:00
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
2023-01-13 09:11:56 -08:00
const closeFunction = async ( ) = > {
2022-09-29 02:50:18 -07:00
closeGotCalled = true ;
2022-05-30 03:16:44 -07:00
try {
2022-12-02 12:54:28 -08:00
return await messageTracker . closeChannel ( channel , consumerTag ) ;
2022-08-17 08:50:24 -07:00
} catch ( error ) {
2023-01-13 09:11:56 -08:00
const workflow = this . getWorkflow ( ) ;
const node = this . getNode ( ) ;
2023-03-22 06:04:15 -07:00
this . logger . error (
2022-08-17 08:50:24 -07:00
` There was a problem closing the RabbitMQ Trigger node connection " ${ node . name } " in workflow " ${ workflow . id } ": " ${ error . message } " ` ,
2022-05-30 03:16:44 -07:00
{
node : node.name ,
workflowId : workflow.id ,
} ,
) ;
}
2023-01-13 09:11:56 -08:00
} ;
2020-12-22 23:05:02 -08:00
return {
closeFunction ,
} ;
}
}