Update Trigger nodes close function to throw TriggerCloseError

This commit is contained in:
Shireen Missi 2024-10-04 16:16:30 +01:00
parent bb59cc71ac
commit 8a08f92ea7
No known key found for this signature in database
GPG key ID: D213F10998FACC51
6 changed files with 63 additions and 26 deletions

View file

@ -10,7 +10,13 @@ import type {
IDeferredPromise,
IRun,
} from 'n8n-workflow';
import { deepCopy, jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import {
deepCopy,
jsonParse,
NodeConnectionType,
NodeOperationError,
TriggerCloseError,
} from 'n8n-workflow';
export class AmqpTrigger implements INodeType {
description: INodeTypeDescription = {
@ -257,11 +263,16 @@ export class AmqpTrigger implements INodeType {
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
container.removeAllListeners('receiver_open');
container.removeAllListeners('message');
connection.close();
}
const closeFunction = async () => {
try {
container.removeAllListeners('receiver_open');
container.removeAllListeners('message');
connection.close();
} catch (error) {
// eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown
throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' });
}
};
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the

View file

@ -11,7 +11,7 @@ import type {
ITriggerResponse,
IRun,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow';
export class KafkaTrigger implements INodeType {
description: INodeTypeDescription = {
@ -297,9 +297,14 @@ export class KafkaTrigger implements INodeType {
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
await consumer.disconnect();
}
const closeFunction = async () => {
try {
await consumer.disconnect();
} catch (error) {
// eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown
throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' });
}
};
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the

View file

@ -5,6 +5,7 @@ import {
type INodeTypeDescription,
type ITriggerResponse,
NodeConnectionType,
TriggerCloseError,
} from 'n8n-workflow';
import { watch } from 'chokidar';
@ -242,9 +243,14 @@ export class LocalFileTrigger implements INodeType {
watcher.on(eventName, (pathString) => executeTrigger(eventName, pathString as string));
}
async function closeFunction() {
return await watcher.close();
}
const closeFunction = async () => {
try {
return await watcher.close();
} catch (error) {
// eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown
throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' });
}
};
return {
closeFunction,

View file

@ -8,7 +8,7 @@ import type {
ITriggerResponse,
IRun,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow';
import { createClient, type MqttCredential } from './GenericFunctions';
@ -149,9 +149,14 @@ export class MqttTrigger implements INodeType {
await client.subscribeAsync(topicsQoS);
}
async function closeFunction() {
await client.endAsync();
}
const closeFunction = async () => {
try {
await client.endAsync();
} catch (error) {
// eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown
throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' });
}
};
return {
closeFunction,

View file

@ -10,7 +10,7 @@ import type {
ITriggerFunctions,
ITriggerResponse,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow';
import { rabbitDefaultOptions } from './DefaultOptions';
@ -229,9 +229,14 @@ export class RabbitMQTrigger implements INodeType {
};
const closeFunction = async () => {
await channel.close();
await channel.connection.close();
return;
try {
await channel.close();
await channel.connection.close();
return;
} catch (error) {
// eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown
throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' });
}
};
return {

View file

@ -4,7 +4,7 @@ import type {
INodeTypeDescription,
ITriggerResponse,
} from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import { NodeConnectionType, NodeOperationError, TriggerCloseError } from 'n8n-workflow';
import { redisConnectionTest, setupRedisClient } from './utils';
@ -110,10 +110,15 @@ export class RedisTrigger implements INodeType {
await client.pSubscribe(channels, onMessage);
}
async function closeFunction() {
await client.pUnsubscribe();
await client.quit();
}
const closeFunction = async () => {
try {
await client.pUnsubscribe();
await client.quit();
} catch (error) {
// eslint-disable-next-line n8n-nodes-base/node-execute-block-wrong-error-thrown
throw new TriggerCloseError(this.getNode(), { cause: error as Error, level: 'warning' });
}
};
return {
closeFunction,