Add new Options to pass to the RHEA Container

This commit is contained in:
Günther Erb 2020-12-30 12:42:48 +01:00
parent 638310fa94
commit dfa847cfee

View file

@ -96,6 +96,27 @@ export class AmqpTrigger implements INodeType {
default: 10, default: 10,
description: 'Milliseconds to sleep after every cicle.', description: 'Milliseconds to sleep after every cicle.',
}, },
{
displayName: 'Container ID',
name: 'containerID',
type: 'string',
default: '',
description: 'Will be used to pass to the RHEA Backend as container_id',
},
{
displayName: 'Reconnect',
name: 'reconnect',
type: 'boolean',
default: true,
description: 'If on, the library will automatically attempt to reconnect if disconnected',
},
{
displayName: 'Reconnect limit',
name: 'reconnectLimit',
type: 'number',
default: 50,
description: 'maximum number of reconnect attempts',
},
], ],
}, },
], ],
@ -114,6 +135,9 @@ export class AmqpTrigger implements INodeType {
const subscription = this.getNodeParameter('subscription', '') as string; const subscription = this.getNodeParameter('subscription', '') as string;
const options = this.getNodeParameter('options', {}) as IDataObject; const options = this.getNodeParameter('options', {}) as IDataObject;
const pullMessagesNumber = options.pullMessagesNumber || 100; const pullMessagesNumber = options.pullMessagesNumber || 100;
const container_id = options.containerID as string;
const containerReconnect = options.reconnect || true as boolean;
const containerReconnectLimit = options.reconnectLimit || 50 as number;
if (sink === '') { if (sink === '') {
throw new Error('Queue or Topic required!'); throw new Error('Queue or Topic required!');
@ -126,24 +150,28 @@ export class AmqpTrigger implements INodeType {
} }
const container = require('rhea'); const container = require('rhea');
/*
Values are documentet here: https://github.com/amqp/rhea#container
*/
const connectOptions: ContainerOptions = { const connectOptions: ContainerOptions = {
host: credentials.hostname, host: credentials.hostname,
hostname: credentials.hostname, hostname: credentials.hostname,
port: credentials.port, port: credentials.port,
reconnect: true, // this id the default anyway reconnect: containerReconnect,
reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm reconnect_limit: containerReconnectLimit,
container_id: (durable ? clientname : null),
}; };
if (credentials.username || credentials.password) { if (credentials.username || credentials.password) {
// Old rhea implementation. not shure if it is neccessary
container.options.username = credentials.username;
container.options.password = credentials.password;
connectOptions.username = credentials.username; connectOptions.username = credentials.username;
connectOptions.password = credentials.password; connectOptions.password = credentials.password;
} }
if (credentials.transportType) { if (credentials.transportType) {
connectOptions.transport = credentials.transportType; connectOptions.transport = credentials.transportType;
} }
if(container_id) {
connectOptions.id = container_id;
connectOptions.container_id = container_id;
}
let lastMsgId: number | undefined = undefined; let lastMsgId: number | undefined = undefined;
const self = this; const self = this;