diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 249391fd9d..265ccc9c87 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -10,7 +10,13 @@ import type { IDeferredPromise, IRun, } from 'n8n-workflow'; -import { deepCopy, jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { + deepCopy, + jsonParse, + NodeConnectionType, + NodeOperationError, + TriggerCloseError, +} from 'n8n-workflow'; export class AmqpTrigger implements INodeType { description: INodeTypeDescription = { @@ -257,11 +263,16 @@ export class AmqpTrigger implements INodeType { // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. - async function closeFunction() { - container.removeAllListeners('receiver_open'); - container.removeAllListeners('message'); - connection.close(); - } + const closeFunction = async () => { + try { + container.removeAllListeners('receiver_open'); + container.removeAllListeners('message'); + connection.close(); + } catch (error) { + // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown + throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' }); + } + }; // The "manualTriggerFunction" function gets called by n8n // when a user is in the workflow editor and starts the diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 631566ffbd..54e15371fe 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -11,7 +11,7 @@ import type { ITriggerResponse, IRun, } from 'n8n-workflow'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow'; export class KafkaTrigger implements INodeType { description: INodeTypeDescription = { @@ -297,9 +297,14 @@ export class KafkaTrigger implements INodeType { // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. - async function closeFunction() { - await consumer.disconnect(); - } + const closeFunction = async () => { + try { + await consumer.disconnect(); + } catch (error) { + // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown + throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' }); + } + }; // The "manualTriggerFunction" function gets called by n8n // when a user is in the workflow editor and starts the diff --git a/packages/nodes-base/nodes/LocalFileTrigger/LocalFileTrigger.node.ts b/packages/nodes-base/nodes/LocalFileTrigger/LocalFileTrigger.node.ts index 88bed5162c..f80b8b3340 100644 --- a/packages/nodes-base/nodes/LocalFileTrigger/LocalFileTrigger.node.ts +++ b/packages/nodes-base/nodes/LocalFileTrigger/LocalFileTrigger.node.ts @@ -5,6 +5,7 @@ import { type INodeTypeDescription, type ITriggerResponse, NodeConnectionType, + TriggerCloseError, } from 'n8n-workflow'; import { watch } from 'chokidar'; @@ -242,9 +243,14 @@ export class LocalFileTrigger implements INodeType { watcher.on(eventName, (pathString) => executeTrigger(eventName, pathString as string)); } - async function closeFunction() { - return await watcher.close(); - } + const closeFunction = async () => { + try { + return await watcher.close(); + } catch (error) { + // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown + throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' }); + } + }; return { closeFunction, diff --git a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts index e9cea7412c..47f35f013e 100644 --- a/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts +++ b/packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts @@ -8,7 +8,7 @@ import type { ITriggerResponse, IRun, } from 'n8n-workflow'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow'; import { createClient, type MqttCredential } from './GenericFunctions'; @@ -149,9 +149,14 @@ export class MqttTrigger implements INodeType { await client.subscribeAsync(topicsQoS); } - async function closeFunction() { - await client.endAsync(); - } + const closeFunction = async () => { + try { + await client.endAsync(); + } catch (error) { + // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown + throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' }); + } + }; return { closeFunction, diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index c57659b2c7..3e0875f6f7 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -10,7 +10,7 @@ import type { ITriggerFunctions, ITriggerResponse, } from 'n8n-workflow'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow'; import { rabbitDefaultOptions } from './DefaultOptions'; @@ -229,9 +229,14 @@ export class RabbitMQTrigger implements INodeType { }; const closeFunction = async () => { - await channel.close(); - await channel.connection.close(); - return; + try { + await channel.close(); + await channel.connection.close(); + return; + } catch (error) { + // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown + throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' }); + } }; return { diff --git a/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts b/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts index 34b4bb8d62..71633aecbd 100644 --- a/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts +++ b/packages/nodes-base/nodes/Redis/RedisTrigger.node.ts @@ -4,7 +4,7 @@ import type { INodeTypeDescription, ITriggerResponse, } from 'n8n-workflow'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow'; import { redisConnectionTest, setupRedisClient } from './utils'; @@ -110,10 +110,15 @@ export class RedisTrigger implements INodeType { await client.pSubscribe(channels, onMessage); } - async function closeFunction() { - await client.pUnsubscribe(); - await client.quit(); - } + const closeFunction = async () => { + try { + await client.pUnsubscribe(); + await client.quit(); + } catch (error) { + // eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown + throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' }); + } + }; return { closeFunction,