From ee7147c6b3b053ac8fc317319ab257204e599f16 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Wed, 18 Sep 2024 20:03:18 +0300 Subject: [PATCH] fix(MQTT Node): Close connection if connection attempt fails (#10873) --- .../nodes-base/nodes/MQTT/GenericFunctions.ts | 5 +++ packages/nodes-base/nodes/MQTT/Mqtt.node.ts | 7 ++- .../nodes/MQTT/test/GenericFunctions.test.ts | 44 +++++++++++++++---- 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/packages/nodes-base/nodes/MQTT/GenericFunctions.ts b/packages/nodes-base/nodes/MQTT/GenericFunctions.ts index 7ff91fee55..e3deaba923 100644 --- a/packages/nodes-base/nodes/MQTT/GenericFunctions.ts +++ b/packages/nodes-base/nodes/MQTT/GenericFunctions.ts @@ -1,5 +1,6 @@ import { connect, type IClientOptions, type MqttClient } from 'mqtt'; import { ApplicationError, randomString } from 'n8n-workflow'; + import { formatPrivateKey } from '@utils/utilities'; interface BaseMqttCredential { @@ -62,6 +63,10 @@ export const createClient = async (credentials: MqttCredential): Promise { client.removeListener('connect', onConnect); client.removeListener('error', onError); + // mqtt client has an automatic reconnect mechanism that will + // keep trying to reconnect until it succeeds unless we + // explicitly close the client + client.end(); reject(new ApplicationError(error.message)); }; diff --git a/packages/nodes-base/nodes/MQTT/Mqtt.node.ts b/packages/nodes-base/nodes/MQTT/Mqtt.node.ts index bf0c889012..0ddc4ff924 100644 --- a/packages/nodes-base/nodes/MQTT/Mqtt.node.ts +++ b/packages/nodes-base/nodes/MQTT/Mqtt.node.ts @@ -8,6 +8,7 @@ import { type INodeType, type INodeTypeDescription, NodeConnectionType, + ensureError, } from 'n8n-workflow'; import { createClient, type MqttCredential } from './GenericFunctions'; @@ -116,10 +117,12 @@ export class Mqtt implements INodeType { try { const client = await createClient(credentials); client.end(); - } catch (error) { + } catch (e) { + const error = ensureError(e); + return { status: 'Error', - message: (error as Error).message, + message: error.message, }; } return { diff --git a/packages/nodes-base/nodes/MQTT/test/GenericFunctions.test.ts b/packages/nodes-base/nodes/MQTT/test/GenericFunctions.test.ts index 6d68777fe3..7b2a12ae40 100644 --- a/packages/nodes-base/nodes/MQTT/test/GenericFunctions.test.ts +++ b/packages/nodes-base/nodes/MQTT/test/GenericFunctions.test.ts @@ -1,19 +1,20 @@ -import { MqttClient } from 'mqtt'; import { mock } from 'jest-mock-extended'; +import { MqttClient } from 'mqtt'; +import { ApplicationError } from 'n8n-workflow'; import { createClient, type MqttCredential } from '../GenericFunctions'; describe('createClient', () => { - const mockConnect = jest.spyOn(MqttClient.prototype, 'connect').mockImplementation(function ( - this: MqttClient, - ) { - setImmediate(() => this.emit('connect', mock())); - return this; - }); - beforeEach(() => jest.clearAllMocks()); it('should create a client with minimal credentials', async () => { + const mockConnect = jest.spyOn(MqttClient.prototype, 'connect').mockImplementation(function ( + this: MqttClient, + ) { + setImmediate(() => this.emit('connect', mock())); + return this; + }); + const credentials = mock({ protocol: 'mqtt', host: 'localhost', @@ -35,4 +36,31 @@ describe('createClient', () => { clientId: 'testClient', }); }); + + it('should reject with ApplicationError on connection error and close connection', async () => { + const mockConnect = jest.spyOn(MqttClient.prototype, 'connect').mockImplementation(function ( + this: MqttClient, + ) { + setImmediate(() => this.emit('error', new Error('Connection failed'))); + return this; + }); + const mockEnd = jest.spyOn(MqttClient.prototype, 'end').mockImplementation(); + + const credentials: MqttCredential = { + protocol: 'mqtt', + host: 'localhost', + port: 1883, + clean: true, + clientId: 'testClientId', + username: 'testUser', + password: 'testPass', + ssl: false, + }; + + const clientPromise = createClient(credentials); + + await expect(clientPromise).rejects.toThrow(ApplicationError); + expect(mockConnect).toBeCalledTimes(1); + expect(mockEnd).toBeCalledTimes(1); + }); });