From dfa847cfee049a56c1041ad151555c06801b0252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther=20Erb?= Date: Wed, 30 Dec 2020 12:42:48 +0100 Subject: [PATCH 1/5] Add new Options to pass to the RHEA Container --- .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 40af50c4f2..c613698d63 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -96,6 +96,27 @@ export class AmqpTrigger implements INodeType { default: 10, description: 'Milliseconds to sleep after every cicle.', }, + { + displayName: 'Container ID', + name: 'containerID', + type: 'string', + default: '', + description: 'Will be used to pass to the RHEA Backend as container_id', + }, + { + displayName: 'Reconnect', + name: 'reconnect', + type: 'boolean', + default: true, + description: 'If on, the library will automatically attempt to reconnect if disconnected', + }, + { + displayName: 'Reconnect limit', + name: 'reconnectLimit', + type: 'number', + default: 50, + description: 'maximum number of reconnect attempts', + }, ], }, ], @@ -114,6 +135,9 @@ export class AmqpTrigger implements INodeType { const subscription = this.getNodeParameter('subscription', '') as string; const options = this.getNodeParameter('options', {}) as IDataObject; const pullMessagesNumber = options.pullMessagesNumber || 100; + const container_id = options.containerID as string; + const containerReconnect = options.reconnect || true as boolean; + const containerReconnectLimit = options.reconnectLimit || 50 as number; if (sink === '') { throw new Error('Queue or Topic required!'); @@ -126,24 +150,28 @@ export class AmqpTrigger implements INodeType { } const container = require('rhea'); + + /* + Values are documentet here: https://github.com/amqp/rhea#container + */ const connectOptions: ContainerOptions = { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, - reconnect: true, // this id the default anyway - reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm - container_id: (durable ? clientname : null), + reconnect: containerReconnect, + reconnect_limit: containerReconnectLimit, }; if (credentials.username || credentials.password) { - // Old rhea implementation. not shure if it is neccessary - container.options.username = credentials.username; - container.options.password = credentials.password; connectOptions.username = credentials.username; connectOptions.password = credentials.password; } if (credentials.transportType) { connectOptions.transport = credentials.transportType; } + if(container_id) { + connectOptions.id = container_id; + connectOptions.container_id = container_id; + } let lastMsgId: number | undefined = undefined; const self = this; From 7648ee8d934bf3191d2b37263c661e034c519e74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther=20Erb?= Date: Wed, 30 Dec 2020 13:29:29 +0100 Subject: [PATCH 2/5] Typings on Amqp --- .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 116 ++++++++++-------- 1 file changed, 63 insertions(+), 53 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index c613698d63..019f533f80 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -1,4 +1,4 @@ -import { ContainerOptions } from 'rhea'; +import { Container, ContainerOptions, EventContext, ReceiverOptions } from 'rhea'; import { ITriggerFunctions } from 'n8n-core'; import { @@ -55,6 +55,20 @@ export class AmqpTrigger implements INodeType { description: 'Leave empty for non-durable topic subscriptions or queues', }, { + displayName: 'Pull N Messages per Cicle', + name: 'pullMessagesNumber', + type: 'number', + default: 100, + description: 'Number of messages to pull from the bus for every cicle', + }, + { + displayName: 'Sleep time after cicle', + name: 'sleepTime', + type: 'number', + default: 10, + description: 'Milliseconds to sleep after every cicle', + }, + { displayName: 'Options', name: 'options', type: 'collection', @@ -119,7 +133,7 @@ export class AmqpTrigger implements INodeType { }, ], }, - ], + ] }; @@ -134,10 +148,10 @@ export class AmqpTrigger implements INodeType { const clientname = this.getNodeParameter('clientname', '') as string; const subscription = this.getNodeParameter('subscription', '') as string; const options = this.getNodeParameter('options', {}) as IDataObject; - const pullMessagesNumber = options.pullMessagesNumber || 100; + const pullMessagesNumber = options.pullMessagesNumber as number || 100; const container_id = options.containerID as string; - const containerReconnect = options.reconnect || true as boolean; - const containerReconnectLimit = options.reconnectLimit || 50 as number; + const containerReconnect = options.reconnect as boolean || true ; + const containerReconnectLimit = options.reconnectLimit as number || 50; if (sink === '') { throw new Error('Queue or Topic required!'); @@ -149,38 +163,23 @@ export class AmqpTrigger implements INodeType { durable = true; } - const container = require('rhea'); + const container: Container = require('rhea'); - /* - Values are documentet here: https://github.com/amqp/rhea#container - */ - const connectOptions: ContainerOptions = { - host: credentials.hostname, - hostname: credentials.hostname, - port: credentials.port, - reconnect: containerReconnect, - reconnect_limit: containerReconnectLimit, - }; - if (credentials.username || credentials.password) { - connectOptions.username = credentials.username; - connectOptions.password = credentials.password; - } - if (credentials.transportType) { - connectOptions.transport = credentials.transportType; - } - if(container_id) { - connectOptions.id = container_id; - connectOptions.container_id = container_id; - } - let lastMsgId: number | undefined = undefined; + let lastMsgId: string| number | Buffer | undefined = undefined; const self = this; container.on('receiver_open', (context: any) => { // tslint:disable-line:no-any context.receiver.add_credit(pullMessagesNumber); }); - container.on('message', (context: any) => { // tslint:disable-line:no-any + container.on('message', (context: EventContext) => { // tslint:disable-line:no-any + + // No message in the context + if(!context.message) + return; + + console.log("New Message on Amqp Trigger from " + container.id + " context conteaineer id: " + context.container.id); // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code if (context.message.message_id && context.message.message_id === lastMsgId) { return; @@ -188,6 +187,12 @@ export class AmqpTrigger implements INodeType { lastMsgId = context.message.message_id; let data = context.message; + + if(options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { + // The buffer is not ready... Stringify and parse back to load it. + let cont = JSON.stringify(data.body.content); + data.body = String.fromCharCode.apply(null,JSON.parse(cont).data); + } if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { // The buffer is not ready... Stringify and parse back to load it. @@ -209,35 +214,40 @@ export class AmqpTrigger implements INodeType { } - self.emit([self.helpers.returnJsonArray([data])]); + self.emit([self.helpers.returnJsonArray([data as any])]); - if (context.receiver.credit === 0) { + if (!context.receiver?.has_credit()) { setTimeout(() => { - context.receiver.add_credit(pullMessagesNumber); + context.receiver?.add_credit(pullMessagesNumber); }, options.sleepTime as number || 10); } }); + /* + Values are documentet here: https://github.com/amqp/rhea#container + */ + const connectOptions: ContainerOptions = { + host: credentials.hostname, + hostname: credentials.hostname, + port: credentials.port, + reconnect: containerReconnect, + reconnect_limit: containerReconnectLimit, + username: credentials.username ? credentials.username : undefined, + password: credentials.password ? credentials.password : undefined, + transport: credentials.transportType ? credentials.transportType : undefined, + container_id: container_id ? container_id : undefined, + id: container_id ? container_id : undefined, + }; const connection = container.connect(connectOptions); - let clientOptions = undefined; - if (durable) { - clientOptions = { - name: subscription, - source: { - address: sink, - durable: 2, - expiry_policy: 'never', - }, - credit_window: 0, // prefetch 1 - }; - } else { - clientOptions = { - source: { - address: sink, - }, - credit_window: 0, // prefetch 1 - }; - } + + let clientOptions : ReceiverOptions = { + source: { + address: sink, + durable: (durable ? 2 : undefined), + expiry_policy: (durable ? 'never' : undefined), + }, + credit_window: 0, // prefetch 1 + }; connection.open_receiver(clientOptions); @@ -248,7 +258,7 @@ export class AmqpTrigger implements INodeType { container.removeAllListeners('message'); connection.close(); } - + // The "manualTriggerFunction" function gets called by n8n // when a user is in the workflow editor and starts the // workflow manually. @@ -258,7 +268,7 @@ export class AmqpTrigger implements INodeType { await new Promise((resolve, reject) => { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); - }, 30000); + }, 3000); container.on('message', (context: any) => { // tslint:disable-line:no-any // Check if the only property present in the message is body // in which case we only emit the content of the body property From 1a1accce437b0b939113ba65bf39c6c140d12b1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther=20Erb?= Date: Wed, 30 Dec 2020 13:57:53 +0100 Subject: [PATCH 3/5] Fix Subscription --- .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 019f533f80..9a5d9f204e 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -1,4 +1,4 @@ -import { Container, ContainerOptions, EventContext, ReceiverOptions } from 'rhea'; +import { Container, ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea'; import { ITriggerFunctions } from 'n8n-core'; import { @@ -169,11 +169,11 @@ export class AmqpTrigger implements INodeType { let lastMsgId: string| number | Buffer | undefined = undefined; const self = this; - container.on('receiver_open', (context: any) => { // tslint:disable-line:no-any - context.receiver.add_credit(pullMessagesNumber); + container.on('receiver_open', (context: EventContext) => { + context.receiver?.add_credit(pullMessagesNumber); }); - container.on('message', (context: EventContext) => { // tslint:disable-line:no-any + container.on('message', (context: EventContext) => { // No message in the context if(!context.message) @@ -241,6 +241,7 @@ export class AmqpTrigger implements INodeType { const connection = container.connect(connectOptions); let clientOptions : ReceiverOptions = { + name: subscription ? subscription : undefined, source: { address: sink, durable: (durable ? 2 : undefined), @@ -269,14 +270,15 @@ export class AmqpTrigger implements INodeType { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); }, 3000); - container.on('message', (context: any) => { // tslint:disable-line:no-any + container.on('message', (context: EventContext) => { // Check if the only property present in the message is body // in which case we only emit the content of the body property // otherwise we emit all properties and their content - if (Object.keys(context.message)[0] === 'body' && Object.keys(context.message).length === 1) { - self.emit([self.helpers.returnJsonArray([context.message.body])]); + const message = context.message as Message; + if (Object.keys(message)[0] === 'body' && Object.keys(message).length === 1) { + self.emit([self.helpers.returnJsonArray([message.body])]); } else { - self.emit([self.helpers.returnJsonArray([context.message])]); + self.emit([self.helpers.returnJsonArray([message as any])]); } clearTimeout(timeoutHandler); resolve(true); From ca8aed39dd3b9a3cbb077e121b3875c36b691412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther=20Erb?= Date: Wed, 30 Dec 2020 13:58:00 +0100 Subject: [PATCH 4/5] Typeings --- packages/nodes-base/nodes/Amqp/Amqp.node.ts | 61 +++++++++++++++------ 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 3fd8133711..653d0dff92 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,4 +1,4 @@ -import { ContainerOptions, Delivery } from 'rhea'; +import { ContainerOptions, Delivery, Dictionary, EventContext } from 'rhea'; import { IExecuteFunctions } from 'n8n-core'; import { @@ -64,6 +64,27 @@ export class Amqp implements INodeType { default: '', description: 'The only property to send. If empty the whole item will be sent.', }, + { + displayName: 'Container ID', + name: 'containerID', + type: 'string', + default: '', + description: 'Will be used to pass to the RHEA Backend as container_id', + }, + { + displayName: 'Reconnect', + name: 'reconnect', + type: 'boolean', + default: true, + description: 'If on, the library will automatically attempt to reconnect if disconnected', + }, + { + displayName: 'Reconnect limit', + name: 'reconnectLimit', + type: 'number', + default: 50, + description: 'maximum number of reconnect attempts', + }, ], }, ], @@ -78,11 +99,16 @@ export class Amqp implements INodeType { const sink = this.getNodeParameter('sink', 0, '') as string; const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as string | object; const options = this.getNodeParameter('options', 0, {}) as IDataObject; + const container_id = options.containerID as string; + const containerReconnect = options.reconnect as boolean || true ; + const containerReconnectLimit = options.reconnectLimit as number || 50; - let headerProperties = applicationProperties; + let headerProperties : Dictionary; if (typeof applicationProperties === 'string' && applicationProperties !== '') { headerProperties = JSON.parse(applicationProperties); - } + } else { + headerProperties = applicationProperties as object; + } if (sink === '') { throw new Error('Queue or Topic required!'); @@ -90,28 +116,27 @@ export class Amqp implements INodeType { const container = require('rhea'); + /* + Values are documentet here: https://github.com/amqp/rhea#container + */ const connectOptions: ContainerOptions = { host: credentials.hostname, hostname: credentials.hostname, port: credentials.port, - reconnect: true, // this id the default anyway - reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm + reconnect: containerReconnect, + reconnect_limit: containerReconnectLimit, + username: credentials.username ? credentials.username : undefined, + password: credentials.password ? credentials.password : undefined, + transport: credentials.transportType ? credentials.transportType : undefined, + container_id: container_id ? container_id : undefined, + id: container_id ? container_id : undefined, }; - if (credentials.username || credentials.password) { - container.options.username = credentials.username; - container.options.password = credentials.password; - connectOptions.username = credentials.username; - connectOptions.password = credentials.password; - } - if (credentials.transportType !== '') { - connectOptions.transport = credentials.transportType; - } - const conn = container.connect(connectOptions); + const sender = conn.open_sender(sink); const responseData: IDataObject[] = await new Promise((resolve) => { - container.once('sendable', (context: any) => { // tslint:disable-line:no-any + container.once('sendable', (context: EventContext) => { const returnData = []; const items = this.getInputData(); @@ -129,12 +154,12 @@ export class Amqp implements INodeType { body = JSON.stringify(body); } - const result = context.sender.send({ + const result = context.sender?.send({ application_properties: headerProperties, body, }); - returnData.push({ id: result.id }); + returnData.push({ id: result?.id }); } resolve(returnData); From a4d45e4edadd81d956f61551b88e4717b83e431f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther=20Erb?= Date: Wed, 30 Dec 2020 15:26:00 +0100 Subject: [PATCH 5/5] move to create_container on rhea --- packages/nodes-base/nodes/Amqp/Amqp.node.ts | 5 +++-- packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 653d0dff92..ab7f2e21b8 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,4 +1,5 @@ -import { ContainerOptions, Delivery, Dictionary, EventContext } from 'rhea'; +import { ContainerOptions, Dictionary, EventContext } from 'rhea'; +import rhea = require('rhea'); import { IExecuteFunctions } from 'n8n-core'; import { @@ -114,7 +115,7 @@ export class Amqp implements INodeType { throw new Error('Queue or Topic required!'); } - const container = require('rhea'); + const container = rhea.create_container(); /* Values are documentet here: https://github.com/amqp/rhea#container diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 9a5d9f204e..3b3bf8a553 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -1,4 +1,5 @@ -import { Container, ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea'; +import { ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea'; +import rhea = require("rhea"); import { ITriggerFunctions } from 'n8n-core'; import { @@ -163,8 +164,7 @@ export class AmqpTrigger implements INodeType { durable = true; } - const container: Container = require('rhea'); - + const container = rhea.create_container(); let lastMsgId: string| number | Buffer | undefined = undefined; const self = this; @@ -179,7 +179,6 @@ export class AmqpTrigger implements INodeType { if(!context.message) return; - console.log("New Message on Amqp Trigger from " + container.id + " context conteaineer id: " + context.container.id); // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code if (context.message.message_id && context.message.message_id === lastMsgId) { return;