// n8n/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts

import type {
ITriggerFunctions,
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
IDeferredPromise,
IRun,
} from 'n8n-workflow';
import { NodeOperationError, randomString } from 'n8n-workflow';
import * as mqtt from 'mqtt';
import { formatPrivateKey } from '@utils/utilities';
/**
 * Trigger node that connects to an MQTT broker using the `mqtt` credentials,
 * subscribes to the configured topics and emits one workflow item per
 * received message.
 */
export class MqttTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'MQTT Trigger',
		name: 'mqttTrigger',
		icon: 'file:mqtt.svg',
		group: ['trigger'],
		version: 1,
		description: 'Listens to MQTT events',
		eventTriggerDescription: '',
		defaults: {
			name: 'MQTT Trigger',
		},
		triggerPanel: {
			header: '',
			executionsHelp: {
				inactive:
					"<b>While building your workflow</b>, click the 'listen' button, then trigger an MQTT event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Once you're happy with your workflow</b>, <a data-key='activate'>activate</a> it. Then every time a change is detected, the workflow will execute. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor.",
				active:
					"<b>While building your workflow</b>, click the 'listen' button, then trigger an MQTT event. This will trigger an execution, which will show up in this editor.<br /> <br /><b>Your workflow will also execute automatically</b>, since it's activated. Every time a change is detected, this node will trigger an execution. These executions will show up in the <a data-key='executions'>executions list</a>, but not in the editor.",
			},
			activationHint:
				"Once you've finished building your workflow, <a data-key='activate'>activate</a> it to have it also listen continuously (you just won't see those executions here).",
		},
		inputs: [],
		outputs: ['main'],
		credentials: [
			{
				name: 'mqtt',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Topics',
				name: 'topics',
				type: 'string',
				default: '',
				description:
					'Topics to subscribe to, multiple can be defined with comma. Wildcard characters are supported (+ - for single level and # - for multi level). By default all subscription used QoS=0. To set a different QoS, write the QoS desired after the topic preceded by a colon. For Example: topicA:1,topicB:2',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				placeholder: 'Add Option',
				default: {},
				options: [
					{
						displayName: 'JSON Parse Body',
						name: 'jsonParseBody',
						type: 'boolean',
						default: false,
						description: 'Whether to try parse the message to an object',
					},
					{
						displayName: 'Only Message',
						name: 'onlyMessage',
						type: 'boolean',
						default: false,
						description: 'Whether to return only the message property',
					},
					{
						displayName: 'Parallel Processing',
						name: 'parallelProcessing',
						type: 'boolean',
						default: true,
						description:
							'Whether to process messages in parallel or by keeping the message in order',
					},
				],
			},
		],
	};

	/**
	 * Connects to the broker, subscribes to the configured topics and emits an
	 * item for every message received.
	 *
	 * The "Topics" parameter is a comma-separated list of `topic` or `topic:qos`
	 * entries; a missing QoS defaults to 0.
	 *
	 * @returns The `closeFunction` (ends the MQTT client) and the
	 * `manualTriggerFunction` (resolves after the first message was emitted).
	 * @throws NodeOperationError when no topic is configured, or when an entry
	 * has an empty topic or a QoS other than 0, 1 or 2.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const credentials = await this.getCredentials('mqtt');

		// `split(',')` never returns an empty array, so the entries have to be
		// inspected to find out whether any topic was actually configured.
		const topicEntries = ((this.getNodeParameter('topics') as string | undefined) ?? '')
			.split(',')
			.map((entry) => entry.trim())
			.filter((entry) => entry !== '');

		if (topicEntries.length === 0) {
			throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
		}

		const topicsQoS: IDataObject = {};
		for (const entry of topicEntries) {
			const [rawTopic, rawQos] = entry.split(':');
			const topic = rawTopic.trim();
			const qos = rawQos ? parseInt(rawQos.trim(), 10) : 0;

			if (topic === '') {
				throw new NodeOperationError(this.getNode(), `Invalid topic entry "${entry}"`);
			}
			if (qos !== 0 && qos !== 1 && qos !== 2) {
				throw new NodeOperationError(
					this.getNode(),
					`Invalid QoS for topic "${topic}": expected 0, 1 or 2`,
				);
			}

			topicsQoS[topic] = { qos };
		}

		const options = this.getNodeParameter('options') as IDataObject;
		const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;

		const protocol = (credentials.protocol as string) || 'mqtt';
		const host = credentials.host as string;
		const brokerUrl = `${protocol}://${host}`;
		const port = (credentials.port as number) || 1883;
		const clientId = (credentials.clientId as string) || `mqttjs_${randomString(8).toLowerCase()}`;
		const clean = credentials.clean as boolean;
		const ssl = credentials.ssl as boolean;
		const ca = formatPrivateKey(credentials.ca as string);
		const cert = formatPrivateKey(credentials.cert as string);
		const key = formatPrivateKey(credentials.key as string);
		const rejectUnauthorized = credentials.rejectUnauthorized as boolean;

		// The TLS material is only passed to the client when SSL is enabled.
		const clientOptions: mqtt.IClientOptions = {
			port,
			clean,
			clientId,
			...(ssl ? { ca, cert, key, rejectUnauthorized } : {}),
		};

		// Credentials are only sent when both parts are present.
		if (credentials.username && credentials.password) {
			clientOptions.username = credentials.username as string;
			clientOptions.password = credentials.password as string;
		}

		const client: mqtt.MqttClient = mqtt.connect(brokerUrl, clientOptions);

		const manualTriggerFunction = async () => {
			await new Promise((resolve, reject) => {
				const subscribe = () => {
					client.subscribe(topicsQoS as mqtt.ISubscriptionMap, (error) => {
						if (error) {
							reject(error);
						}
					});
				};

				// Registered exactly once: attaching this listener from within the
				// 'connect' handler would add another copy on every reconnect and
				// emit each message multiple times.
				client.on('message', async (topic: string, payload: Buffer | string) => {
					const rawMessage = payload.toString();
					let message: IDataObject | string = rawMessage;

					if (options.jsonParseBody) {
						try {
							message = JSON.parse(rawMessage);
						} catch {
							// Best effort: payloads that are not valid JSON are passed on as text.
						}
					}

					let result: IDataObject = { message, topic };

					if (options.onlyMessage) {
						// Existing output shape: the item consists of a one-element array
						// holding the message, which does not fit the declared type.
						//@ts-ignore
						result = [message as string];
					}

					// Without parallel processing the handler waits for the execution
					// started by this message to finish before it completes.
					let responsePromise: IDeferredPromise<IRun> | undefined;
					if (!parallelProcessing) {
						responsePromise = await this.helpers.createDeferredPromise();
					}

					this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise);

					if (responsePromise) {
						await responsePromise.promise();
					}

					resolve(true);
				});

				client.on('connect', subscribe);

				// The connection attempt starts as soon as `mqtt.connect` is called, so
				// the client may already be connected by the time this function runs.
				if (client.connected) {
					subscribe();
				}

				client.on('error', (error) => {
					reject(error);
				});
			});
		};

		if (this.getMode() === 'trigger') {
			// NOTE(review): the promise is deliberately not awaited, so a rejection
			// (e.g. a client error before the first message) is not handled here -
			// confirm this is acceptable.
			void manualTriggerFunction();
		}

		async function closeFunction() {
			client.end();
		}

		return {
			closeFunction,
			manualTriggerFunction,
		};
	}
}