n8n/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts
Omar Ajoue 7ce7285f7a
Load credentials from the database (#1741)
* Changes to types so that credentials can be always loaded from DB

This first commit changes all return types from the execute functions
and calls to get credentials to be async so we can use await.

This is a first step as previously credentials were loaded in memory and
always available. We will now be loading them from the DB which requires
turning the whole call chain async.

* Fix updated files

* Removed unnecessary credential loading to improve performance

* Fix typo

*  Fix issue

* Updated new nodes to load credentials async

*  Remove not needed comment

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-08-20 18:57:30 +02:00

172 lines
4 KiB
TypeScript

import {
ITriggerFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
NodeOperationError,
} from 'n8n-workflow';
import * as mqtt from 'mqtt';
import {
IClientOptions, ISubscriptionMap,
} from 'mqtt';
export class MqttTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'MQTT Trigger',
name: 'mqttTrigger',
icon: 'file:mqtt.svg',
group: ['trigger'],
version: 1,
description: 'Listens to MQTT events',
defaults: {
name: 'MQTT Trigger',
color: '#9b27af',
},
inputs: [],
outputs: ['main'],
credentials: [
{
name: 'mqtt',
required: true,
},
],
properties: [
{
displayName: 'Topics',
name: 'topics',
type: 'string',
default: '',
description: `Topics to subscribe to, multiple can be defined with comma.<br/>
wildcard characters are supported (+ - for single level and # - for multi level)<br>
By default all subscription used QoS=0. To set a different QoS, write the QoS desired<br>
after the topic preceded by a colom. For Example: topicA:1,topicB:2`,
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
default: false,
description: 'Try to parse the message to an object.',
},
{
displayName: 'Only Message',
name: 'onlyMessage',
type: 'boolean',
default: false,
description: 'Returns only the message property.',
},
],
},
],
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const credentials = await this.getCredentials('mqtt');
if (!credentials) {
throw new NodeOperationError(this.getNode(), 'Credentials are mandatory!');
}
const topics = (this.getNodeParameter('topics') as string).split(',');
const topicsQoS: IDataObject = {};
for (const data of topics) {
const [topic, qos] = data.split(':');
topicsQoS[topic] = (qos) ? { qos: parseInt(qos, 10) } : { qos: 0 };
}
const options = this.getNodeParameter('options') as IDataObject;
if (!topics) {
throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
}
const protocol = credentials.protocol as string || 'mqtt';
const host = credentials.host as string;
const brokerUrl = `${protocol}://${host}`;
const port = credentials.port as number || 1883;
const clientId = credentials.clientId as string || `mqttjs_${Math.random().toString(16).substr(2, 8)}`;
const clean = credentials.clean as boolean;
const clientOptions: IClientOptions = {
port,
clean,
clientId,
};
if (credentials.username && credentials.password) {
clientOptions.username = credentials.username as string;
clientOptions.password = credentials.password as string;
}
const client = mqtt.connect(brokerUrl, clientOptions);
const self = this;
async function manualTriggerFunction() {
await new Promise((resolve, reject) => {
client.on('connect', () => {
client.subscribe(topicsQoS as ISubscriptionMap, (err, granted) => {
if (err) {
reject(err);
}
client.on('message', (topic: string, message: Buffer | string) => { // tslint:disable-line:no-any
let result: IDataObject = {};
message = message.toString() as string;
if (options.jsonParseBody) {
try {
message = JSON.parse(message.toString());
} catch (err) { }
}
result.message = message;
result.topic = topic;
if (options.onlyMessage) {
//@ts-ignore
result = [message as string];
}
self.emit([self.helpers.returnJsonArray(result)]);
resolve(true);
});
});
});
client.on('error', (error) => {
reject(error);
});
});
}
if (this.getMode() === 'trigger') {
manualTriggerFunction();
}
async function closeFunction() {
client.end();
}
return {
closeFunction,
manualTriggerFunction,
};
}
}