2019-11-02 04:16:20 -07:00
import { ContainerOptions } from 'rhea' ;
2019-11-01 00:42:14 -07:00
import { ITriggerFunctions } from 'n8n-core' ;
import {
2020-08-06 05:42:26 -07:00
IDataObject ,
2019-11-01 00:42:14 -07:00
INodeType ,
INodeTypeDescription ,
ITriggerResponse ,
} from 'n8n-workflow' ;
2019-11-01 05:50:33 -07:00
/**
 * n8n trigger node that listens to an AMQP 1.0 queue or topic (via the
 * `rhea` library) and emits every received message into the workflow.
 */
export class AmqpTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'AMQP Trigger',
		name: 'amqpTrigger',
		icon: 'file:amqp.png',
		group: ['trigger'],
		version: 1,
		description: 'Listens to AMQP 1.0 Messages',
		defaults: {
			name: 'AMQP Trigger',
			color: '#00FF00',
		},
		inputs: [],
		outputs: ['main'],
		credentials: [{
			name: 'amqp',
			required: true,
		}],
		properties: [
			// Node properties which the user gets displayed and
			// can change on the node.
			{
				displayName: 'Queue / Topic',
				name: 'sink',
				type: 'string',
				default: '',
				placeholder: 'topic://sourcename.something',
				description: 'name of the queue of topic to listen to',
			},
			{
				displayName: 'Clientname',
				name: 'clientname',
				type: 'string',
				default: '',
				placeholder: 'for durable/persistent topic subscriptions, example: "n8n"',
				description: 'Leave empty for non-durable topic subscriptions or queues. ',
			},
			{
				displayName: 'Subscription',
				name: 'subscription',
				type: 'string',
				default: '',
				placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"',
				description: 'Leave empty for non-durable topic subscriptions or queues',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				placeholder: 'Add Option',
				default: {},
				options: [
					{
						displayName: 'Only Body',
						name: 'onlyBody',
						type: 'boolean',
						default: false,
						description: 'Returns only the body property.',
					},
					{
						displayName: 'JSON Parse Body',
						name: 'jsonParseBody',
						type: 'boolean',
						default: false,
						description: 'Parse the body to an object.',
					},
				],
			},
		],
	};

	/**
	 * Connects to the configured AMQP endpoint, opens a receiver on the
	 * "sink" address and emits each incoming message.
	 *
	 * A durable subscription is requested only when both "clientname" and
	 * "subscription" are set; otherwise a plain receiver is opened.
	 *
	 * @returns the `closeFunction` (detaches the message listener and closes
	 *   the connection) and the `manualTriggerFunction` (waits up to 30
	 *   seconds for one message during a manual execution).
	 * @throws Error when no credentials are available or "sink" is empty.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const credentials = this.getCredentials('amqp');
		if (!credentials) {
			throw new Error('Credentials are mandatory!');
		}

		const sink = this.getNodeParameter('sink', '') as string;
		const clientname = this.getNodeParameter('clientname', '') as string;
		const subscription = this.getNodeParameter('subscription', '') as string;
		const options = this.getNodeParameter('options', {}) as IDataObject;

		if (sink === '') {
			throw new Error('Queue or Topic required!');
		}

		// Durable subscriptions need both a client name and a subscription name.
		let durable = false;
		if (subscription && clientname) {
			durable = true;
		}

		const container = require('rhea');
		const connectOptions: ContainerOptions = {
			host: credentials.hostname,
			port: credentials.port,
			reconnect: true,		// this is the default anyway
			reconnect_limit: 50,	// try for max 50 times, based on a back-off algorithm
			container_id: (durable ? clientname : null),
		};
		if (credentials.username || credentials.password) {
			// NOTE(review): these are written to the options of the object returned by
			// require('rhea'), which Node caches per process, so they are presumably
			// shared with every other user of that module - confirm whether they should
			// be passed through connectOptions instead.
			container.options.username = credentials.username;
			container.options.password = credentials.password;
		}

		let lastMsgId: string | number | undefined = undefined;
		const self = this;

		// Kept as a named function so closeFunction can detach it again.
		const onMessage = (context: any) => { // tslint:disable-line:no-any
			// ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code
			const messageId = context.message.message_id;
			if (messageId && messageId === lastMsgId) {
				return;
			}
			// Remember the id so the next delivery can be compared against it.
			lastMsgId = messageId;

			let data = context.message;
			// Only string bodies can be JSON; JSON.parse on an object body would throw.
			if (options.jsonParseBody === true && typeof data.body === 'string') {
				data.body = JSON.parse(data.body);
			}
			if (options.onlyBody === true) {
				data = data.body;
			}

			self.emit([self.helpers.returnJsonArray([data])]);
		};
		container.on('message', onMessage);

		const connection = container.connect(connectOptions);

		const clientOptions = durable
			? {
				name: subscription,
				source: {
					address: sink,
					durable: 2,
					expiry_policy: 'never',
				},
				credit_window: 1,	// prefetch 1
			}
			: {
				source: {
					address: sink,
				},
				credit_window: 1,	// prefetch 1
			};
		connection.open_receiver(clientOptions);

		// The "closeFunction" function gets called by n8n whenever
		// the workflow gets deactivated and can so clean up.
		async function closeFunction() {
			// Detach the handler first; it lives on the module-level rhea object
			// and would otherwise stay registered after the connection is closed.
			container.removeListener('message', onMessage);
			connection.close();
		}

		// The "manualTriggerFunction" function gets called by n8n
		// when a user is in the workflow editor and starts the
		// workflow manually.
		// for AMQP it doesn't make much sense to wait here but
		// for a new user who doesn't know how this works, it's better to wait and show a respective info message
		async function manualTriggerFunction() {
			await new Promise((resolve, reject) => {
				// The message itself is emitted by onMessage above; this listener
				// only signals that something arrived before the timeout.
				const onFirstMessage = () => {
					clearTimeout(timeoutHandler);
					resolve(true);
				};
				const timeoutHandler = setTimeout(() => {
					// Do not leave the one-shot listener behind after giving up.
					container.removeListener('message', onFirstMessage);
					reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.'));
				}, 30000);
				container.once('message', onFirstMessage);
			});
		}

		return {
			closeFunction,
			manualTriggerFunction,
		};
	}
}