n8n/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts
Omar Ajoue 7ce7285f7a
Load credentials from the database (#1741)
* Changes to types so that credentials can be always loaded from DB

This first commit changes all return types from the execute functions
and calls to get credentials to be async so we can use await.

This is a first step as previously credentials were loaded in memory and
always available. We will now be loading them from the DB which requires
turning the whole call chain async.

* Fix updated files

* Removed unnecessary credential loading to improve performance

* Fix typo

*  Fix issue

* Updated new nodes to load credentials async

*  Remove not needed comment

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-08-20 18:57:30 +02:00

288 lines
8.6 KiB
TypeScript

import {
ContainerOptions,
create_container,
EventContext,
Message,
ReceiverOptions,
} from 'rhea';
import { ITriggerFunctions } from 'n8n-core';
import {
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
NodeOperationError,
} from 'n8n-workflow';
export class AmqpTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'AMQP Trigger',
name: 'amqpTrigger',
icon: 'file:amqp.png',
group: ['trigger'],
version: 1,
description: 'Listens to AMQP 1.0 Messages',
defaults: {
name: 'AMQP Trigger',
color: '#00FF00',
},
inputs: [],
outputs: ['main'],
credentials: [{
name: 'amqp',
required: true,
}],
properties: [
// Node properties which the user gets displayed and
// can change on the node.
{
displayName: 'Queue / Topic',
name: 'sink',
type: 'string',
default: '',
placeholder: 'topic://sourcename.something',
description: 'name of the queue of topic to listen to',
},
{
displayName: 'Clientname',
name: 'clientname',
type: 'string',
default: '',
placeholder: 'for durable/persistent topic subscriptions, example: "n8n"',
description: 'Leave empty for non-durable topic subscriptions or queues. ',
},
{
displayName: 'Subscription',
name: 'subscription',
type: 'string',
default: '',
placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"',
description: 'Leave empty for non-durable topic subscriptions or queues',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Container ID',
name: 'containerId',
type: 'string',
default: '',
description: 'Will be used to pass to the RHEA Backend as container_id',
},
{
displayName: 'Convert Body To String',
name: 'jsonConvertByteArrayToString',
type: 'boolean',
default: false,
description: 'Convert JSON Body content (["body"]["content"]) from Byte Array to string. Needed for Azure Service Bus.',
},
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
default: false,
description: 'Parse the body to an object.',
},
{
displayName: 'Messages per Cicle',
name: 'pullMessagesNumber',
type: 'number',
default: 100,
description: 'Number of messages to pull from the bus for every cicle',
},
{
displayName: 'Only Body',
name: 'onlyBody',
type: 'boolean',
default: false,
description: 'Returns only the body property.',
},
{
displayName: 'Reconnect',
name: 'reconnect',
type: 'boolean',
default: true,
description: 'Automatically reconnect if disconnected',
},
{
displayName: 'Reconnect Limit',
name: 'reconnectLimit',
type: 'number',
default: 50,
description: 'Maximum number of reconnect attempts',
},
{
displayName: 'Sleep Time',
name: 'sleepTime',
type: 'number',
default: 10,
description: 'Milliseconds to sleep after every cicle.',
},
],
},
],
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const credentials = await this.getCredentials('amqp');
if (!credentials) {
throw new NodeOperationError(this.getNode(), 'Credentials are mandatory!');
}
const sink = this.getNodeParameter('sink', '') as string;
const clientname = this.getNodeParameter('clientname', '') as string;
const subscription = this.getNodeParameter('subscription', '') as string;
const options = this.getNodeParameter('options', {}) as IDataObject;
const pullMessagesNumber = options.pullMessagesNumber as number || 100;
const containerId = options.containerId as string;
const containerReconnect = options.reconnect as boolean || true;
const containerReconnectLimit = options.reconnectLimit as number || 50;
if (sink === '') {
throw new NodeOperationError(this.getNode(), 'Queue or Topic required!');
}
let durable = false;
if (subscription && clientname) {
durable = true;
}
const container = create_container();
let lastMsgId: string | number | Buffer | undefined = undefined;
const self = this;
container.on('receiver_open', (context: EventContext) => {
context.receiver?.add_credit(pullMessagesNumber);
});
container.on('message', (context: EventContext) => {
// No message in the context
if (!context.message) {
return;
}
// ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code
if (context.message.message_id && context.message.message_id === lastMsgId) {
return;
}
lastMsgId = context.message.message_id;
let data = context.message;
if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) {
// The buffer is not ready... Stringify and parse back to load it.
const cont = JSON.stringify(data.body.content);
data.body = String.fromCharCode.apply(null, JSON.parse(cont).data);
}
if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) {
// The buffer is not ready... Stringify and parse back to load it.
const cont = JSON.stringify(data.body.content);
data.body = String.fromCharCode.apply(null, JSON.parse(cont).data);
}
if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) {
// The buffer is not ready... Stringify and parse back to load it.
const content = JSON.stringify(data.body.content);
data.body = String.fromCharCode.apply(null, JSON.parse(content).data);
}
if (options.jsonParseBody === true) {
data.body = JSON.parse(data.body);
}
if (options.onlyBody === true) {
data = data.body;
}
self.emit([self.helpers.returnJsonArray([data as any])]); // tslint:disable-line:no-any
if (!context.receiver?.has_credit()) {
setTimeout(() => {
context.receiver?.add_credit(pullMessagesNumber);
}, options.sleepTime as number || 10);
}
});
/*
Values are documentet here: https://github.com/amqp/rhea#container
*/
const connectOptions: ContainerOptions = {
host: credentials.hostname,
hostname: credentials.hostname,
port: credentials.port,
reconnect: containerReconnect,
reconnect_limit: containerReconnectLimit,
username: credentials.username ? credentials.username : undefined,
password: credentials.password ? credentials.password : undefined,
transport: credentials.transportType ? credentials.transportType : undefined,
container_id: containerId ? containerId : undefined,
id: containerId ? containerId : undefined,
};
const connection = container.connect(connectOptions);
const clientOptions: ReceiverOptions = {
name: subscription ? subscription : undefined,
source: {
address: sink,
durable: (durable ? 2 : undefined),
expiry_policy: (durable ? 'never' : undefined),
},
credit_window: 0, // prefetch 1
};
connection.open_receiver(clientOptions);
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
container.removeAllListeners('receiver_open');
container.removeAllListeners('message');
connection.close();
}
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually.
// for AMQP it doesn't make much sense to wait here but
// for a new user who doesn't know how this works, it's better to wait and show a respective info message
async function manualTriggerFunction() {
await new Promise((resolve, reject) => {
const timeoutHandler = setTimeout(() => {
reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.'));
}, 30000);
container.on('message', (context: EventContext) => {
// Check if the only property present in the message is body
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content
const message = context.message as Message;
if (Object.keys(message)[0] === 'body' && Object.keys(message).length === 1) {
self.emit([self.helpers.returnJsonArray([message.body])]);
} else {
self.emit([self.helpers.returnJsonArray([message as any])]); // tslint:disable-line:no-any
}
clearTimeout(timeoutHandler);
resolve(true);
});
});
}
return {
closeFunction,
manualTriggerFunction,
};
}
}