2022-08-17 08:50:24 -07:00
import { ITriggerFunctions } from 'n8n-core' ;
2020-09-01 08:40:18 -07:00
import {
2020-09-01 08:46:32 -07:00
IDataObject ,
2020-09-01 08:40:18 -07:00
INodeType ,
INodeTypeDescription ,
ITriggerResponse ,
2021-04-16 09:33:36 -07:00
NodeOperationError ,
2020-09-01 08:40:18 -07:00
} from 'n8n-workflow' ;
2022-04-08 14:32:08 -07:00
import mqtt from 'mqtt' ;
2020-09-01 08:40:18 -07:00
2022-08-17 08:50:24 -07:00
import { IClientOptions , ISubscriptionMap } from 'mqtt' ;
2020-09-01 08:40:18 -07:00
export class MqttTrigger implements INodeType {
description : INodeTypeDescription = {
displayName : 'MQTT Trigger' ,
name : 'mqttTrigger' ,
2021-04-30 18:23:25 -07:00
icon : 'file:mqtt.svg' ,
2020-09-01 08:40:18 -07:00
group : [ 'trigger' ] ,
version : 1 ,
description : 'Listens to MQTT events' ,
defaults : {
name : 'MQTT Trigger' ,
} ,
inputs : [ ] ,
outputs : [ 'main' ] ,
credentials : [
{
name : 'mqtt' ,
required : true ,
} ,
] ,
properties : [
{
displayName : 'Topics' ,
name : 'topics' ,
type : 'string' ,
default : '' ,
2022-08-17 08:50:24 -07:00
description :
'Topics to subscribe to, multiple can be defined with comma. Wildcard characters are supported (+ - for single level and # - for multi level). By default all subscription used QoS=0. To set a different QoS, write the QoS desired after the topic preceded by a colom. For Example: topicA:1,topicB:2' ,
2020-09-01 08:40:18 -07:00
} ,
{
displayName : 'Options' ,
name : 'options' ,
type : 'collection' ,
placeholder : 'Add Option' ,
default : { } ,
options : [
{
2021-04-30 18:23:25 -07:00
displayName : 'JSON Parse Body' ,
name : 'jsonParseBody' ,
2020-09-01 08:40:18 -07:00
type : 'boolean' ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to try parse the message to an object' ,
2020-09-01 08:40:18 -07:00
} ,
{
2021-04-30 18:23:25 -07:00
displayName : 'Only Message' ,
name : 'onlyMessage' ,
2020-09-01 08:40:18 -07:00
type : 'boolean' ,
default : false ,
2022-06-20 07:54:01 -07:00
description : 'Whether to return only the message property' ,
2020-09-01 08:40:18 -07:00
} ,
] ,
} ,
] ,
} ;
async trigger ( this : ITriggerFunctions ) : Promise < ITriggerResponse > {
2021-08-20 09:57:30 -07:00
const credentials = await this . getCredentials ( 'mqtt' ) ;
2020-09-01 08:40:18 -07:00
const topics = ( this . getNodeParameter ( 'topics' ) as string ) . split ( ',' ) ;
2021-04-30 18:23:25 -07:00
const topicsQoS : IDataObject = { } ;
for ( const data of topics ) {
const [ topic , qos ] = data . split ( ':' ) ;
2022-08-17 08:50:24 -07:00
topicsQoS [ topic ] = qos ? { qos : parseInt ( qos , 10 ) } : { qos : 0 } ;
2021-04-30 18:23:25 -07:00
}
2020-09-01 08:40:18 -07:00
const options = this . getNodeParameter ( 'options' ) as IDataObject ;
if ( ! topics ) {
2021-04-16 09:33:36 -07:00
throw new NodeOperationError ( this . getNode ( ) , 'Topics are mandatory!' ) ;
2020-09-01 08:40:18 -07:00
}
2022-08-17 08:50:24 -07:00
const protocol = ( credentials . protocol as string ) || 'mqtt' ;
2020-09-01 08:40:18 -07:00
const host = credentials . host as string ;
const brokerUrl = ` ${ protocol } :// ${ host } ` ;
2022-08-17 08:50:24 -07:00
const port = ( credentials . port as number ) || 1883 ;
const clientId =
( credentials . clientId as string ) || ` mqttjs_ ${ Math . random ( ) . toString ( 16 ) . substr ( 2 , 8 ) } ` ;
2021-04-30 18:23:25 -07:00
const clean = credentials . clean as boolean ;
2021-09-03 05:37:19 -07:00
const ssl = credentials . ssl as boolean ;
const ca = credentials . ca as string ;
const cert = credentials . cert as string ;
const key = credentials . key as string ;
const rejectUnauthorized = credentials . rejectUnauthorized as boolean ;
let client : mqtt.MqttClient ;
if ( ssl === false ) {
const clientOptions : IClientOptions = {
port ,
clean ,
clientId ,
} ;
if ( credentials . username && credentials . password ) {
2022-08-17 08:50:24 -07:00
clientOptions . username = credentials . username as string ;
clientOptions . password = credentials . password as string ;
2021-09-03 05:37:19 -07:00
}
2022-08-17 08:50:24 -07:00
client = mqtt . connect ( brokerUrl , clientOptions ) ;
} else {
2021-09-03 05:37:19 -07:00
const clientOptions : IClientOptions = {
port ,
clean ,
clientId ,
ca ,
cert ,
key ,
2021-11-25 09:10:06 -08:00
rejectUnauthorized ,
2021-09-03 05:37:19 -07:00
} ;
if ( credentials . username && credentials . password ) {
clientOptions . username = credentials . username as string ;
clientOptions . password = credentials . password as string ;
}
2022-08-17 08:50:24 -07:00
client = mqtt . connect ( brokerUrl , clientOptions ) ;
2020-09-01 08:40:18 -07:00
}
const self = this ;
async function manualTriggerFunction() {
2020-09-01 08:46:32 -07:00
await new Promise ( ( resolve , reject ) = > {
2020-09-01 08:40:18 -07:00
client . on ( 'connect' , ( ) = > {
2022-11-08 06:28:21 -08:00
client . subscribe ( topicsQoS as ISubscriptionMap , ( err , _granted ) = > {
2020-09-01 08:40:18 -07:00
if ( err ) {
reject ( err ) ;
}
2022-08-17 08:50:24 -07:00
client . on ( 'message' , ( topic : string , message : Buffer | string ) = > {
// tslint:disable-line:no-any
2020-09-01 08:40:18 -07:00
let result : IDataObject = { } ;
message = message . toString ( ) as string ;
2021-04-30 18:23:25 -07:00
if ( options . jsonParseBody ) {
2020-09-01 08:40:18 -07:00
try {
message = JSON . parse ( message . toString ( ) ) ;
2022-08-17 08:50:24 -07:00
} catch ( err ) { }
2020-09-01 08:40:18 -07:00
}
result . message = message ;
result . topic = topic ;
if ( options . onlyMessage ) {
//@ts-ignore
2021-04-30 18:23:25 -07:00
result = [ message as string ] ;
2020-09-01 08:40:18 -07:00
}
2021-04-30 18:23:25 -07:00
self . emit ( [ self . helpers . returnJsonArray ( result ) ] ) ;
2020-09-01 08:40:18 -07:00
resolve ( true ) ;
} ) ;
} ) ;
} ) ;
client . on ( 'error' , ( error ) = > {
reject ( error ) ;
} ) ;
} ) ;
}
2021-04-30 18:23:25 -07:00
if ( this . getMode ( ) === 'trigger' ) {
manualTriggerFunction ( ) ;
}
2020-09-01 08:40:18 -07:00
async function closeFunction() {
client . end ( ) ;
}
return {
closeFunction ,
manualTriggerFunction ,
} ;
}
}