mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-26 13:14:07 -08:00
61e26804ba
* ⚡ enabled array-type * ⚡ await-thenable on * ⚡ ban-types on * ⚡ default-param-last on * ⚡ dot-notation on * ⚡ member-delimiter-style on * ⚡ no-duplicate-imports on * ⚡ no-empty-interface on * ⚡ no-floating-promises on * ⚡ no-for-in-array on * ⚡ no-invalid-void-type on * ⚡ no-loop-func on * ⚡ no-shadow on * ⚡ ban-ts-comment re enabled * ⚡ @typescript-eslint/lines-between-class-members on * address my own comment * @typescript-eslint/return-await on * @typescript-eslint/promise-function-async on * @typescript-eslint/no-unnecessary-boolean-literal-compare on * @typescript-eslint/no-unnecessary-type-assertion on * prefer-const on * @typescript-eslint/prefer-optional-chain on Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <aditya@netroy.in>
150 lines
4.1 KiB
TypeScript
150 lines
4.1 KiB
TypeScript
import { IDataObject, IExecuteFunctions, ITriggerFunctions, sleep } from 'n8n-workflow';
|
|
|
|
import * as amqplib from 'amqplib';
|
|
|
|
declare module 'amqplib' {
|
|
interface Channel {
|
|
connection: amqplib.Connection;
|
|
}
|
|
}
|
|
|
|
export async function rabbitmqConnect(
|
|
this: IExecuteFunctions | ITriggerFunctions,
|
|
options: IDataObject,
|
|
): Promise<amqplib.Channel> {
|
|
const credentials = await this.getCredentials('rabbitmq');
|
|
|
|
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'];
|
|
|
|
const credentialData: IDataObject = {};
|
|
credentialKeys.forEach((key) => {
|
|
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
|
|
});
|
|
|
|
const optsData: IDataObject = {};
|
|
if (credentials.ssl === true) {
|
|
credentialData.protocol = 'amqps';
|
|
|
|
optsData.ca = credentials.ca === '' ? undefined : [Buffer.from(credentials.ca as string)];
|
|
if (credentials.passwordless === true) {
|
|
optsData.cert = credentials.cert === '' ? undefined : Buffer.from(credentials.cert as string);
|
|
optsData.key = credentials.key === '' ? undefined : Buffer.from(credentials.key as string);
|
|
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
|
|
optsData.credentials = amqplib.credentials.external();
|
|
}
|
|
}
|
|
|
|
return new Promise(async (resolve, reject) => {
|
|
try {
|
|
const connection = await amqplib.connect(credentialData, optsData);
|
|
|
|
connection.on('error', (error: Error) => {
|
|
reject(error);
|
|
});
|
|
|
|
const channel = (await connection.createChannel().catch(console.warn)) as amqplib.Channel;
|
|
|
|
if (
|
|
options.arguments &&
|
|
((options.arguments as IDataObject).argument! as IDataObject[]).length
|
|
) {
|
|
const additionalArguments: IDataObject = {};
|
|
((options.arguments as IDataObject).argument as IDataObject[]).forEach(
|
|
(argument: IDataObject) => {
|
|
additionalArguments[argument.key as string] = argument.value;
|
|
},
|
|
);
|
|
options.arguments = additionalArguments;
|
|
}
|
|
|
|
resolve(channel);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function rabbitmqConnectQueue(
|
|
this: IExecuteFunctions | ITriggerFunctions,
|
|
queue: string,
|
|
options: IDataObject,
|
|
): Promise<amqplib.Channel> {
|
|
const channel = await rabbitmqConnect.call(this, options);
|
|
|
|
return new Promise(async (resolve, reject) => {
|
|
try {
|
|
await channel.assertQueue(queue, options);
|
|
resolve(channel);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function rabbitmqConnectExchange(
|
|
this: IExecuteFunctions | ITriggerFunctions,
|
|
exchange: string,
|
|
type: string,
|
|
options: IDataObject,
|
|
): Promise<amqplib.Channel> {
|
|
const channel = await rabbitmqConnect.call(this, options);
|
|
|
|
return new Promise(async (resolve, reject) => {
|
|
try {
|
|
await channel.assertExchange(exchange, type, options);
|
|
resolve(channel);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
}
|
|
|
|
export class MessageTracker {
|
|
messages: number[] = [];
|
|
|
|
isClosing = false;
|
|
|
|
received(message: amqplib.ConsumeMessage) {
|
|
this.messages.push(message.fields.deliveryTag);
|
|
}
|
|
|
|
answered(message: amqplib.ConsumeMessage) {
|
|
if (this.messages.length === 0) {
|
|
return;
|
|
}
|
|
|
|
const index = this.messages.findIndex((value) => value !== message.fields.deliveryTag);
|
|
this.messages.splice(index);
|
|
}
|
|
|
|
unansweredMessages() {
|
|
return this.messages.length;
|
|
}
|
|
|
|
async closeChannel(channel: amqplib.Channel, consumerTag: string) {
|
|
if (this.isClosing) {
|
|
return;
|
|
}
|
|
this.isClosing = true;
|
|
|
|
// Do not accept any new messages
|
|
await channel.cancel(consumerTag);
|
|
|
|
let count = 0;
|
|
let unansweredMessages = this.unansweredMessages();
|
|
|
|
// Give currently executing messages max. 5 minutes to finish before
|
|
// the channel gets closed. If we would not do that, it would not be possible
|
|
// to acknowledge messages anymore for which the executions were already running
|
|
// when for example a new version of the workflow got saved. That would lead to
|
|
// them getting delivered and processed again.
|
|
while (unansweredMessages !== 0 && count++ <= 300) {
|
|
await sleep(1000);
|
|
unansweredMessages = this.unansweredMessages();
|
|
}
|
|
|
|
await channel.close();
|
|
await channel.connection.close();
|
|
}
|
|
}
|