diff --git a/packages/nodes-base/credentials/Amqp.credentials.ts b/packages/nodes-base/credentials/Amqp.credentials.ts index 2f0b4344aa..32168c49c4 100644 --- a/packages/nodes-base/credentials/Amqp.credentials.ts +++ b/packages/nodes-base/credentials/Amqp.credentials.ts @@ -36,5 +36,11 @@ export class Amqp implements ICredentialType { }, default: '', }, + { + displayName: 'Transport Type', + name: 'transportType', + type: 'string' as NodePropertyTypes, + default: '', + }, ]; } diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index cad15c6ea0..6e8558cd2c 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -57,9 +57,16 @@ export class Amqp implements INodeType { default: false, description: 'Send the data as an object.', }, + { + displayName: 'Send property', + name: 'sendOnlyProperty', + type: 'string', + default: '', + description: 'Send only this property - If empty the hole Json will be sent', + }, ], }, - ], + ] }; async executeSingle(this: IExecuteSingleFunctions): Promise { @@ -87,6 +94,7 @@ export class Amqp implements INodeType { const connectOptions: ContainerOptions = { host: credentials.hostname, + hostname: credentials.hostname, port: credentials.port, reconnect: true, // this id the default anyway reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm @@ -94,12 +102,23 @@ export class Amqp implements INodeType { if (credentials.username || credentials.password) { container.options.username = credentials.username; container.options.password = credentials.password; + connectOptions.username = credentials.username; + connectOptions.password = credentials.password; + } + if (credentials.transportType) { + connectOptions.transport = credentials.transportType; } const allSent = new Promise(( resolve ) => { container.on('sendable', (context: any) => { // tslint:disable-line:no-any let body: IDataObject | string = item.json; + let prop = options.sendOnlyProperty as string; + + if(prop) + { + body = body[prop] as string; + } if (options.dataAsObject !== true) { body = JSON.stringify(body); @@ -107,7 +126,7 @@ export class Amqp implements INodeType { const message = { application_properties: headerProperties, - body, + body }; const sendResult = context.sender.send(message); diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index a6619c0409..ce7cc85eef 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -75,9 +75,16 @@ export class AmqpTrigger implements INodeType { default: false, description: 'Parse the body to an object.', }, + { + displayName: 'Convert JSON Body content from Byte Array to string', + name: 'jsonConvertByteArrayToString', + type: 'boolean', + default: false, + description: 'Convert JSON Body content (["body"]["content"]) from Byte Array to string - Azure Service Bus', + } ], }, - ], + ] }; @@ -106,27 +113,42 @@ export class AmqpTrigger implements INodeType { const container = require('rhea'); const connectOptions: ContainerOptions = { host: credentials.hostname, + hostname: credentials.hostname, port: credentials.port, reconnect: true, // this id the default anyway reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm - container_id: (durable ? clientname : null), + container_id: (durable ? clientname : null) }; if (credentials.username || credentials.password) { + // Old rhea implementation. not shure if it is neccessary container.options.username = credentials.username; container.options.password = credentials.password; + connectOptions.username = credentials.username; + connectOptions.password = credentials.password; + } + if (credentials.transportType) { + connectOptions.transport = credentials.transportType; } + let lastMsgId: number | undefined = undefined; const self = this; container.on('message', (context: any) => { // tslint:disable-line:no-any + // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code if (context.message.message_id && context.message.message_id === lastMsgId) { - // ignore duplicate message check, don't think it's necessary, but it was in the rhea-lib example code - lastMsgId = context.message.message_id; return; } + console.log("new Message", context.message.message_id, lastMsgId); + lastMsgId = context.message.message_id; let data = context.message; + + if(options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { + // The buffer is not ready... Stringify and parse back to load it. + let cont = JSON.stringify(data.body.content); + data.body = String.fromCharCode.apply(null,JSON.parse(cont).data); + } if (options.jsonParseBody === true) { data.body = JSON.parse(data.body); @@ -135,6 +157,7 @@ export class AmqpTrigger implements INodeType { data = data.body; } + self.emit([self.helpers.returnJsonArray([data])]); }); @@ -146,16 +169,16 @@ export class AmqpTrigger implements INodeType { source: { address: sink, durable: 2, - expiry_policy: 'never', + expiry_policy: 'never' }, - credit_window: 1, // prefetch 1 + credit_window: 1 // prefetch 1 }; } else { clientOptions = { source: { address: sink, }, - credit_window: 1, // prefetch 1 + credit_window: 1 // prefetch 1 }; } connection.open_receiver(clientOptions); @@ -176,7 +199,7 @@ export class AmqpTrigger implements INodeType { await new Promise(( resolve, reject ) => { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); - }, 30000); + }, 3000); container.on('message', (context: any) => { // tslint:disable-line:no-any // Check if the only property present in the message is body // in which case we only emit the content of the body property