From 1a1accce437b0b939113ba65bf39c6c140d12b1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCnther=20Erb?= Date: Wed, 30 Dec 2020 13:57:53 +0100 Subject: [PATCH] Fix Subscription --- .../nodes-base/nodes/Amqp/AmqpTrigger.node.ts | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index 019f533f80..9a5d9f204e 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -1,4 +1,4 @@ -import { Container, ContainerOptions, EventContext, ReceiverOptions } from 'rhea'; +import { Container, ContainerOptions, EventContext, Message, ReceiverOptions } from 'rhea'; import { ITriggerFunctions } from 'n8n-core'; import { @@ -169,11 +169,11 @@ export class AmqpTrigger implements INodeType { let lastMsgId: string| number | Buffer | undefined = undefined; const self = this; - container.on('receiver_open', (context: any) => { // tslint:disable-line:no-any - context.receiver.add_credit(pullMessagesNumber); + container.on('receiver_open', (context: EventContext) => { + context.receiver?.add_credit(pullMessagesNumber); }); - container.on('message', (context: EventContext) => { // tslint:disable-line:no-any + container.on('message', (context: EventContext) => { // No message in the context if(!context.message) @@ -241,6 +241,7 @@ export class AmqpTrigger implements INodeType { const connection = container.connect(connectOptions); let clientOptions : ReceiverOptions = { + name: subscription ? subscription : undefined, source: { address: sink, durable: (durable ? 2 : undefined), @@ -269,14 +270,15 @@ export class AmqpTrigger implements INodeType { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); }, 3000); - container.on('message', (context: any) => { // tslint:disable-line:no-any + container.on('message', (context: EventContext) => { // Check if the only property present in the message is body // in which case we only emit the content of the body property // otherwise we emit all properties and their content - if (Object.keys(context.message)[0] === 'body' && Object.keys(context.message).length === 1) { - self.emit([self.helpers.returnJsonArray([context.message.body])]); + const message = context.message as Message; + if (Object.keys(message)[0] === 'body' && Object.keys(message).length === 1) { + self.emit([self.helpers.returnJsonArray([message.body])]); } else { - self.emit([self.helpers.returnJsonArray([context.message])]); + self.emit([self.helpers.returnJsonArray([message as any])]); } clearTimeout(timeoutHandler); resolve(true);