🔀 Merge branch 'Fix-AMQP-Nodes'

This commit is contained in:
Jan Oberhauser 2020-08-06 14:44:50 +02:00
commit 4596aaf5d5
2 changed files with 75 additions and 6 deletions

View file

@ -2,6 +2,7 @@ import { ContainerOptions, Delivery } from 'rhea';
import { IExecuteSingleFunctions } from 'n8n-core'; import { IExecuteSingleFunctions } from 'n8n-core';
import { import {
IDataObject,
INodeExecutionData, INodeExecutionData,
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
@ -41,7 +42,23 @@ export class Amqp implements INodeType {
type: 'json', type: 'json',
default: '', default: '',
description: 'Header parameters as JSON (flat object). Sent as application_properties in amqp-message meta info.', description: 'Header parameters as JSON (flat object). Sent as application_properties in amqp-message meta info.',
} },
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Data as Object',
name: 'dataAsObject',
type: 'boolean',
default: false,
description: 'Send the data as an object.',
},
],
},
] ]
}; };
@ -55,9 +72,10 @@ export class Amqp implements INodeType {
const sink = this.getNodeParameter('sink', '') as string; const sink = this.getNodeParameter('sink', '') as string;
const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object; const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object;
const options = this.getNodeParameter('options', {}) as IDataObject;
let headerProperties = applicationProperties; let headerProperties = applicationProperties;
if(typeof applicationProperties === 'string' && applicationProperties !== '') { if (typeof applicationProperties === 'string' && applicationProperties !== '') {
headerProperties = JSON.parse(applicationProperties); headerProperties = JSON.parse(applicationProperties);
} }
@ -71,7 +89,7 @@ export class Amqp implements INodeType {
host: credentials.hostname, host: credentials.hostname,
port: credentials.port, port: credentials.port,
reconnect: true, // this id the default anyway reconnect: true, // this id the default anyway
reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm
}; };
if (credentials.username || credentials.password) { if (credentials.username || credentials.password) {
container.options.username = credentials.username; container.options.username = credentials.username;
@ -81,9 +99,15 @@ export class Amqp implements INodeType {
const allSent = new Promise(( resolve ) => { const allSent = new Promise(( resolve ) => {
container.on('sendable', (context: any) => { // tslint:disable-line:no-any container.on('sendable', (context: any) => { // tslint:disable-line:no-any
let body: IDataObject | string = item.json;
if (options.dataAsObject !== true) {
body = JSON.stringify(body);
}
const message = { const message = {
application_properties: headerProperties, application_properties: headerProperties,
body: JSON.stringify(item) body
}; };
const sendResult = context.sender.send(message); const sendResult = context.sender.send(message);

View file

@ -2,6 +2,7 @@ import { ContainerOptions } from 'rhea';
import { ITriggerFunctions } from 'n8n-core'; import { ITriggerFunctions } from 'n8n-core';
import { import {
IDataObject,
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
ITriggerResponse, ITriggerResponse,
@ -53,6 +54,29 @@ export class AmqpTrigger implements INodeType {
placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"', placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"',
description: 'Leave empty for non-durable topic subscriptions or queues', description: 'Leave empty for non-durable topic subscriptions or queues',
}, },
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Only Body',
name: 'onlyBody',
type: 'boolean',
default: false,
description: 'Returns only the body property.',
},
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
default: false,
description: 'Parse the body to an object.',
},
],
},
] ]
}; };
@ -67,12 +91,15 @@ export class AmqpTrigger implements INodeType {
const sink = this.getNodeParameter('sink', '') as string; const sink = this.getNodeParameter('sink', '') as string;
const clientname = this.getNodeParameter('clientname', '') as string; const clientname = this.getNodeParameter('clientname', '') as string;
const subscription = this.getNodeParameter('subscription', '') as string; const subscription = this.getNodeParameter('subscription', '') as string;
const options = this.getNodeParameter('options', {}) as IDataObject;
if (sink === '') { if (sink === '') {
throw new Error('Queue or Topic required!'); throw new Error('Queue or Topic required!');
} }
let durable = false; let durable = false;
if(subscription && clientname) {
if (subscription && clientname) {
durable = true; durable = true;
} }
@ -98,7 +125,17 @@ export class AmqpTrigger implements INodeType {
lastMsgId = context.message.message_id; lastMsgId = context.message.message_id;
return; return;
} }
self.emit([self.helpers.returnJsonArray([context.message])]);
let data = context.message;
if (options.jsonParseBody === true) {
data.body = JSON.parse(data.body);
}
if (options.onlyBody === true) {
data = data.body;
}
self.emit([self.helpers.returnJsonArray([data])]);
}); });
const connection = container.connect(connectOptions); const connection = container.connect(connectOptions);
@ -141,6 +178,14 @@ export class AmqpTrigger implements INodeType {
reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.'));
}, 30000); }, 30000);
container.on('message', (context: any) => { // tslint:disable-line:no-any container.on('message', (context: any) => { // tslint:disable-line:no-any
// Check if the only property present in the message is body
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content
if (Object.keys(context.message)[0] === 'body' && Object.keys(context.message).length === 1) {
self.emit([self.helpers.returnJsonArray([context.message.body])]);
} else {
self.emit([self.helpers.returnJsonArray([context.message])]);
}
clearTimeout(timeoutHandler); clearTimeout(timeoutHandler);
resolve(true); resolve(true);
}); });