import type { IDataObject, IExecuteFunctions, INodeExecutionData, ITriggerFunctions, } from 'n8n-workflow'; import { jsonParse, sleep } from 'n8n-workflow'; import * as amqplib from 'amqplib'; import { formatPrivateKey } from '@utils/utilities'; import type { ExchangeType, Options, RabbitMQCredentials, TriggerOptions } from './types'; const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const; export async function rabbitmqConnect( credentials: RabbitMQCredentials, ): Promise { const credentialData = credentialKeys.reduce((acc, key) => { acc[key] = credentials[key] === '' ? undefined : credentials[key]; return acc; }, {} as IDataObject) as amqplib.Options.Connect; const optsData: IDataObject = {}; if (credentials.ssl) { credentialData.protocol = 'amqps'; optsData.ca = credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca))]; if (credentials.passwordless) { optsData.cert = credentials.cert === '' ? undefined : Buffer.from(formatPrivateKey(credentials.cert)); optsData.key = credentials.key === '' ? undefined : Buffer.from(formatPrivateKey(credentials.key)); optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase; optsData.credentials = amqplib.credentials.external(); } } return await amqplib.connect(credentialData, optsData); } export async function rabbitmqCreateChannel( this: IExecuteFunctions | ITriggerFunctions, ): Promise { const credentials = await this.getCredentials('rabbitmq'); return await new Promise(async (resolve, reject) => { try { const connection = await rabbitmqConnect(credentials); // TODO: why is this error handler being added here? connection.on('error', reject); const channel = await connection.createChannel(); resolve(channel); } catch (error) { reject(error); } }); } export async function rabbitmqConnectQueue( this: IExecuteFunctions | ITriggerFunctions, queue: string, options: Options | TriggerOptions, ): Promise { const channel = await rabbitmqCreateChannel.call(this); return await new Promise(async (resolve, reject) => { try { if (options.assertQueue) { await channel.assertQueue(queue, options); } else { await channel.checkQueue(queue); } if ('binding' in options && options.binding?.bindings.length) { options.binding.bindings.forEach(async (binding) => { await channel.bindQueue(queue, binding.exchange, binding.routingKey); }); } resolve(channel); } catch (error) { reject(error); } }); } export async function rabbitmqConnectExchange( this: IExecuteFunctions | ITriggerFunctions, exchange: string, options: Options | TriggerOptions, ): Promise { const exchangeType = this.getNodeParameter('exchangeType', 0) as ExchangeType; const channel = await rabbitmqCreateChannel.call(this); return await new Promise(async (resolve, reject) => { try { if (options.assertExchange) { await channel.assertExchange(exchange, exchangeType, options); } else { await channel.checkExchange(exchange); } resolve(channel); } catch (error) { reject(error); } }); } export class MessageTracker { messages: number[] = []; isClosing = false; received(message: amqplib.ConsumeMessage) { this.messages.push(message.fields.deliveryTag); } answered(message: amqplib.ConsumeMessage) { if (this.messages.length === 0) { return; } const index = this.messages.findIndex((value) => value !== message.fields.deliveryTag); this.messages.splice(index); } unansweredMessages() { return this.messages.length; } async closeChannel(channel: amqplib.Channel, consumerTag: string) { if (this.isClosing) { return; } this.isClosing = true; // Do not accept any new messages await channel.cancel(consumerTag); let count = 0; let unansweredMessages = this.unansweredMessages(); // Give currently executing messages max. 5 minutes to finish before // the channel gets closed. If we would not do that, it would not be possible // to acknowledge messages anymore for which the executions were already running // when for example a new version of the workflow got saved. That would lead to // them getting delivered and processed again. while (unansweredMessages !== 0 && count++ <= 300) { await sleep(1000); unansweredMessages = this.unansweredMessages(); } await channel.close(); await channel.connection.close(); } } export const parsePublishArguments = (options: Options) => { const additionalArguments: IDataObject = {}; if (options.arguments?.argument.length) { options.arguments.argument.forEach((argument) => { additionalArguments[argument.key] = argument.value; }); } return additionalArguments as amqplib.Options.Publish; }; export const parseMessage = async ( message: amqplib.Message, options: TriggerOptions, helpers: ITriggerFunctions['helpers'], ): Promise => { if (options.contentIsBinary) { const { content } = message; message.content = undefined as unknown as Buffer; return { binary: { data: await helpers.prepareBinaryData(content), }, json: message as unknown as IDataObject, }; } else { let content: IDataObject | string = message.content.toString(); if (options.jsonParseBody) { content = jsonParse(content); } if (options.onlyContent) { return { json: content as IDataObject }; } else { message.content = content as unknown as Buffer; return { json: message as unknown as IDataObject }; } } };