2023-01-27 03:22:44 -08:00
import type { ContainerOptions , EventContext , Message , ReceiverOptions } from 'rhea' ;
import { create_container } from 'rhea' ;
2019-11-02 04:16:20 -07:00
2023-01-27 03:22:44 -08:00
import type { ITriggerFunctions } from 'n8n-core' ;
import type { IDataObject , INodeType , INodeTypeDescription , ITriggerResponse } from 'n8n-workflow' ;
import { deepCopy , jsonParse , NodeOperationError } from 'n8n-workflow' ;
2019-11-01 00:42:14 -07:00
2019-11-01 05:50:33 -07:00
export class AmqpTrigger implements INodeType {
2019-11-01 00:42:14 -07:00
description : INodeTypeDescription = {
2019-11-01 05:50:33 -07:00
displayName : 'AMQP Trigger' ,
name : 'amqpTrigger' ,
2022-06-20 07:54:01 -07:00
// eslint-disable-next-line n8n-nodes-base/node-class-description-icon-not-svg
2019-11-01 00:42:14 -07:00
icon : 'file:amqp.png' ,
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to AMQP 1.0 Messages' ,
defaults : {
2019-11-01 05:50:33 -07:00
name : 'AMQP Trigger' ,
2019-11-01 00:42:14 -07:00
} ,
inputs : [ ] ,
outputs : [ 'main' ] ,
2022-08-01 13:47:55 -07:00
credentials : [
{
name : 'amqp' ,
required : true ,
} ,
] ,
2019-11-01 00:42:14 -07:00
properties : [
// Node properties which the user gets displayed and
// can change on the node.
{
displayName : 'Queue / Topic' ,
name : 'sink' ,
type : 'string' ,
default : '' ,
placeholder : 'topic://sourcename.something' ,
2022-06-03 10:23:49 -07:00
description : 'Name of the queue of topic to listen to' ,
2019-11-01 00:42:14 -07:00
} ,
{
displayName : 'Clientname' ,
name : 'clientname' ,
type : 'string' ,
default : '' ,
placeholder : 'for durable/persistent topic subscriptions, example: "n8n"' ,
2022-05-06 14:01:25 -07:00
description : 'Leave empty for non-durable topic subscriptions or queues' ,
2019-11-01 00:42:14 -07:00
} ,
{
displayName : 'Subscription' ,
name : 'subscription' ,
type : 'string' ,
default : '' ,
placeholder : 'for durable/persistent topic subscriptions, example: "order-worker"' ,
description : 'Leave empty for non-durable topic subscriptions or queues' ,
} ,
2020-08-06 05:42:26 -07:00
{
displayName : 'Options' ,
name : 'options' ,
type : 'collection' ,
placeholder : 'Add Option' ,
default : { } ,
options : [
2021-01-20 04:20:48 -08:00
{
displayName : 'Container ID' ,
name : 'containerId' ,
type : 'string' ,
default : '' ,
description : 'Will be used to pass to the RHEA Backend as container_id' ,
} ,
2020-08-06 05:42:26 -07:00
{
2020-10-28 15:30:30 -07:00
displayName : 'Convert Body To String' ,
name : 'jsonConvertByteArrayToString' ,
2020-08-06 05:42:26 -07:00
type : 'boolean' ,
default : false ,
2022-08-01 13:47:55 -07:00
description :
'Whether to convert JSON Body content (["body"]["content"]) from Byte Array to string. Needed for Azure Service Bus.' ,
2020-08-06 05:42:26 -07:00
} ,
{
displayName : 'JSON Parse Body' ,
name : 'jsonParseBody' ,
type : 'boolean' ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to parse the body to an object' ,
2020-08-06 05:42:26 -07:00
} ,
2020-11-10 23:45:31 -08:00
{
displayName : 'Messages per Cicle' ,
name : 'pullMessagesNumber' ,
type : 'number' ,
default : 100 ,
description : 'Number of messages to pull from the bus for every cicle' ,
} ,
{
2021-01-20 04:20:48 -08:00
displayName : 'Only Body' ,
name : 'onlyBody' ,
type : 'boolean' ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to return only the body property' ,
2020-12-30 03:42:48 -08:00
} ,
{
displayName : 'Reconnect' ,
name : 'reconnect' ,
type : 'boolean' ,
default : true ,
2022-06-20 07:54:01 -07:00
description : 'Whether to automatically reconnect if disconnected' ,
2020-12-30 03:42:48 -08:00
} ,
{
2021-01-20 04:20:48 -08:00
displayName : 'Reconnect Limit' ,
2020-12-30 03:42:48 -08:00
name : 'reconnectLimit' ,
type : 'number' ,
default : 50 ,
2021-01-20 04:20:48 -08:00
description : 'Maximum number of reconnect attempts' ,
} ,
{
displayName : 'Sleep Time' ,
name : 'sleepTime' ,
type : 'number' ,
default : 10 ,
2022-05-06 14:01:25 -07:00
description : 'Milliseconds to sleep after every cicle' ,
2021-01-20 04:20:48 -08:00
} ,
2020-08-06 05:42:26 -07:00
] ,
} ,
2021-01-20 04:20:48 -08:00
] ,
2019-11-01 00:42:14 -07:00
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
2021-08-20 09:57:30 -07:00
const credentials = await this . getCredentials ( 'amqp' ) ;
2019-11-01 00:42:14 -07:00
const sink = this . getNodeParameter ( 'sink' , '' ) as string ;
const clientname = this . getNodeParameter ( 'clientname' , '' ) as string ;
const subscription = this . getNodeParameter ( 'subscription' , '' ) as string ;
2020-08-06 05:42:26 -07:00
const options = this . getNodeParameter ( 'options' , { } ) as IDataObject ;
2022-08-01 13:47:55 -07:00
const pullMessagesNumber = ( options . pullMessagesNumber as number ) || 100 ;
2021-01-20 04:20:48 -08:00
const containerId = options . containerId as string ;
2022-08-01 13:47:55 -07:00
const containerReconnect = ( options . reconnect as boolean ) || true ;
const containerReconnectLimit = ( options . reconnectLimit as number ) || 50 ;
2019-11-01 00:42:14 -07:00
2019-11-02 04:16:20 -07:00
if ( sink === '' ) {
2021-04-16 09:33:36 -07:00
throw new NodeOperationError ( this . getNode ( ) , 'Queue or Topic required!' ) ;
2019-11-01 00:42:14 -07:00
}
2020-08-05 02:09:36 -07:00
2019-11-02 04:16:20 -07:00
let durable = false ;
2020-08-05 02:09:36 -07:00
if ( subscription && clientname ) {
2019-11-01 00:42:14 -07:00
durable = true ;
}
2021-01-20 04:20:48 -08:00
const container = create_container ( ) ;
2019-11-01 00:42:14 -07:00
2021-01-20 04:20:48 -08:00
let lastMsgId : string | number | Buffer | undefined = undefined ;
2019-11-01 00:42:14 -07:00
2021-01-20 04:20:48 -08:00
container . on ( 'receiver_open' , ( context : EventContext ) = > {
2020-12-30 04:57:53 -08:00
context . receiver ? . add_credit ( pullMessagesNumber ) ;
2020-11-10 23:44:59 -08:00
} ) ;
2021-01-20 04:20:48 -08:00
container . on ( 'message' , ( context : EventContext ) = > {
2020-12-30 04:29:29 -08:00
// No message in the context
2021-01-20 04:20:48 -08:00
if ( ! context . message ) {
return ;
}
2020-12-30 04:29:29 -08:00
2020-10-28 15:29:45 -07:00
// ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code
2019-11-02 04:16:20 -07:00
if ( context . message . message_id && context . message . message_id === lastMsgId ) {
2019-11-01 00:42:14 -07:00
return ;
}
2020-10-28 15:29:45 -07:00
lastMsgId = context . message . message_id ;
2020-08-05 02:09:36 -07:00
2020-08-06 05:42:26 -07:00
let data = context . message ;
2021-01-20 04:20:48 -08:00
if ( options . jsonConvertByteArrayToString === true && data . body . content !== undefined ) {
2020-12-30 04:29:29 -08:00
// The buffer is not ready... Stringify and parse back to load it.
2022-10-21 08:24:58 -07:00
const cont = deepCopy ( data . body . content ) ;
2023-02-27 19:39:43 -08:00
data . body = String . fromCharCode . apply ( null , cont . data as number [ ] ) ;
2020-12-30 04:29:29 -08:00
}
2020-11-10 23:45:31 -08:00
if ( options . jsonConvertByteArrayToString === true && data . body . content !== undefined ) {
2020-11-10 23:44:59 -08:00
// The buffer is not ready... Stringify and parse back to load it.
2022-10-21 08:24:58 -07:00
const cont = deepCopy ( data . body . content ) ;
2023-02-27 19:39:43 -08:00
data . body = String . fromCharCode . apply ( null , cont . data as number [ ] ) ;
2020-11-10 23:44:59 -08:00
}
2020-10-28 15:30:30 -07:00
if ( options . jsonConvertByteArrayToString === true && data . body . content !== undefined ) {
2020-10-28 15:29:45 -07:00
// The buffer is not ready... Stringify and parse back to load it.
2022-10-21 08:24:58 -07:00
const content = deepCopy ( data . body . content ) ;
2023-02-27 19:39:43 -08:00
data . body = String . fromCharCode . apply ( null , content . data as number [ ] ) ;
2020-10-28 15:29:45 -07:00
}
2020-08-06 05:42:26 -07:00
if ( options . jsonParseBody === true ) {
2023-02-27 19:39:43 -08:00
data . body = jsonParse ( data . body as string ) ;
2020-08-06 05:42:26 -07:00
}
if ( options . onlyBody === true ) {
data = data . body ;
2020-08-05 02:09:36 -07:00
}
2020-08-06 05:42:26 -07:00
2023-01-13 09:11:56 -08:00
this . emit ( [ this . helpers . returnJsonArray ( [ data as any ] ) ] ) ;
2020-11-10 23:44:59 -08:00
2020-12-30 04:29:29 -08:00
if ( ! context . receiver ? . has_credit ( ) ) {
2020-11-10 23:45:31 -08:00
setTimeout ( ( ) = > {
2020-12-30 04:29:29 -08:00
context . receiver ? . add_credit ( pullMessagesNumber ) ;
2022-08-01 13:47:55 -07:00
} , ( options . sleepTime as number ) || 10 ) ;
2020-11-10 23:45:31 -08:00
}
2019-11-01 00:42:14 -07:00
} ) ;
2019-11-02 04:16:20 -07:00
2020-12-30 04:29:29 -08:00
/ *
Values are documentet here : https : //github.com/amqp/rhea#container
* /
const connectOptions : ContainerOptions = {
host : credentials.hostname ,
hostname : credentials.hostname ,
port : credentials.port ,
2021-01-20 04:20:48 -08:00
reconnect : containerReconnect ,
2020-12-30 04:29:29 -08:00
reconnect_limit : containerReconnectLimit ,
username : credentials.username ? credentials.username : undefined ,
password : credentials.password ? credentials.password : undefined ,
transport : credentials.transportType ? credentials.transportType : undefined ,
2021-01-20 04:20:48 -08:00
container_id : containerId ? containerId : undefined ,
id : containerId ? containerId : undefined ,
2020-12-30 04:29:29 -08:00
} ;
2019-11-02 04:16:20 -07:00
const connection = container . connect ( connectOptions ) ;
2020-12-30 04:29:29 -08:00
2021-01-20 04:20:48 -08:00
const clientOptions : ReceiverOptions = {
2020-12-30 04:57:53 -08:00
name : subscription ? subscription : undefined ,
2020-12-30 04:29:29 -08:00
source : {
address : sink ,
2022-08-01 13:47:55 -07:00
durable : durable ? 2 : undefined ,
expiry_policy : durable ? 'never' : undefined ,
2020-12-30 04:29:29 -08:00
} ,
2022-08-01 13:47:55 -07:00
credit_window : 0 , // prefetch 1
2020-12-30 04:29:29 -08:00
} ;
2019-11-01 00:42:14 -07:00
connection . open_receiver ( clientOptions ) ;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
2020-11-10 23:45:31 -08:00
container . removeAllListeners ( 'receiver_open' ) ;
container . removeAllListeners ( 'message' ) ;
2019-11-01 00:42:14 -07:00
connection . close ( ) ;
}
2021-01-20 04:20:48 -08:00
2019-11-01 00:42:14 -07:00
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually.
2019-11-01 05:50:33 -07:00
// for AMQP it doesn't make much sense to wait here but
// for a new user who doesn't know how this works, it's better to wait and show a respective info message
2023-01-13 09:11:56 -08:00
const manualTriggerFunction = async ( ) = > {
2020-10-28 15:30:30 -07:00
await new Promise ( ( resolve , reject ) = > {
2019-11-02 04:16:20 -07:00
const timeoutHandler = setTimeout ( ( ) = > {
2022-08-01 13:47:55 -07:00
reject (
new Error (
'Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.' ,
) ,
) ;
2021-01-20 04:20:48 -08:00
} , 30000 ) ;
container . on ( 'message' , ( context : EventContext ) = > {
2020-08-05 02:09:36 -07:00
// Check if the only property present in the message is body
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content
2020-12-30 04:57:53 -08:00
const message = context . message as Message ;
if ( Object . keys ( message ) [ 0 ] === 'body' && Object . keys ( message ) . length === 1 ) {
2023-01-13 09:11:56 -08:00
this . emit ( [ this . helpers . returnJsonArray ( [ message . body ] ) ] ) ;
2020-08-05 02:09:36 -07:00
} else {
2023-01-13 09:11:56 -08:00
this . emit ( [ this . helpers . returnJsonArray ( [ message as any ] ) ] ) ;
2020-08-05 02:09:36 -07:00
}
2019-11-01 05:50:33 -07:00
clearTimeout ( timeoutHandler ) ;
resolve ( true ) ;
} ) ;
} ) ;
2023-01-13 09:11:56 -08:00
} ;
2019-11-01 00:42:14 -07:00
return {
closeFunction ,
manualTriggerFunction ,
} ;
}
}