2019-11-02 04:16:20 -07:00
import { ContainerOptions } from 'rhea' ;
2019-11-01 00:42:14 -07:00
import { ITriggerFunctions } from 'n8n-core' ;
import {
2020-08-06 05:42:26 -07:00
IDataObject ,
2019-11-01 00:42:14 -07:00
INodeType ,
INodeTypeDescription ,
ITriggerResponse ,
} from 'n8n-workflow' ;
2019-11-01 05:50:33 -07:00
/**
 * Applies the node's body-related options to a received AMQP message.
 *
 * The message is modified in place (its `body` property may be replaced),
 * exactly like the inline code this helper was extracted from.
 *
 * @param message The message object taken from the rhea event context.
 * @param options The node's "Options" collection. Only the flags
 *   `jsonConvertByteArrayToString`, `jsonParseBody` and `onlyBody` are read.
 * @returns The message itself, or only its body when `onlyBody` is enabled.
 * @throws SyntaxError when `jsonParseBody` is enabled and the body is not valid JSON.
 */
function prepareAmqpMessage(message: any, options: { [key: string]: unknown }): any { // tslint:disable-line:no-any
	const body = message.body;

	// Messages without a body (or without binary content) are passed through untouched
	// instead of failing on the property access.
	if (options.jsonConvertByteArrayToString === true
		&& body !== undefined && body !== null
		&& body.content !== undefined && body.content !== null) {
		// Round-trip through JSON to obtain the plain byte array of the buffer-like
		// content (a serialized Buffer looks like { type: 'Buffer', data: [...] }).
		const bytes = JSON.parse(JSON.stringify(body.content)).data;

		// Decode as UTF-8. The previous String.fromCharCode.apply() call garbled
		// multi-byte characters and threw a RangeError on large payloads because
		// every byte was passed as a separate call argument.
		message.body = Array.isArray(bytes) ? Buffer.from(bytes).toString('utf8') : '';
	}

	if (options.jsonParseBody === true) {
		message.body = JSON.parse(message.body);
	}

	return options.onlyBody === true ? message.body : message;
}

/**
 * Trigger node which opens an AMQP 1.0 receiver via "rhea" and emits
 * every received message into the workflow.
 */
export class AmqpTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'AMQP Trigger',
		name: 'amqpTrigger',
		icon: 'file:amqp.png',
		group: ['trigger'],
		version: 1,
		description: 'Listens to AMQP 1.0 Messages',
		defaults: {
			name: 'AMQP Trigger',
			color: '#00FF00',
		},
		inputs: [],
		outputs: ['main'],
		credentials: [{
			name: 'amqp',
			required: true,
		}],
		properties: [
			// Node properties which the user gets displayed and
			// can change on the node.
			{
				displayName: 'Queue / Topic',
				name: 'sink',
				type: 'string',
				default: '',
				placeholder: 'topic://sourcename.something',
				description: 'name of the queue of topic to listen to',
			},
			{
				displayName: 'Clientname',
				name: 'clientname',
				type: 'string',
				default: '',
				placeholder: 'for durable/persistent topic subscriptions, example: "n8n"',
				description: 'Leave empty for non-durable topic subscriptions or queues. ',
			},
			{
				displayName: 'Subscription',
				name: 'subscription',
				type: 'string',
				default: '',
				placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"',
				description: 'Leave empty for non-durable topic subscriptions or queues',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				placeholder: 'Add Option',
				default: {},
				options: [
					{
						displayName: 'Convert Body To String',
						name: 'jsonConvertByteArrayToString',
						type: 'boolean',
						default: false,
						description: 'Convert JSON Body content (["body"]["content"]) from Byte Array to string. Needed for Azure Service Bus.',
					},
					{
						displayName: 'JSON Parse Body',
						name: 'jsonParseBody',
						type: 'boolean',
						default: false,
						description: 'Parse the body to an object.',
					},
					{
						displayName: 'Only Body',
						name: 'onlyBody',
						type: 'boolean',
						default: false,
						description: 'Returns only the body property.',
					},
					{
						displayName: 'Messages per Cicle',
						name: 'pullMessagesNumber',
						type: 'number',
						default: 100,
						description: 'Number of messages to pull from the bus for every cicle',
					},
					{
						displayName: 'Sleep Time',
						name: 'sleepTime',
						type: 'number',
						default: 10,
						description: 'Milliseconds to sleep after every cicle.',
					},
				],
			},
		],
	};

	/**
	 * Connects to the AMQP server, opens a receiver on the configured
	 * queue/topic and emits every received message.
	 *
	 * @returns The functions n8n uses to stop the trigger and to run it manually.
	 * @throws Error when no credentials are set or no queue/topic is given.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const credentials = this.getCredentials('amqp');
		if (!credentials) {
			throw new Error('Credentials are mandatory!');
		}

		const sink = this.getNodeParameter('sink', '') as string;
		const clientname = this.getNodeParameter('clientname', '') as string;
		const subscription = this.getNodeParameter('subscription', '') as string;
		const options = this.getNodeParameter('options', {}) as IDataObject;
		const pullMessagesNumber = options.pullMessagesNumber || 100;

		if (sink === '') {
			throw new Error('Queue or Topic required!');
		}

		// A subscription is only durable when both a subscription name and a client name are given
		const durable = !!(subscription && clientname);

		const container = require('rhea');
		const connectOptions: ContainerOptions = {
			host: credentials.hostname,
			hostname: credentials.hostname,
			port: credentials.port,
			reconnect: true, // this is the default anyway
			reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm
			container_id: (durable ? clientname : null),
		};
		if (credentials.username || credentials.password) {
			// Old rhea implementation. Not sure if it is necessary
			container.options.username = credentials.username;
			container.options.password = credentials.password;
			connectOptions.username = credentials.username;
			connectOptions.password = credentials.password;
		}
		if (credentials.transportType) {
			connectOptions.transport = credentials.transportType;
		}

		let lastMsgId: number | undefined = undefined;
		// Handle of the pending "add credit" timer so it can be cancelled on close
		let creditTimer: ReturnType<typeof setTimeout> | undefined = undefined;
		const self = this;

		// Automatic flow control is switched off (credit_window: 0), so the
		// first batch of credit has to be issued as soon as the receiver is open.
		container.on('receiver_open', (context: any) => { // tslint:disable-line:no-any
			context.receiver.add_credit(pullMessagesNumber);
		});

		container.on('message', (context: any) => { // tslint:disable-line:no-any
			const message = context.message;

			// ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code
			if (message.message_id && message.message_id === lastMsgId) {
				return;
			}
			lastMsgId = message.message_id;

			self.emit([self.helpers.returnJsonArray([prepareAmqpMessage(message, options)])]);

			// All credit is used up: wait the configured time, then request the next batch
			if (context.receiver.credit === 0) {
				creditTimer = setTimeout(() => {
					creditTimer = undefined;
					context.receiver.add_credit(pullMessagesNumber);
				}, options.sleepTime as number || 10);
			}
		});

		const connection = container.connect(connectOptions);

		const clientOptions = {
			// Durable subscriptions additionally need a link name and a source which never expires
			...(durable ? { name: subscription } : {}),
			source: durable
				? { address: sink, durable: 2, expiry_policy: 'never' }
				: { address: sink },
			credit_window: 0, // 0 turns off rhea's automatic flow control, credit is issued manually above
		};
		connection.open_receiver(clientOptions);

		// The "closeFunction" function gets called by n8n whenever
		// the workflow gets deactivated and can so clean up.
		async function closeFunction() {
			// Make sure no credit gets requested anymore once the connection is closed
			if (creditTimer !== undefined) {
				clearTimeout(creditTimer);
				creditTimer = undefined;
			}
			container.removeAllListeners('receiver_open');
			container.removeAllListeners('message');
			connection.close();
		}

		// The "manualTriggerFunction" function gets called by n8n
		// when a user is in the workflow editor and starts the
		// workflow manually.
		// for AMQP it doesn't make much sense to wait here but
		// for a new user who doesn't know how this works, it's better to wait and show a respective info message
		async function manualTriggerFunction() {
			await new Promise((resolve, reject) => {
				// The message itself gets emitted by the regular "message" handler registered
				// above. This one-time listener only ends the waiting so that a message does
				// not get emitted twice.
				const onFirstMessage = () => {
					clearTimeout(timeoutHandler);
					resolve(true);
				};

				const timeoutHandler = setTimeout(() => {
					container.removeListener('message', onFirstMessage);
					reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.'));
				}, 30000);

				container.once('message', onFirstMessage);
			});
		}

		return {
			closeFunction,
			manualTriggerFunction,
		};
	}
}