n8n/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Iván Ovejero 70ae90fa3c
refactor: Apply more eslint-plugin-n8n-nodes-base autofixable rules (#3432)
*  Update `lintfix` script

* 👕 Remove unneeded lint exceptions

* 👕 Run baseline `lintfix`

* 👕 Apply `node-param-description-miscased-url` (#3441)

* 👕 Apply `rule node-param-placeholder-miscased-id` (#3443)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* 👕 Apply `node-param-option-name-wrong-for-upsert` (#3446)

* 👕 Apply `node-param-min-value-wrong-for-limit` (#3442)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* Apply `node-param-display-name-wrong-for-dynamic-options` (#3454)

* 🔨 fix

*  Fix `Assigned To` fields

Co-authored-by: Michael Kret <michael.k@radency.com>

* 👕 Apply `rule node-param-default-wrong-for-number` (#3453)

* 👕 Apply `node-param-default-wrong-for-string` (#3452)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* Apply `node-param-display-name-miscased` (#3449)

* 🔨 fix

* 🔨 exceptions

*  review fixes

* 👕 Apply `node-param-description-lowercase-first-char` (#3451)

*  fix

*  review fixes

*  fix

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* 👕 Apply `node-param-description-wrong-for-dynamic-options` (#3456)

* Rule working as intended

* Add rule

* 🔥 Remove repetitions

* 👕 Add exceptions

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

* 👕 Small fix for `node-param-description-wrong-for-dynamic-options`

* 👕 Apply `node-param-default-wrong-for-fixed-collection` (#3460)

* 👕 Apply `node-param-description-line-break-html-tag` (#3462)

* 👕 Run baseline `lintfix`

* 👕 Apply `node-param-options-type-unsorted-items` (#3459)

*  fix

* 🔨 exceptions

* Add exception for Salesmate and Zoom

Co-authored-by: Michael Kret <michael.k@radency.com>
Co-authored-by: Iván Ovejero <ivov.src@gmail.com>

*  Restore `lintfix` command

Co-authored-by: Omar Ajoue <krynble@gmail.com>
Co-authored-by: Michael Kret <88898367+michael-radency@users.noreply.github.com>
Co-authored-by: agobrech <45268029+agobrech@users.noreply.github.com>
Co-authored-by: Michael Kret <michael.k@radency.com>
Co-authored-by: brianinoa <54530642+brianinoa@users.noreply.github.com>
2022-06-03 19:23:49 +02:00

441 lines
9.9 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import {
IExecuteFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeExecutionData,
INodeType,
INodeTypeDescription,
JsonObject,
NodeApiError,
NodeOperationError,
} from 'n8n-workflow';
import {
rabbitmqConnectExchange,
rabbitmqConnectQueue,
} from './GenericFunctions';
/**
 * n8n node that publishes messages to RabbitMQ, either directly to a queue
 * or to an exchange with a routing key.
 *
 * Each input item results in one published message. The payload is either the
 * item's JSON (when "Send Input Data" is enabled) or the "Message" parameter
 * evaluated for that item.
 */
export class RabbitMQ implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'RabbitMQ',
		name: 'rabbitmq',
		icon: 'file:rabbitmq.png',
		group: ['transform'],
		version: 1,
		description: 'Sends messages to a RabbitMQ topic',
		defaults: {
			name: 'RabbitMQ',
		},
		inputs: ['main'],
		outputs: ['main'],
		credentials: [
			{
				name: 'rabbitmq',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Mode',
				name: 'mode',
				type: 'options',
				options: [
					{
						name: 'Queue',
						value: 'queue',
						description: 'Publish data to queue',
					},
					{
						name: 'Exchange',
						value: 'exchange',
						description: 'Publish data to exchange',
					},
				],
				default: 'queue',
				description: 'To where data should be moved',
			},
			// ----------------------------------
			//         Queue
			// ----------------------------------
			{
				displayName: 'Queue / Topic',
				name: 'queue',
				type: 'string',
				displayOptions: {
					show: {
						mode: [
							'queue',
						],
					},
				},
				default: '',
				placeholder: 'queue-name',
				description: 'Name of the queue to publish to',
			},
			// ----------------------------------
			//         Exchange
			// ----------------------------------
			{
				displayName: 'Exchange',
				name: 'exchange',
				type: 'string',
				displayOptions: {
					show: {
						mode: [
							'exchange',
						],
					},
				},
				default: '',
				placeholder: 'exchange-name',
				description: 'Name of the exchange to publish to',
			},
			{
				displayName: 'Type',
				name: 'exchangeType',
				type: 'options',
				displayOptions: {
					show: {
						mode: [
							'exchange',
						],
					},
				},
				options: [
					{
						name: 'Direct',
						value: 'direct',
						description: 'Direct exchange type',
					},
					{
						name: 'Topic',
						value: 'topic',
						description: 'Topic exchange type',
					},
					{
						name: 'Headers',
						value: 'headers',
						description: 'Headers exchange type',
					},
					{
						name: 'Fanout',
						value: 'fanout',
						description: 'Fanout exchange type',
					},
				],
				default: 'fanout',
				description: 'Type of exchange',
			},
			{
				displayName: 'Routing Key',
				name: 'routingKey',
				type: 'string',
				displayOptions: {
					show: {
						mode: [
							'exchange',
						],
					},
				},
				default: '',
				placeholder: 'routing-key',
				description: 'The routing key for the message',
			},
			// ----------------------------------
			//         Default
			// ----------------------------------
			{
				displayName: 'Send Input Data',
				name: 'sendInputData',
				type: 'boolean',
				default: true,
				description: 'Send the data the node receives as JSON',
			},
			{
				displayName: 'Message',
				name: 'message',
				type: 'string',
				displayOptions: {
					show: {
						sendInputData: [
							false,
						],
					},
				},
				default: '',
				description: 'The message to be sent',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				default: {},
				placeholder: 'Add Option',
				options: [
					{
						displayName: 'Alternate Exchange',
						name: 'alternateExchange',
						type: 'string',
						displayOptions: {
							show: {
								'/mode': [
									'exchange',
								],
							},
						},
						default: '',
						description: 'An exchange to send messages to if this exchange can’t route them to any queues',
					},
					{
						displayName: 'Arguments',
						name: 'arguments',
						placeholder: 'Add Argument',
						description: 'Arguments to add',
						type: 'fixedCollection',
						typeOptions: {
							multipleValues: true,
						},
						default: {},
						options: [
							{
								name: 'argument',
								displayName: 'Argument',
								values: [
									{
										displayName: 'Key',
										name: 'key',
										type: 'string',
										default: '',
									},
									{
										displayName: 'Value',
										name: 'value',
										type: 'string',
										default: '',
									},
								],
							},
						],
					},
					{
						displayName: 'Auto Delete Queue',
						name: 'autoDelete',
						type: 'boolean',
						default: false,
						description: 'The queue will be deleted when the number of consumers drops to zero',
					},
					{
						displayName: 'Durable',
						name: 'durable',
						type: 'boolean',
						default: true,
						description: 'The queue will survive broker restarts',
					},
					{
						displayName: 'Exclusive',
						name: 'exclusive',
						type: 'boolean',
						displayOptions: {
							show: {
								'/mode': [
									'queue',
								],
							},
						},
						default: false,
						description: 'Scopes the queue to the connection',
					},
					{
						displayName: 'Headers',
						name: 'headers',
						placeholder: 'Add Header',
						description: 'Headers to add',
						type: 'fixedCollection',
						typeOptions: {
							multipleValues: true,
						},
						default: {},
						options: [
							{
								name: 'header',
								displayName: 'Header',
								values: [
									{
										displayName: 'Key',
										name: 'key',
										type: 'string',
										default: '',
									},
									{
										displayName: 'Value',
										name: 'value',
										type: 'string',
										default: '',
									},
								],
							},
						],
					},
				],
			},
		],
	};

	/**
	 * Publishes one message per input item to the configured queue or exchange.
	 *
	 * @returns One output item per input item: `{ success: <publish result> }`
	 * for a settled publish, or `{ error: <reason> }` for a failed one when the
	 * node is set to continue on fail.
	 * @throws NodeOperationError when the `mode` parameter is neither `queue`
	 * nor `exchange`.
	 * @throws NodeApiError when a publish fails and the node is not set to
	 * continue on fail.
	 */
	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		let channel;
		// Tracks whether publishing finished without an error, so that a failure
		// while closing never hides the error that actually broke the run.
		let completed = false;

		try {
			const items = this.getInputData();
			const mode = this.getNodeParameter('mode', 0) as string;

			// Reject unknown modes before reading parameters or opening a connection.
			if (mode !== 'queue' && mode !== 'exchange') {
				throw new NodeOperationError(this.getNode(), `The operation "${mode}" is not known!`);
			}

			const options = this.getNodeParameter('options', 0, {}) as IDataObject;
			const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;

			// Reads the configured header entries. The "Headers" option may be
			// present but empty (`{}`), in which case there is no `header` list.
			const headerEntries = (source: IDataObject): IDataObject[] => {
				const collection = source.headers as IDataObject | undefined;
				return (collection?.header as IDataObject[] | undefined) ?? [];
			};

			// Only re-evaluate the options per item when item 0 has header entries.
			const hasHeaders = headerEntries(options).length > 0;

			// Builds the message payload for the item at the given index.
			const messageFor = (index: number): string => {
				if (sendInputData === true) {
					return JSON.stringify(items[index].json);
				}
				return this.getNodeParameter('message', index) as string;
			};

			// Builds the message headers for the item at the given index.
			const headersFor = (index: number): IDataObject => {
				const headers: IDataObject = {};
				if (!hasHeaders) {
					return headers;
				}
				const itemOptions = this.getNodeParameter('options', index, {}) as IDataObject;
				for (const { key, value } of headerEntries(itemOptions)) {
					headers[key as string] = value;
				}
				return headers;
			};

			// Waits for every publish to settle and maps each outcome to an output item.
			const collectResults = async (pending: unknown[]): Promise<INodeExecutionData[]> => {
				// @ts-ignore
				const settled: JsonObject[] = await Promise.allSettled(pending);
				const results: INodeExecutionData[] = [];
				for (const outcome of settled) {
					if (outcome.status === 'fulfilled') {
						results.push({
							json: {
								success: outcome.value,
							},
						});
						continue;
					}
					if (this.continueOnFail() !== true) {
						throw new NodeApiError(this.getNode(), outcome);
					}
					// Return the actual reason as error
					results.push({
						json: {
							error: outcome.reason,
						},
					});
				}
				return results;
			};

			const pending: unknown[] = [];

			if (mode === 'queue') {
				const queue = this.getNodeParameter('queue', 0) as string;
				channel = await rabbitmqConnectQueue.call(this, queue, options);

				for (let i = 0; i < items.length; i++) {
					const content = Buffer.from(messageFor(i));
					pending.push(channel.sendToQueue(queue, content, { headers: headersFor(i) }));
				}
			} else {
				const exchange = this.getNodeParameter('exchange', 0) as string;
				const type = this.getNodeParameter('exchangeType', 0) as string;
				const routingKey = this.getNodeParameter('routingKey', 0) as string;
				channel = await rabbitmqConnectExchange.call(this, exchange, type, options);

				for (let i = 0; i < items.length; i++) {
					const content = Buffer.from(messageFor(i));
					pending.push(channel.publish(exchange, routingKey, content, { headers: headersFor(i) }));
				}
			}

			const returnItems = await collectResults(pending);
			const output = this.prepareOutputData(returnItems);
			completed = true;
			return output;
		} finally {
			if (channel) {
				const connection = channel.connection;
				try {
					try {
						await channel.close();
					} finally {
						// Close the connection even if closing the channel failed.
						await connection.close();
					}
				} catch (closeError) {
					// Report a close failure only when the work itself succeeded;
					// otherwise let the original error propagate unmasked.
					if (completed) {
						throw closeError;
					}
				}
			}
		}
	}
}