Adjusted message send format, adjusted for different formats in incoming messages

This commit is contained in:
Rupenieks 2020-08-05 10:09:36 +01:00
parent 420777c0b3
commit 37d5d864e4
2 changed files with 27 additions and 6 deletions

View file

@ -57,7 +57,7 @@ export class Amqp implements INodeType {
const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object;
let headerProperties = applicationProperties;
if(typeof applicationProperties === 'string' && applicationProperties !== '') {
if (typeof applicationProperties === 'string' && applicationProperties !== '') {
headerProperties = JSON.parse(applicationProperties);
}
@ -71,7 +71,7 @@ export class Amqp implements INodeType {
host: credentials.hostname,
port: credentials.port,
reconnect: true, // this id the default anyway
reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm
reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm
};
if (credentials.username || credentials.password) {
container.options.username = credentials.username;
@ -83,9 +83,9 @@ export class Amqp implements INodeType {
const message = {
application_properties: headerProperties,
body: JSON.stringify(item)
body: item.json
};
const sendResult = context.sender.send(message);
resolve(sendResult);

View file

@ -5,6 +5,7 @@ import {
INodeType,
INodeTypeDescription,
ITriggerResponse,
IDataObject,
} from 'n8n-workflow';
@ -67,12 +68,15 @@ export class AmqpTrigger implements INodeType {
const sink = this.getNodeParameter('sink', '') as string;
const clientname = this.getNodeParameter('clientname', '') as string;
const subscription = this.getNodeParameter('subscription', '') as string;
const parseJson = this.getNodeParameter('parseJson', '') as boolean;
if (sink === '') {
throw new Error('Queue or Topic required!');
}
let durable = false;
if(subscription && clientname) {
if (subscription && clientname) {
durable = true;
}
@ -98,7 +102,16 @@ export class AmqpTrigger implements INodeType {
lastMsgId = context.message.message_id;
return;
}
self.emit([self.helpers.returnJsonArray([context.message])]);
// Check if the only property present in the message is body
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content
if (Object.keys(context.message)[0] === 'body' && Object.keys(context.message).length === 1) {
self.emit([self.helpers.returnJsonArray([context.message.body])]);
} else {
self.emit([self.helpers.returnJsonArray([context.message])]);
}
});
const connection = container.connect(connectOptions);
@ -141,6 +154,14 @@ export class AmqpTrigger implements INodeType {
reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.'));
}, 30000);
container.on('message', (context: any) => { // tslint:disable-line:no-any
// Check if the only property present in the message is body
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content
if (Object.keys(context.message)[0] === 'body' && Object.keys(context.message).length === 1) {
self.emit([self.helpers.returnJsonArray([context.message.body])]);
} else {
self.emit([self.helpers.returnJsonArray([context.message])]);
}
clearTimeout(timeoutHandler);
resolve(true);
});