From 6c773d7a862b910e6bd503b74fc45491c03ca73e Mon Sep 17 00:00:00 2001 From: Ricardo Espinoza Date: Fri, 30 Apr 2021 21:23:25 -0400 Subject: [PATCH] :sparkles: Add MQTT & Trigger Node (#1705) * :sparkles: MQTT-Node * :zap: Small fix * :zap: Error when the publish method faile * :zap: Improvements * :zap: Improvements * :zap: Add Send Input Data parameter * :zap: Minor improvements Co-authored-by: Jan Oberhauser --- .../credentials/Mqtt.credentials.ts | 18 +- packages/nodes-base/nodes/MQTT/Mqtt.node.json | 21 +++ packages/nodes-base/nodes/MQTT/Mqtt.node.ts | 171 ++++++++++++++++++ .../nodes-base/nodes/MQTT/MqttTrigger.node.ts | 49 +++-- packages/nodes-base/nodes/MQTT/mqtt.png | Bin 2310 -> 0 bytes packages/nodes-base/nodes/MQTT/mqtt.svg | 21 +++ packages/nodes-base/package.json | 1 + 7 files changed, 259 insertions(+), 22 deletions(-) create mode 100644 packages/nodes-base/nodes/MQTT/Mqtt.node.json create mode 100644 packages/nodes-base/nodes/MQTT/Mqtt.node.ts delete mode 100644 packages/nodes-base/nodes/MQTT/mqtt.png create mode 100644 packages/nodes-base/nodes/MQTT/mqtt.svg diff --git a/packages/nodes-base/credentials/Mqtt.credentials.ts b/packages/nodes-base/credentials/Mqtt.credentials.ts index 05d4b0ec5c..a474e0501b 100644 --- a/packages/nodes-base/credentials/Mqtt.credentials.ts +++ b/packages/nodes-base/credentials/Mqtt.credentials.ts @@ -3,15 +3,11 @@ import { NodePropertyTypes, } from 'n8n-workflow'; - export class Mqtt implements ICredentialType { name = 'mqtt'; displayName = 'MQTT'; documentationUrl = 'mqtt'; properties = [ - // The credentials to get from user and save encrypted. - // Properties can be defined exactly in the same way - // as node properties. { displayName: 'Protocol', name: 'protocol', @@ -55,5 +51,19 @@ export class Mqtt implements ICredentialType { }, default: '', }, + { + displayName: 'Clean Session', + name: 'clean', + type: 'boolean' as NodePropertyTypes, + default: true, + description: `Set to false to receive QoS 1 and 2 messages while offline.`, + }, + { + displayName: 'Client ID', + name: 'clientId', + type: 'string' as NodePropertyTypes, + default: '', + description: 'Client ID. If left empty, one is autogenrated for you', + }, ]; } diff --git a/packages/nodes-base/nodes/MQTT/Mqtt.node.json b/packages/nodes-base/nodes/MQTT/Mqtt.node.json new file mode 100644 index 0000000000..595ac2b74b --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/Mqtt.node.json @@ -0,0 +1,21 @@ +{ + "node": "n8n-nodes-base.mqtt", + "nodeVersion": "1.0", + "codexVersion": "1.0", + "categories": [ + "Communication", + "Development" + ], + "resources": { + "credentialDocumentation": [ + { + "url": "https://docs.n8n.io/credentials/mqtt" + } + ], + "primaryDocumentation": [ + { + "url": "https://docs.n8n.io/nodes/n8n-nodes-base.mqtt/" + } + ] + } +} \ No newline at end of file diff --git a/packages/nodes-base/nodes/MQTT/Mqtt.node.ts b/packages/nodes-base/nodes/MQTT/Mqtt.node.ts new file mode 100644 index 0000000000..fc71fe6221 --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/Mqtt.node.ts @@ -0,0 +1,171 @@ +import { + IExecuteFunctions, +} from 'n8n-core'; + +import { + IDataObject, + INodeExecutionData, + INodeType, + INodeTypeDescription, +} from 'n8n-workflow'; + +import * as mqtt from 'mqtt'; + +import { + IClientOptions, +} from 'mqtt'; + +export class Mqtt implements INodeType { + description: INodeTypeDescription = { + displayName: 'MQTT', + name: 'mqtt', + icon: 'file:mqtt.svg', + group: ['input'], + version: 1, + description: 'Push messages to MQTT', + defaults: { + name: 'MQTT', + color: '#9b27af', + }, + inputs: ['main'], + outputs: ['main'], + credentials: [ + { + name: 'mqtt', + required: true, + }, + ], + properties: [ + { + displayName: 'Topic', + name: 'topic', + type: 'string', + required: true, + default: '', + description: `The topic to publish to`, + }, + { + displayName: 'Send Input Data', + name: 'sendInputData', + type: 'boolean', + default: true, + description: 'Send the the data the node receives as JSON.', + }, + { + displayName: 'Message', + name: 'message', + type: 'string', + required: true, + displayOptions: { + show: { + sendInputData: [ + false, + ], + }, + }, + default: '', + description: 'The message to publish', + }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'QoS', + name: 'qos', + type: 'options', + options: [ + { + name: 'Received at Most Once', + value: 0, + }, + { + name: 'Received at Least Once', + value: 1, + }, + { + name: 'Exactly Once', + value: 2, + }, + ], + default: 0, + description: 'QoS subscription level', + }, + { + displayName: 'Retain', + name: 'retain', + type: 'boolean', + default: false, + description: `Normally if a publisher publishes a message to a topic, and no one is subscribed to
+ that topic the message is simply discarded by the broker. However the publisher can tell the broker
+ to keep the last message on that topic by setting the retain flag to true.`, + }, + ], + }, + ], + }; + + async execute(this: IExecuteFunctions): Promise { + const items = this.getInputData(); + const length = (items.length as unknown) as number; + const credentials = this.getCredentials('mqtt') as IDataObject; + + const protocol = credentials.protocol as string || 'mqtt'; + const host = credentials.host as string; + const brokerUrl = `${protocol}://${host}`; + const port = credentials.port as number || 1883; + const clientId = credentials.clientId as string || `mqttjs_${Math.random().toString(16).substr(2, 8)}`; + const clean = credentials.clean as boolean; + + const clientOptions: IClientOptions = { + port, + clean, + clientId, + }; + + if (credentials.username && credentials.password) { + clientOptions.username = credentials.username as string; + clientOptions.password = credentials.password as string; + } + + const client = mqtt.connect(brokerUrl, clientOptions); + const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + + // tslint:disable-next-line: no-any + const data = await new Promise((resolve, reject): any => { + client.on('connect', () => { + for (let i = 0; i < length; i++) { + + let message; + const topic = (this.getNodeParameter('topic', i) as string); + const options = (this.getNodeParameter('options', i) as IDataObject); + + try { + if (sendInputData === true) { + message = JSON.stringify(items[i].json); + } else { + message = this.getNodeParameter('message', i) as string; + } + client.publish(topic, message, options); + } catch (e) { + reject(e); + } + } + //wait for the in-flight messages to be acked. + //needed for messages with QoS 1 & 2 + client.end(false, {}, () => { + resolve([items]); + }); + + client.on('error', (e: string | undefined) => { + reject(e); + }); + }); + }); + + return data as INodeExecutionData[][]; + } +} diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts index c99078e919..94bbda8c18 100644 --- a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -13,14 +13,14 @@ import { import * as mqtt from 'mqtt'; import { - IClientOptions, + IClientOptions, ISubscriptionMap, } from 'mqtt'; export class MqttTrigger implements INodeType { description: INodeTypeDescription = { displayName: 'MQTT Trigger', name: 'mqttTrigger', - icon: 'file:mqtt.png', + icon: 'file:mqtt.svg', group: ['trigger'], version: 1, description: 'Listens to MQTT events', @@ -43,7 +43,9 @@ export class MqttTrigger implements INodeType { type: 'string', default: '', description: `Topics to subscribe to, multiple can be defined with comma.
- wildcard characters are supported (+ - for single level and # - for multi level)`, + wildcard characters are supported (+ - for single level and # - for multi level)
+ By default all subscription used QoS=0. To set a different QoS, write the QoS desired
+ after the topic preceded by a colom. For Example: topicA:1,topicB:2`, }, { displayName: 'Options', @@ -52,6 +54,13 @@ export class MqttTrigger implements INodeType { placeholder: 'Add Option', default: {}, options: [ + { + displayName: 'JSON Parse Body', + name: 'jsonParseBody', + type: 'boolean', + default: false, + description: 'Try to parse the message to an object.', + }, { displayName: 'Only Message', name: 'onlyMessage', @@ -59,13 +68,6 @@ export class MqttTrigger implements INodeType { default: false, description: 'Returns only the message property.', }, - { - displayName: 'JSON Parse Message', - name: 'jsonParseMessage', - type: 'boolean', - default: false, - description: 'Try to parse the message to an object.', - }, ], }, ], @@ -81,6 +83,13 @@ export class MqttTrigger implements INodeType { const topics = (this.getNodeParameter('topics') as string).split(','); + const topicsQoS: IDataObject = {}; + + for (const data of topics) { + const [topic, qos] = data.split(':'); + topicsQoS[topic] = (qos) ? { qos: parseInt(qos, 10) } : { qos: 0 }; + } + const options = this.getNodeParameter('options') as IDataObject; if (!topics) { @@ -91,9 +100,13 @@ export class MqttTrigger implements INodeType { const host = credentials.host as string; const brokerUrl = `${protocol}://${host}`; const port = credentials.port as number || 1883; + const clientId = credentials.clientId as string || `mqttjs_${Math.random().toString(16).substr(2, 8)}`; + const clean = credentials.clean as boolean; const clientOptions: IClientOptions = { port, + clean, + clientId, }; if (credentials.username && credentials.password) { @@ -108,20 +121,19 @@ export class MqttTrigger implements INodeType { async function manualTriggerFunction() { await new Promise((resolve, reject) => { client.on('connect', () => { - client.subscribe(topics, (err, granted) => { + client.subscribe(topicsQoS as ISubscriptionMap, (err, granted) => { if (err) { reject(err); } client.on('message', (topic: string, message: Buffer | string) => { // tslint:disable-line:no-any - let result: IDataObject = {}; message = message.toString() as string; - if (options.jsonParseMessage) { + if (options.jsonParseBody) { try { message = JSON.parse(message.toString()); - } catch (error) { } + } catch (err) { } } result.message = message; @@ -129,10 +141,9 @@ export class MqttTrigger implements INodeType { if (options.onlyMessage) { //@ts-ignore - result = message; + result = [message as string]; } - - self.emit([self.helpers.returnJsonArray([result])]); + self.emit([self.helpers.returnJsonArray(result)]); resolve(true); }); }); @@ -144,7 +155,9 @@ export class MqttTrigger implements INodeType { }); } - manualTriggerFunction(); + if (this.getMode() === 'trigger') { + manualTriggerFunction(); + } async function closeFunction() { client.end(); diff --git a/packages/nodes-base/nodes/MQTT/mqtt.png b/packages/nodes-base/nodes/MQTT/mqtt.png deleted file mode 100644 index 12c5f24952b3cc910c95bbd7809f443be26385cd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2310 zcmV+h3HkPkP)q_E1s6v?#7r7 z%bcyxM!zw0Kvh39%mcJA^YbP{8>J$jqFA1Ii=c~CW)-B$ut%UV5y!+MOV72}c9F`b zk4CJh3L~=h%fN|ak4B3aDp*$BcHQ{nKqGZHm32CWz$nM^p9}iv(&F~o5%dUp1U-Tt zL64wE&<<#x6$D0r8Gh^_-YG9#X(WazOI#XsBv!Xg{=2uT^Vt5@$Nu$~)TD-16RXRYD77@KUb3U&+?LDFEf3H5Z~>3_&whKU-(kWrkO4~p`Qesfk^ zz482}a#BMfR%oth#R-byDeY7`>y%6A2lu*lV?9d$b+!1N%W8#YYQ~hsJEJ-pP2NGC zfIj!UfaxbS{$JE^9vMkadW%ojG#ffe(! z&Wsx6#WQHke6JOGNu7;d`{D{oKufo)(jEm1nw%K4EH}RSYU@YIdI@d6vx0Efg{#&S zrIQ#rWOV(ho2_3PV>Zq@m{Qn&qiaL`FK+$!?%nd1xVPq9e6OPH-_;wwTCLP5a$^dN zcTE&cTD!R!uY@iB%ltd#EqQ5rg(pq1bl`lyw7hsnH0%zd4q7@iF=*)^G}K}&rU_T9 zEy{2O-FT__(~TEjE6X(GT)SP?vV33S(Y0sC&-0u6>j*%XzF)QB>(wfqa$o$h&Rboi zf@B`Bv)4^a*cSEe#ioy!f2Q=Jp~ION6Zm4@qRvL+zPJ;B#^eu%4Hx=eP5YkM{MQYc zv zb!XlzS9vLTM(~dwv+{6qXQOFf+;Kp|)E6F85(5XI<&yv%yvJ0<8of*Cwua7}*@xpc z&5nQjd04Fe^v%3<{mP@s;mblwc2u5w`|^S_5wQ)yXB4g33!f z-CWs9t7$+7MEkBd3^8rmA6H29503$8Y22}$1RGkpMnA-u4o>u6vS)tBt*#H|ee%o% z&tH|TfW`8&&(sxsvm!q!G-*QFKdXx0F{pLQmB*9)rjKDQtg*4%XNngLXkX@BKflGG z^pyVe0rdEov6*>`O)cGfZM{do@JI?u+4I8vn)X8pfF|L9WR76PY%M($G>pl9&yS9O zZ`S>q4mkBZkJ5K7&;@uL$vVTBm_^&>hNXrWc2!sGt)mnqsidMeP7h6+pwQ7OodO!G z9 zk8J%i(mgk}k{j*^c^t!??(gx%m_4T5hqhgZJhF2A-EI_1BLs_U!YOy69o?91XmG1o zM#C^pyBRIJIVnyAw~Mjfm>veSEZ`6?NY|w_xl4@MyS!|Ds{7G)H-~Ux@OA03xUb9m zLFj_i;Y4P~ z!Zay0eB^UPDYHZ+JF2SQy`g-DcHuE|pIiRfcWGi$p7LG2Tkqn%4QE>sQZ%D=^6!0@ zZb|^-qo3G9K8~TLJ5BmmFR|5_LZyJj(j~VbU_Ab?_2467{hz`2mOF55U6HL2))Wv6 zV{QbSws%2bU|?UW?F5Ywg1QIh2e-eyU45wDRKbuOg$@0wKCHpRjS3QN7<_Q8(_eVs z7qlHEdPXXzXN3k&2=dVA287nYw-BVpMySiwW$Nj&Sj{kxx%4i=i{eRd~8@3dsPcox#uO g9i|`o^piyYA3KLr@}Cs({r~^~07*qoM6N<$f^m3?c>n+a diff --git a/packages/nodes-base/nodes/MQTT/mqtt.svg b/packages/nodes-base/nodes/MQTT/mqtt.svg new file mode 100644 index 0000000000..3e202aa2ce --- /dev/null +++ b/packages/nodes-base/nodes/MQTT/mqtt.svg @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index 963d6c5912..516fa72e4e 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -439,6 +439,7 @@ "dist/nodes/Mocean/Mocean.node.js", "dist/nodes/MondayCom/MondayCom.node.js", "dist/nodes/MongoDb/MongoDb.node.js", + "dist/nodes/MQTT/Mqtt.node.js", "dist/nodes/MQTT/MqttTrigger.node.js", "dist/nodes/MoveBinaryData.node.js", "dist/nodes/Msg91/Msg91.node.js",