From b77fd593039dcd778a5f25cef2f035a55bc28bec Mon Sep 17 00:00:00 2001 From: Ricardo Espinoza Date: Tue, 1 Sep 2020 11:40:18 -0400 Subject: [PATCH] :sparkles: Add MQTT Trigger-Node (#885) * Improvements to MQTT-node * :zap: Small improvements done * :zap: Improvements Co-authored-by: LEE SANG JUN --- .../credentials/Mqtt.credentials.ts | 59 +++++++ .../nodes-base/nodes/MQTT/MqttTrigger.node.ts | 158 ++++++++++++++++++ packages/nodes-base/nodes/MQTT/mqtt.png | Bin 0 -> 2310 bytes packages/nodes-base/package.json | 4 + 4 files changed, 221 insertions(+) create mode 100644 packages/nodes-base/credentials/Mqtt.credentials.ts create mode 100644 packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts create mode 100644 packages/nodes-base/nodes/MQTT/mqtt.png diff --git a/packages/nodes-base/credentials/Mqtt.credentials.ts b/packages/nodes-base/credentials/Mqtt.credentials.ts new file mode 100644 index 0000000000..bcc419a524 --- /dev/null +++ b/packages/nodes-base/credentials/Mqtt.credentials.ts @@ -0,0 +1,59 @@ +import { + ICredentialType, + NodePropertyTypes, +} from 'n8n-workflow'; + + +export class Mqtt implements ICredentialType { + name = 'mqtt'; + displayName = 'MQTT'; + properties = [ + // The credentials to get from user and save encrypted. + // Properties can be defined exactly in the same way + // as node properties. + { + displayName: 'Protocol', + name: 'protocol', + type: 'options' as NodePropertyTypes, + options: [ + { + name: 'mqtt', + value: 'mqtt', + }, + { + name: 'ws', + value: 'ws', + }, + ], + default: 'mqtt', + }, + { + displayName: 'Host', + name: 'host', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'Port', + name: 'port', + type: 'number' as NodePropertyTypes, + default: 1883, + }, + { + displayName: 'Username', + name: 'username', + type: 'string' as NodePropertyTypes, + default: '', + }, + { + displayName: 'Password', + name: 'password', + type: 'string' as NodePropertyTypes, + typeOptions: { + password: true, + }, + default: '', + }, + ]; +} + diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts new file mode 100644 index 0000000000..0344f290b3 --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -0,0 +1,158 @@ +import { + ITriggerFunctions, +} from 'n8n-core'; + +import { + INodeType, + INodeTypeDescription, + ITriggerResponse, + IDataObject, +} from 'n8n-workflow'; + +import * as mqtt from 'mqtt'; + +import { + IClientOptions, +} from 'mqtt'; + +export class MqttTrigger implements INodeType { + description: INodeTypeDescription = { + displayName: 'MQTT Trigger', + name: 'mqttTrigger', + icon: 'file:mqtt.png', + group: ['trigger'], + version: 1, + description: 'Listens to MQTT events', + defaults: { + name: 'MQTT Trigger', + color: '#9b27af', + }, + inputs: [], + outputs: ['main'], + credentials: [ + { + name: 'mqtt', + required: true, + }, + ], + properties: [ + { + displayName: 'Topics', + name: 'topics', + type: 'string', + default: '', + description: `Topics to subscribe to, multiple can be defined with comma.
+ wildcard characters are supported (+ - for single level and # - for multi level)`, + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'Only Message', + name: 'onlyMessage', + type: 'boolean', + default: false, + description: 'Returns only the message property.', + }, + { + displayName: 'JSON Parse Message', + name: 'jsonParseMessage', + type: 'boolean', + default: false, + description: 'Try to parse the message to an object.', + }, + ], + }, + ], + }; + + async trigger(this: ITriggerFunctions): Promise { + + const credentials = this.getCredentials('mqtt'); + + if (!credentials) { + throw new Error('Credentials are mandatory!'); + } + + const topics = (this.getNodeParameter('topics') as string).split(','); + + const options = this.getNodeParameter('options') as IDataObject; + + if (!topics) { + throw new Error('Topics are mandatory!'); + } + + const protocol = credentials.protocol as string || 'mqtt'; + const host = credentials.host as string; + const brokerUrl = `${protocol}://${host}`; + const port = credentials.port as number || 1883; + + const clientOptions: IClientOptions = { + port, + }; + + if (credentials.username && credentials.password) { + clientOptions.username = credentials.username as string; + clientOptions.password = credentials.password as string; + } + + const client = mqtt.connect(brokerUrl, clientOptions); + + const self = this; + + async function manualTriggerFunction() { + await new Promise(( resolve, reject ) => { + client.on('connect', () => { + client.subscribe(topics, (err, granted) => { + if (err) { + reject(err); + } + client.on('message', (topic: string, message: Buffer | string) => { // tslint:disable-line:no-any + + let result: IDataObject = {}; + + message = message.toString() as string; + + if (options.jsonParseMessage) { + try { + message = JSON.parse(message.toString()); + } catch (err) {} + } + + result.message = message; + result.topic = topic; + + if (options.onlyMessage) { + //@ts-ignore + result = message; + } + + self.emit([self.helpers.returnJsonArray([result], + )]); + resolve(true); + }); + }); + }); + + client.on('error', (error) => { + reject(error); + }); + }); + } + + manualTriggerFunction(); + + async function closeFunction() { + client.end(); + } + + return { + closeFunction, + manualTriggerFunction, + }; + } +} diff --git a/packages/nodes-base/nodes/MQTT/mqtt.png b/packages/nodes-base/nodes/MQTT/mqtt.png new file mode 100644 index 0000000000000000000000000000000000000000..12c5f24952b3cc910c95bbd7809f443be26385cd GIT binary patch literal 2310 zcmV+h3HkPkP)q_E1s6v?#7r7 z%bcyxM!zw0Kvh39%mcJA^YbP{8>J$jqFA1Ii=c~CW)-B$ut%UV5y!+MOV72}c9F`b zk4CJh3L~=h%fN|ak4B3aDp*$BcHQ{nKqGZHm32CWz$nM^p9}iv(&F~o5%dUp1U-Tt zL64wE&<<#x6$D0r8Gh^_-YG9#X(WazOI#XsBv!Xg{=2uT^Vt5@$Nu$~)TD-16RXRYD77@KUb3U&+?LDFEf3H5Z~>3_&whKU-(kWrkO4~p`Qesfk^ zz482}a#BMfR%oth#R-byDeY7`>y%6A2lu*lV?9d$b+!1N%W8#YYQ~hsJEJ-pP2NGC zfIj!UfaxbS{$JE^9vMkadW%ojG#ffe(! z&Wsx6#WQHke6JOGNu7;d`{D{oKufo)(jEm1nw%K4EH}RSYU@YIdI@d6vx0Efg{#&S zrIQ#rWOV(ho2_3PV>Zq@m{Qn&qiaL`FK+$!?%nd1xVPq9e6OPH-_;wwTCLP5a$^dN zcTE&cTD!R!uY@iB%ltd#EqQ5rg(pq1bl`lyw7hsnH0%zd4q7@iF=*)^G}K}&rU_T9 zEy{2O-FT__(~TEjE6X(GT)SP?vV33S(Y0sC&-0u6>j*%XzF)QB>(wfqa$o$h&Rboi zf@B`Bv)4^a*cSEe#ioy!f2Q=Jp~ION6Zm4@qRvL+zPJ;B#^eu%4Hx=eP5YkM{MQYc zv zb!XlzS9vLTM(~dwv+{6qXQOFf+;Kp|)E6F85(5XI<&yv%yvJ0<8of*Cwua7}*@xpc z&5nQjd04Fe^v%3<{mP@s;mblwc2u5w`|^S_5wQ)yXB4g33!f z-CWs9t7$+7MEkBd3^8rmA6H29503$8Y22}$1RGkpMnA-u4o>u6vS)tBt*#H|ee%o% z&tH|TfW`8&&(sxsvm!q!G-*QFKdXx0F{pLQmB*9)rjKDQtg*4%XNngLXkX@BKflGG z^pyVe0rdEov6*>`O)cGfZM{do@JI?u+4I8vn)X8pfF|L9WR76PY%M($G>pl9&yS9O zZ`S>q4mkBZkJ5K7&;@uL$vVTBm_^&>hNXrWc2!sGt)mnqsidMeP7h6+pwQ7OodO!G z9 zk8J%i(mgk}k{j*^c^t!??(gx%m_4T5hqhgZJhF2A-EI_1BLs_U!YOy69o?91XmG1o zM#C^pyBRIJIVnyAw~Mjfm>veSEZ`6?NY|w_xl4@MyS!|Ds{7G)H-~Ux@OA03xUb9m zLFj_i;Y4P~ z!Zay0eB^UPDYHZ+JF2SQy`g-DcHuE|pIiRfcWGi$p7LG2Tkqn%4QE>sQZ%D=^6!0@ zZb|^-qo3G9K8~TLJ5BmmFR|5_LZyJj(j~VbU_Ab?_2467{hz`2mOF55U6HL2))Wv6 zV{QbSws%2bU|?UW?F5Ywg1QIh2e-eyU45wDRKbuOg$@0wKCHpRjS3QN7<_Q8(_eVs z7qlHEdPXXzXN3k&2=dVA287nYw-BVpMySiwW$Nj&Sj{kxx%4i=i{eRd~8@3dsPcox#uO g9i|`o^piyYA3KLr@}Cs({r~^~07*qoM6N<$f^m3?c>n+a literal 0 HcmV?d00001 diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index baa3ed6483..f8b3b6db92 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -115,6 +115,7 @@ "dist/credentials/MoceanApi.credentials.js", "dist/credentials/MondayComApi.credentials.js", "dist/credentials/MongoDb.credentials.js", + "dist/credentials/Mqtt.credentials.js", "dist/credentials/Msg91Api.credentials.js", "dist/credentials/MySql.credentials.js", "dist/credentials/NextCloudApi.credentials.js", @@ -288,6 +289,7 @@ "dist/nodes/Mocean/Mocean.node.js", "dist/nodes/MondayCom/MondayCom.node.js", "dist/nodes/MongoDb/MongoDb.node.js", + "dist/nodes/MQTT/MqttTrigger.node.js", "dist/nodes/MoveBinaryData.node.js", "dist/nodes/Msg91/Msg91.node.js", "dist/nodes/MySql/MySql.node.js", @@ -373,6 +375,7 @@ "@types/mailparser": "^2.7.3", "@types/moment-timezone": "^0.5.12", "@types/mongodb": "^3.5.4", + "@types/mqtt": "^2.5.0", "@types/mssql": "^6.0.2", "@types/node": "^14.0.27", "@types/nodemailer": "^6.4.0", @@ -409,6 +412,7 @@ "moment": "2.24.0", "moment-timezone": "^0.5.28", "mongodb": "^3.5.5", + "mqtt": "^4.2.0", "mssql": "^6.2.0", "mysql2": "^2.0.1", "n8n-core": "~0.43.0",