mirror of
https://github.com/n8n-io/n8n.git
synced 2024-12-31 23:47:28 -08:00
e50f0e6a4e
Some checks are pending
Test Master / install-and-build (push) Waiting to run
Test Master / Unit tests (18.x) (push) Blocked by required conditions
Test Master / Unit tests (20.x) (push) Blocked by required conditions
Test Master / Unit tests (22.4) (push) Blocked by required conditions
Test Master / Lint (push) Blocked by required conditions
Test Master / Notify Slack on failure (push) Blocked by required conditions
196 lines
5.5 KiB
TypeScript
196 lines
5.5 KiB
TypeScript
import type {
|
|
IDataObject,
|
|
IExecuteFunctions,
|
|
INodeExecutionData,
|
|
ITriggerFunctions,
|
|
} from 'n8n-workflow';
|
|
import { jsonParse, sleep } from 'n8n-workflow';
|
|
import * as amqplib from 'amqplib';
|
|
import { formatPrivateKey } from '@utils/utilities';
|
|
import type { ExchangeType, Options, RabbitMQCredentials, TriggerOptions } from './types';
|
|
|
|
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const;
|
|
|
|
export async function rabbitmqConnect(
|
|
credentials: RabbitMQCredentials,
|
|
): Promise<amqplib.Connection> {
|
|
const credentialData = credentialKeys.reduce((acc, key) => {
|
|
acc[key] = credentials[key] === '' ? undefined : credentials[key];
|
|
return acc;
|
|
}, {} as IDataObject) as amqplib.Options.Connect;
|
|
|
|
const optsData: IDataObject = {};
|
|
if (credentials.ssl) {
|
|
credentialData.protocol = 'amqps';
|
|
|
|
optsData.ca =
|
|
credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca))];
|
|
if (credentials.passwordless) {
|
|
optsData.cert =
|
|
credentials.cert === '' ? undefined : Buffer.from(formatPrivateKey(credentials.cert));
|
|
optsData.key =
|
|
credentials.key === '' ? undefined : Buffer.from(formatPrivateKey(credentials.key));
|
|
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
|
|
optsData.credentials = amqplib.credentials.external();
|
|
}
|
|
}
|
|
|
|
return await amqplib.connect(credentialData, optsData);
|
|
}
|
|
|
|
export async function rabbitmqCreateChannel(
|
|
this: IExecuteFunctions | ITriggerFunctions,
|
|
): Promise<amqplib.Channel> {
|
|
const credentials = await this.getCredentials<RabbitMQCredentials>('rabbitmq');
|
|
|
|
return await new Promise(async (resolve, reject) => {
|
|
try {
|
|
const connection = await rabbitmqConnect(credentials);
|
|
// TODO: why is this error handler being added here?
|
|
connection.on('error', reject);
|
|
|
|
const channel = await connection.createChannel();
|
|
resolve(channel);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function rabbitmqConnectQueue(
|
|
this: IExecuteFunctions | ITriggerFunctions,
|
|
queue: string,
|
|
options: Options | TriggerOptions,
|
|
): Promise<amqplib.Channel> {
|
|
const channel = await rabbitmqCreateChannel.call(this);
|
|
|
|
return await new Promise(async (resolve, reject) => {
|
|
try {
|
|
if (options.assertQueue) {
|
|
await channel.assertQueue(queue, options);
|
|
} else {
|
|
await channel.checkQueue(queue);
|
|
}
|
|
|
|
if ('binding' in options && options.binding?.bindings.length) {
|
|
options.binding.bindings.forEach(async (binding) => {
|
|
await channel.bindQueue(queue, binding.exchange, binding.routingKey);
|
|
});
|
|
}
|
|
|
|
resolve(channel);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
}
|
|
|
|
export async function rabbitmqConnectExchange(
|
|
this: IExecuteFunctions | ITriggerFunctions,
|
|
exchange: string,
|
|
options: Options | TriggerOptions,
|
|
): Promise<amqplib.Channel> {
|
|
const exchangeType = this.getNodeParameter('exchangeType', 0) as ExchangeType;
|
|
const channel = await rabbitmqCreateChannel.call(this);
|
|
|
|
return await new Promise(async (resolve, reject) => {
|
|
try {
|
|
if (options.assertExchange) {
|
|
await channel.assertExchange(exchange, exchangeType, options);
|
|
} else {
|
|
await channel.checkExchange(exchange);
|
|
}
|
|
resolve(channel);
|
|
} catch (error) {
|
|
reject(error);
|
|
}
|
|
});
|
|
}
|
|
|
|
export class MessageTracker {
|
|
messages: number[] = [];
|
|
|
|
isClosing = false;
|
|
|
|
received(message: amqplib.ConsumeMessage) {
|
|
this.messages.push(message.fields.deliveryTag);
|
|
}
|
|
|
|
answered(message: amqplib.ConsumeMessage) {
|
|
if (this.messages.length === 0) {
|
|
return;
|
|
}
|
|
|
|
const index = this.messages.findIndex((value) => value !== message.fields.deliveryTag);
|
|
this.messages.splice(index);
|
|
}
|
|
|
|
unansweredMessages() {
|
|
return this.messages.length;
|
|
}
|
|
|
|
async closeChannel(channel: amqplib.Channel, consumerTag: string) {
|
|
if (this.isClosing) {
|
|
return;
|
|
}
|
|
this.isClosing = true;
|
|
|
|
// Do not accept any new messages
|
|
await channel.cancel(consumerTag);
|
|
|
|
let count = 0;
|
|
let unansweredMessages = this.unansweredMessages();
|
|
|
|
// Give currently executing messages max. 5 minutes to finish before
|
|
// the channel gets closed. If we would not do that, it would not be possible
|
|
// to acknowledge messages anymore for which the executions were already running
|
|
// when for example a new version of the workflow got saved. That would lead to
|
|
// them getting delivered and processed again.
|
|
while (unansweredMessages !== 0 && count++ <= 300) {
|
|
await sleep(1000);
|
|
unansweredMessages = this.unansweredMessages();
|
|
}
|
|
|
|
await channel.close();
|
|
await channel.connection.close();
|
|
}
|
|
}
|
|
|
|
export const parsePublishArguments = (options: Options) => {
|
|
const additionalArguments: IDataObject = {};
|
|
if (options.arguments?.argument.length) {
|
|
options.arguments.argument.forEach((argument) => {
|
|
additionalArguments[argument.key] = argument.value;
|
|
});
|
|
}
|
|
return additionalArguments as amqplib.Options.Publish;
|
|
};
|
|
|
|
export const parseMessage = async (
|
|
message: amqplib.Message,
|
|
options: TriggerOptions,
|
|
helpers: ITriggerFunctions['helpers'],
|
|
): Promise<INodeExecutionData> => {
|
|
if (options.contentIsBinary) {
|
|
const { content } = message;
|
|
message.content = undefined as unknown as Buffer;
|
|
return {
|
|
binary: {
|
|
data: await helpers.prepareBinaryData(content),
|
|
},
|
|
json: message as unknown as IDataObject,
|
|
};
|
|
} else {
|
|
let content: IDataObject | string = message.content.toString();
|
|
if (options.jsonParseBody) {
|
|
content = jsonParse(content);
|
|
}
|
|
if (options.onlyContent) {
|
|
return { json: content as IDataObject };
|
|
} else {
|
|
message.content = content as unknown as Buffer;
|
|
return { json: message as unknown as IDataObject };
|
|
}
|
|
}
|
|
};
|