2023-03-09 09:13:15 -08:00
import type {
ITriggerFunctions ,
IDataObject ,
INodeType ,
INodeTypeDescription ,
ITriggerResponse ,
2023-08-16 04:06:47 -07:00
IDeferredPromise ,
IRun ,
2023-03-09 09:13:15 -08:00
} from 'n8n-workflow' ;
2023-01-27 03:22:44 -08:00
import { NodeOperationError } from 'n8n-workflow' ;
2020-09-01 08:40:18 -07:00
2023-08-03 07:31:55 -07:00
import * as mqtt from 'mqtt' ;
2023-08-09 04:30:53 -07:00
import { formatPrivateKey } from '@utils/utilities' ;
2020-09-01 08:40:18 -07:00
/**
 * n8n trigger node that connects to an MQTT broker with the `mqtt` credentials,
 * subscribes to the configured topics and emits one item per received message.
 */
export class MqttTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'MQTT Trigger',
		name: 'mqttTrigger',
		icon: 'file:mqtt.svg',
		group: ['trigger'],
		version: 1,
		description: 'Listens to MQTT events',
		eventTriggerDescription: '',
		defaults: {
			name: 'MQTT Trigger',
		},
		triggerPanel: {
			header: '',
			executionsHelp: {
				inactive:
					"<b>While building your workflow</b>, click the 'listen' button, then trigger an MQTT event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Once you're happy with your workflow</b>, <a data-key='activate'>activate</a> it. Then every time a change is detected, the workflow will execute. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor.",
				active:
					"<b>While building your workflow</b>, click the 'listen' button, then trigger an MQTT event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Your workflow will also execute automatically</b>, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor.",
			},
			activationHint:
				"Once you’ve finished building your workflow, <a data-key='activate'>activate</a> it to have it also listen continuously (you just won’t see those executions here).",
		},
		inputs: [],
		outputs: ['main'],
		credentials: [
			{
				name: 'mqtt',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Topics',
				name: 'topics',
				type: 'string',
				default: '',
				description:
					'Topics to subscribe to, multiple can be defined with comma. Wildcard characters are supported (+ - for single level and # - for multi level). By default all subscription used QoS=0. To set a different QoS, write the QoS desired after the topic preceded by a colon. For Example: topicA:1,topicB:2',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				placeholder: 'Add Option',
				default: {},
				options: [
					{
						displayName: 'JSON Parse Body',
						name: 'jsonParseBody',
						type: 'boolean',
						default: false,
						description: 'Whether to try parse the message to an object',
					},
					{
						displayName: 'Only Message',
						name: 'onlyMessage',
						type: 'boolean',
						default: false,
						description: 'Whether to return only the message property',
					},
					{
						displayName: 'Parallel Processing',
						name: 'parallelProcessing',
						type: 'boolean',
						default: true,
						description:
							'Whether to process messages in parallel or by keeping the message in order',
					},
				],
			},
		],
	};

	/**
	 * Connects to the broker described by the `mqtt` credentials and wires up
	 * the subscription for the topics given in the `topics` parameter.
	 *
	 * The `topics` parameter is a comma-separated list of `topic` or `topic:qos`
	 * entries. Entries without a QoS, or with a QoS that is not 0, 1 or 2,
	 * are subscribed with QoS 0.
	 *
	 * @returns `closeFunction`, which ends the MQTT client, and
	 * `manualTriggerFunction`, which installs the subscription and resolves
	 * once the first message has been emitted. When `getMode()` is `'trigger'`
	 * the latter is started immediately.
	 * @throws NodeOperationError when no usable topic is configured.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const credentials = await this.getCredentials('mqtt');

		const topicsParameter = this.getNodeParameter('topics') as string;

		// Maps each topic to its subscription options, e.g. { "a/b": { qos: 1 } }.
		const topicsQoS: IDataObject = {};
		for (const entry of topicsParameter.split(',')) {
			const [topic = '', qosText] = entry.split(':');
			// `''.split(',')` yields `['']`; blank entries are not valid topics.
			if (topic.trim() === '') {
				continue;
			}
			const requestedQoS = qosText ? parseInt(qosText, 10) : 0;
			// Anything that is not a valid QoS level (including NaN) falls back to 0.
			const qos = requestedQoS === 1 || requestedQoS === 2 ? requestedQoS : 0;
			topicsQoS[topic] = { qos };
		}

		if (Object.keys(topicsQoS).length === 0) {
			throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
		}

		const options = this.getNodeParameter('options') as IDataObject;
		const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;

		const protocol = (credentials.protocol as string) || 'mqtt';
		const host = credentials.host as string;
		const brokerUrl = `${protocol}://${host}`;
		const port = (credentials.port as number) || 1883;
		const clientId =
			(credentials.clientId as string) || `mqttjs_${Math.random().toString(16).slice(2, 10)}`;
		const clean = credentials.clean as boolean;
		const ssl = credentials.ssl as boolean;
		const ca = formatPrivateKey(credentials.ca as string);
		const cert = formatPrivateKey(credentials.cert as string);
		const key = formatPrivateKey(credentials.key as string);
		const rejectUnauthorized = credentials.rejectUnauthorized as boolean;

		const clientOptions: mqtt.IClientOptions = {
			port,
			clean,
			clientId,
		};

		// TLS material is only passed on when SSL is enabled in the credentials.
		if (ssl) {
			clientOptions.ca = ca;
			clientOptions.cert = cert;
			clientOptions.key = key;
			clientOptions.rejectUnauthorized = rejectUnauthorized;
		}

		// Credentials are only sent when both username and password are set.
		if (credentials.username && credentials.password) {
			clientOptions.username = credentials.username as string;
			clientOptions.password = credentials.password as string;
		}

		const client: mqtt.MqttClient = mqtt.connect(brokerUrl, clientOptions);

		const manualTriggerFunction = async () => {
			await new Promise((resolve, reject) => {
				/**
				 * Converts one incoming MQTT message into a workflow item and emits it.
				 */
				const onMessage = async (topic: string, payload: Buffer | string) => {
					const text = payload.toString();
					let message: IDataObject[string] = text;

					if (options.jsonParseBody) {
						try {
							message = JSON.parse(text);
						} catch {
							// Best effort: a payload that is not valid JSON is passed on as text.
						}
					}

					let result: IDataObject = { message, topic };
					if (options.onlyMessage) {
						// The emitted item becomes a one-element array holding only the
						// message, which does not match the declared IDataObject type.
						//@ts-ignore
						result = [message as string];
					}

					// With parallel processing disabled, a deferred promise is handed to
					// `emit` and awaited before this handler finishes.
					let responsePromise: IDeferredPromise<IRun> | undefined;
					if (!parallelProcessing) {
						responsePromise = await this.helpers.createDeferredPromise();
					}
					this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise);
					if (responsePromise) {
						await responsePromise.promise();
					}
					resolve(true);
				};

				// 'connect' fires again after every reconnect; the message listener
				// must only be attached once or each message would be emitted
				// several times.
				let messageListenerAttached = false;

				client.on('connect', () => {
					client.subscribe(topicsQoS as mqtt.ISubscriptionMap, (error) => {
						if (error) {
							reject(error);
							return;
						}
						if (messageListenerAttached) {
							return;
						}
						messageListenerAttached = true;
						client.on('message', onMessage);
					});
				});

				client.on('error', (error) => {
					reject(error);
				});
			});
		};

		if (this.getMode() === 'trigger') {
			void manualTriggerFunction();
		}

		/** Ends the MQTT client connection. */
		async function closeFunction() {
			client.end();
		}

		return {
			closeFunction,
			manualTriggerFunction,
		};
	}
}