2021-01-20 04:20:48 -08:00
import {
ContainerOptions ,
2021-01-21 01:47:45 -08:00
create_container ,
2021-01-20 04:20:48 -08:00
EventContext ,
Message ,
ReceiverOptions ,
} from 'rhea' ;
2019-11-02 04:16:20 -07:00
2019-11-01 00:42:14 -07:00
import { ITriggerFunctions } from 'n8n-core' ;
import {
2020-08-06 05:42:26 -07:00
IDataObject ,
2019-11-01 00:42:14 -07:00
INodeType ,
INodeTypeDescription ,
ITriggerResponse ,
2021-04-16 09:33:36 -07:00
NodeOperationError ,
2019-11-01 00:42:14 -07:00
} from 'n8n-workflow' ;
2019-11-01 05:50:33 -07:00
export class AmqpTrigger implements INodeType {
2019-11-01 00:42:14 -07:00
description : INodeTypeDescription = {
2019-11-01 05:50:33 -07:00
displayName : 'AMQP Trigger' ,
name : 'amqpTrigger' ,
2019-11-01 00:42:14 -07:00
icon : 'file:amqp.png' ,
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to AMQP 1.0 Messages' ,
defaults : {
2019-11-01 05:50:33 -07:00
name : 'AMQP Trigger' ,
2019-11-01 00:42:14 -07:00
} ,
inputs : [ ] ,
outputs : [ 'main' ] ,
credentials : [ {
name : 'amqp' ,
2019-11-01 05:50:33 -07:00
required : true ,
2019-11-01 00:42:14 -07:00
} ] ,
properties : [
// Node properties which the user gets displayed and
// can change on the node.
{
displayName : 'Queue / Topic' ,
name : 'sink' ,
type : 'string' ,
default : '' ,
placeholder : 'topic://sourcename.something' ,
description : 'name of the queue of topic to listen to' ,
} ,
{
displayName : 'Clientname' ,
name : 'clientname' ,
type : 'string' ,
default : '' ,
placeholder : 'for durable/persistent topic subscriptions, example: "n8n"' ,
description : 'Leave empty for non-durable topic subscriptions or queues. ' ,
} ,
{
displayName : 'Subscription' ,
name : 'subscription' ,
type : 'string' ,
default : '' ,
placeholder : 'for durable/persistent topic subscriptions, example: "order-worker"' ,
description : 'Leave empty for non-durable topic subscriptions or queues' ,
} ,
2020-08-06 05:42:26 -07:00
{
displayName : 'Options' ,
name : 'options' ,
type : 'collection' ,
placeholder : 'Add Option' ,
default : { } ,
options : [
2021-01-20 04:20:48 -08:00
{
displayName : 'Container ID' ,
name : 'containerId' ,
type : 'string' ,
default : '' ,
description : 'Will be used to pass to the RHEA Backend as container_id' ,
} ,
2020-08-06 05:42:26 -07:00
{
2020-10-28 15:30:30 -07:00
displayName : 'Convert Body To String' ,
name : 'jsonConvertByteArrayToString' ,
2020-08-06 05:42:26 -07:00
type : 'boolean' ,
default : false ,
2020-10-28 15:30:30 -07:00
description : 'Convert JSON Body content (["body"]["content"]) from Byte Array to string. Needed for Azure Service Bus.' ,
2020-08-06 05:42:26 -07:00
} ,
{
displayName : 'JSON Parse Body' ,
name : 'jsonParseBody' ,
type : 'boolean' ,
default : false ,
description : 'Parse the body to an object.' ,
} ,
2020-11-10 23:45:31 -08:00
{
displayName : 'Messages per Cicle' ,
name : 'pullMessagesNumber' ,
type : 'number' ,
default : 100 ,
description : 'Number of messages to pull from the bus for every cicle' ,
} ,
{
2021-01-20 04:20:48 -08:00
displayName : 'Only Body' ,
name : 'onlyBody' ,
type : 'boolean' ,
default : false ,
description : 'Returns only the body property.' ,
2020-12-30 03:42:48 -08:00
} ,
{
displayName : 'Reconnect' ,
name : 'reconnect' ,
type : 'boolean' ,
default : true ,
2021-01-20 04:20:48 -08:00
description : 'Automatically reconnect if disconnected' ,
2020-12-30 03:42:48 -08:00
} ,
{
2021-01-20 04:20:48 -08:00
displayName : 'Reconnect Limit' ,
2020-12-30 03:42:48 -08:00
name : 'reconnectLimit' ,
type : 'number' ,
default : 50 ,
2021-01-20 04:20:48 -08:00
description : 'Maximum number of reconnect attempts' ,
} ,
{
displayName : 'Sleep Time' ,
name : 'sleepTime' ,
type : 'number' ,
default : 10 ,
description : 'Milliseconds to sleep after every cicle.' ,
} ,
2020-08-06 05:42:26 -07:00
] ,
} ,
2021-01-20 04:20:48 -08:00
] ,
2019-11-01 00:42:14 -07:00
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
2021-08-20 09:57:30 -07:00
const credentials = await this . getCredentials ( 'amqp' ) ;
2019-11-01 05:50:33 -07:00
if ( ! credentials ) {
2021-04-16 09:33:36 -07:00
throw new NodeOperationError ( this . getNode ( ) , 'Credentials are mandatory!' ) ;
2019-11-01 05:50:33 -07:00
}
2019-11-01 00:42:14 -07:00
const sink = this . getNodeParameter ( 'sink' , '' ) as string ;
const clientname = this . getNodeParameter ( 'clientname' , '' ) as string ;
const subscription = this . getNodeParameter ( 'subscription' , '' ) as string ;
2020-08-06 05:42:26 -07:00
const options = this . getNodeParameter ( 'options' , { } ) as IDataObject ;
2020-12-30 04:29:29 -08:00
const pullMessagesNumber = options . pullMessagesNumber as number || 100 ;
2021-01-20 04:20:48 -08:00
const containerId = options . containerId as string ;
const containerReconnect = options . reconnect as boolean || true ;
2020-12-30 04:29:29 -08:00
const containerReconnectLimit = options . reconnectLimit as number || 50 ;
2019-11-01 00:42:14 -07:00
2019-11-02 04:16:20 -07:00
if ( sink === '' ) {
2021-04-16 09:33:36 -07:00
throw new NodeOperationError ( this . getNode ( ) , 'Queue or Topic required!' ) ;
2019-11-01 00:42:14 -07:00
}
2020-08-05 02:09:36 -07:00
2019-11-02 04:16:20 -07:00
let durable = false ;
2020-08-05 02:09:36 -07:00
if ( subscription && clientname ) {
2019-11-01 00:42:14 -07:00
durable = true ;
}
2021-01-20 04:20:48 -08:00
const container = create_container ( ) ;
2019-11-01 00:42:14 -07:00
2021-01-20 04:20:48 -08:00
let lastMsgId : string | number | Buffer | undefined = undefined ;
2019-11-02 04:16:20 -07:00
const self = this ;
2019-11-01 00:42:14 -07:00
2021-01-20 04:20:48 -08:00
container . on ( 'receiver_open' , ( context : EventContext ) = > {
2020-12-30 04:57:53 -08:00
context . receiver ? . add_credit ( pullMessagesNumber ) ;
2020-11-10 23:44:59 -08:00
} ) ;
2021-01-20 04:20:48 -08:00
container . on ( 'message' , ( context : EventContext ) = > {
2020-12-30 04:29:29 -08:00
// No message in the context
2021-01-20 04:20:48 -08:00
if ( ! context . message ) {
return ;
}
2020-12-30 04:29:29 -08:00
2020-10-28 15:29:45 -07:00
// ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code
2019-11-02 04:16:20 -07:00
if ( context . message . message_id && context . message . message_id === lastMsgId ) {
2019-11-01 00:42:14 -07:00
return ;
}
2020-10-28 15:29:45 -07:00
lastMsgId = context . message . message_id ;
2020-08-05 02:09:36 -07:00
2020-08-06 05:42:26 -07:00
let data = context . message ;
2021-01-20 04:20:48 -08:00
if ( options . jsonConvertByteArrayToString === true && data . body . content !== undefined ) {
2020-12-30 04:29:29 -08:00
// The buffer is not ready... Stringify and parse back to load it.
2021-01-20 04:20:48 -08:00
const cont = JSON . stringify ( data . body . content ) ;
data . body = String . fromCharCode . apply ( null , JSON . parse ( cont ) . data ) ;
2020-12-30 04:29:29 -08:00
}
2020-11-10 23:45:31 -08:00
if ( options . jsonConvertByteArrayToString === true && data . body . content !== undefined ) {
2020-11-10 23:44:59 -08:00
// The buffer is not ready... Stringify and parse back to load it.
2020-11-10 23:45:31 -08:00
const cont = JSON . stringify ( data . body . content ) ;
data . body = String . fromCharCode . apply ( null , JSON . parse ( cont ) . data ) ;
2020-11-10 23:44:59 -08:00
}
2020-10-28 15:30:30 -07:00
if ( options . jsonConvertByteArrayToString === true && data . body . content !== undefined ) {
2020-10-28 15:29:45 -07:00
// The buffer is not ready... Stringify and parse back to load it.
2020-10-28 15:30:30 -07:00
const content = JSON . stringify ( data . body . content ) ;
data . body = String . fromCharCode . apply ( null , JSON . parse ( content ) . data ) ;
2020-10-28 15:29:45 -07:00
}
2020-08-06 05:42:26 -07:00
if ( options . jsonParseBody === true ) {
data . body = JSON . parse ( data . body ) ;
}
if ( options . onlyBody === true ) {
data = data . body ;
2020-08-05 02:09:36 -07:00
}
2020-08-06 05:42:26 -07:00
2020-10-28 15:29:45 -07:00
2021-01-20 04:20:48 -08:00
self . emit ( [ self . helpers . returnJsonArray ( [ data as any ] ) ] ) ; // tslint:disable-line:no-any
2020-11-10 23:44:59 -08:00
2020-12-30 04:29:29 -08:00
if ( ! context . receiver ? . has_credit ( ) ) {
2020-11-10 23:45:31 -08:00
setTimeout ( ( ) = > {
2020-12-30 04:29:29 -08:00
context . receiver ? . add_credit ( pullMessagesNumber ) ;
2020-11-10 23:45:31 -08:00
} , options . sleepTime as number || 10 ) ;
}
2019-11-01 00:42:14 -07:00
} ) ;
2019-11-02 04:16:20 -07:00
2020-12-30 04:29:29 -08:00
/ *
Values are documentet here : https : //github.com/amqp/rhea#container
* /
const connectOptions : ContainerOptions = {
host : credentials.hostname ,
hostname : credentials.hostname ,
port : credentials.port ,
2021-01-20 04:20:48 -08:00
reconnect : containerReconnect ,
2020-12-30 04:29:29 -08:00
reconnect_limit : containerReconnectLimit ,
username : credentials.username ? credentials.username : undefined ,
password : credentials.password ? credentials.password : undefined ,
transport : credentials.transportType ? credentials.transportType : undefined ,
2021-01-20 04:20:48 -08:00
container_id : containerId ? containerId : undefined ,
id : containerId ? containerId : undefined ,
2020-12-30 04:29:29 -08:00
} ;
2019-11-02 04:16:20 -07:00
const connection = container . connect ( connectOptions ) ;
2020-12-30 04:29:29 -08:00
2021-01-20 04:20:48 -08:00
const clientOptions : ReceiverOptions = {
2020-12-30 04:57:53 -08:00
name : subscription ? subscription : undefined ,
2020-12-30 04:29:29 -08:00
source : {
address : sink ,
durable : ( durable ? 2 : undefined ) ,
expiry_policy : ( durable ? 'never' : undefined ) ,
} ,
credit_window : 0 , // prefetch 1
} ;
2019-11-01 00:42:14 -07:00
connection . open_receiver ( clientOptions ) ;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
2020-11-10 23:45:31 -08:00
container . removeAllListeners ( 'receiver_open' ) ;
container . removeAllListeners ( 'message' ) ;
2019-11-01 00:42:14 -07:00
connection . close ( ) ;
}
2021-01-20 04:20:48 -08:00
2019-11-01 00:42:14 -07:00
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually.
2019-11-01 05:50:33 -07:00
// for AMQP it doesn't make much sense to wait here but
// for a new user who doesn't know how this works, it's better to wait and show a respective info message
2019-11-01 00:42:14 -07:00
async function manualTriggerFunction() {
2020-10-28 15:30:30 -07:00
await new Promise ( ( resolve , reject ) = > {
2019-11-02 04:16:20 -07:00
const timeoutHandler = setTimeout ( ( ) = > {
reject ( new Error ( 'Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.' ) ) ;
2021-01-20 04:20:48 -08:00
} , 30000 ) ;
container . on ( 'message' , ( context : EventContext ) = > {
2020-08-05 02:09:36 -07:00
// Check if the only property present in the message is body
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content
2020-12-30 04:57:53 -08:00
const message = context . message as Message ;
if ( Object . keys ( message ) [ 0 ] === 'body' && Object . keys ( message ) . length === 1 ) {
self . emit ( [ self . helpers . returnJsonArray ( [ message . body ] ) ] ) ;
2020-08-05 02:09:36 -07:00
} else {
2021-01-20 04:20:48 -08:00
self . emit ( [ self . helpers . returnJsonArray ( [ message as any ] ) ] ) ; // tslint:disable-line:no-any
2020-08-05 02:09:36 -07:00
}
2019-11-01 05:50:33 -07:00
clearTimeout ( timeoutHandler ) ;
resolve ( true ) ;
} ) ;
} ) ;
2019-11-01 00:42:14 -07:00
}
return {
closeFunction ,
manualTriggerFunction ,
} ;
}
}