2019-11-02 04:16:20 -07:00
import { ContainerOptions } from 'rhea' ;
2019-11-01 00:42:14 -07:00
import { ITriggerFunctions } from 'n8n-core' ;
import {
INodeType ,
INodeTypeDescription ,
ITriggerResponse ,
} from 'n8n-workflow' ;
2019-11-01 05:50:33 -07:00
export class AmqpTrigger implements INodeType {
2019-11-01 00:42:14 -07:00
description : INodeTypeDescription = {
2019-11-01 05:50:33 -07:00
displayName : 'AMQP Trigger' ,
name : 'amqpTrigger' ,
2019-11-01 00:42:14 -07:00
icon : 'file:amqp.png' ,
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to AMQP 1.0 Messages' ,
defaults : {
2019-11-01 05:50:33 -07:00
name : 'AMQP Trigger' ,
2019-11-01 00:42:14 -07:00
color : '#00FF00' ,
} ,
inputs : [ ] ,
outputs : [ 'main' ] ,
credentials : [ {
name : 'amqp' ,
2019-11-01 05:50:33 -07:00
required : true ,
2019-11-01 00:42:14 -07:00
} ] ,
properties : [
// Node properties which the user gets displayed and
// can change on the node.
{
displayName : 'Queue / Topic' ,
name : 'sink' ,
type : 'string' ,
default : '' ,
placeholder : 'topic://sourcename.something' ,
description : 'name of the queue of topic to listen to' ,
} ,
{
displayName : 'Clientname' ,
name : 'clientname' ,
type : 'string' ,
default : '' ,
placeholder : 'for durable/persistent topic subscriptions, example: "n8n"' ,
description : 'Leave empty for non-durable topic subscriptions or queues. ' ,
} ,
{
displayName : 'Subscription' ,
name : 'subscription' ,
type : 'string' ,
default : '' ,
placeholder : 'for durable/persistent topic subscriptions, example: "order-worker"' ,
description : 'Leave empty for non-durable topic subscriptions or queues' ,
} ,
]
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
const credentials = this . getCredentials ( 'amqp' ) ;
2019-11-01 05:50:33 -07:00
if ( ! credentials ) {
throw new Error ( 'Credentials are mandatory!' ) ;
}
2019-11-01 00:42:14 -07:00
const sink = this . getNodeParameter ( 'sink' , '' ) as string ;
const clientname = this . getNodeParameter ( 'clientname' , '' ) as string ;
const subscription = this . getNodeParameter ( 'subscription' , '' ) as string ;
2019-11-02 04:16:20 -07:00
if ( sink === '' ) {
2019-11-01 00:42:14 -07:00
throw new Error ( 'Queue or Topic required!' ) ;
}
2019-11-02 04:16:20 -07:00
let durable = false ;
2019-11-01 00:42:14 -07:00
if ( subscription && clientname ) {
durable = true ;
}
2019-11-02 04:16:20 -07:00
const container = require ( 'rhea' ) ;
const connectOptions : ContainerOptions = {
2019-11-01 05:50:33 -07:00
host : credentials.hostname ,
port : credentials.port ,
2019-11-01 00:42:14 -07:00
reconnect : true , // this id the default anyway
reconnect_limit : 50 , // try for max 50 times, based on a back-off algorithm
container_id : ( durable ? clientname : null )
2019-11-02 04:16:20 -07:00
} ;
2019-11-01 05:50:33 -07:00
if ( credentials . username || credentials . password ) {
2019-11-01 00:42:14 -07:00
container . options . username = credentials . username ;
container . options . password = credentials . password ;
}
2019-11-02 04:16:20 -07:00
let lastMsgId : number | undefined = undefined ;
const self = this ;
2019-11-01 00:42:14 -07:00
2019-11-02 04:16:20 -07:00
container . on ( 'message' , ( context : any ) = > { // tslint:disable-line:no-any
if ( context . message . message_id && context . message . message_id === lastMsgId ) {
2019-11-01 00:42:14 -07:00
// ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code
lastMsgId = context . message . message_id ;
return ;
}
self . emit ( [ self . helpers . returnJsonArray ( [ context . message ] ) ] ) ;
} ) ;
2019-11-02 04:16:20 -07:00
const connection = container . connect ( connectOptions ) ;
2019-11-01 00:42:14 -07:00
let clientOptions = undefined ;
if ( durable ) {
clientOptions = {
name : subscription ,
source : {
address : sink ,
durable : 2 ,
expiry_policy : 'never'
} ,
credit_window : 1 // prefetch 1
2019-11-02 04:16:20 -07:00
} ;
2019-11-01 00:42:14 -07:00
} else {
clientOptions = {
source : {
address : sink ,
} ,
credit_window : 1 // prefetch 1
2019-11-02 04:16:20 -07:00
} ;
2019-11-01 00:42:14 -07:00
}
connection . open_receiver ( clientOptions ) ;
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
connection . close ( ) ;
}
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually.
2019-11-01 05:50:33 -07:00
// for AMQP it doesn't make much sense to wait here but
// for a new user who doesn't know how this works, it's better to wait and show a respective info message
2019-11-01 00:42:14 -07:00
async function manualTriggerFunction() {
2019-11-02 04:16:20 -07:00
await new Promise ( ( resolve , reject ) = > {
const timeoutHandler = setTimeout ( ( ) = > {
reject ( new Error ( 'Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.' ) ) ;
2019-11-01 05:50:33 -07:00
} , 30000 ) ;
2019-11-02 04:16:20 -07:00
container . on ( 'message' , ( context : any ) = > { // tslint:disable-line:no-any
2019-11-01 05:50:33 -07:00
clearTimeout ( timeoutHandler ) ;
resolve ( true ) ;
} ) ;
} ) ;
2019-11-01 00:42:14 -07:00
}
return {
closeFunction ,
manualTriggerFunction ,
} ;
}
}