Improvements to AMQP-Node

This commit is contained in:
Jan Oberhauser 2020-08-06 14:42:26 +02:00
parent 3e67617130
commit 47499feff7
2 changed files with 61 additions and 13 deletions

View file

@ -2,6 +2,7 @@ import { ContainerOptions, Delivery } from 'rhea';
import { IExecuteSingleFunctions } from 'n8n-core'; import { IExecuteSingleFunctions } from 'n8n-core';
import { import {
IDataObject,
INodeExecutionData, INodeExecutionData,
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
@ -41,7 +42,23 @@ export class Amqp implements INodeType {
type: 'json', type: 'json',
default: '', default: '',
description: 'Header parameters as JSON (flat object). Sent as application_properties in amqp-message meta info.', description: 'Header parameters as JSON (flat object). Sent as application_properties in amqp-message meta info.',
} },
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Data as Object',
name: 'dataAsObject',
type: 'boolean',
default: false,
description: 'Send the data as an object.',
},
],
},
] ]
}; };
@ -55,6 +72,7 @@ export class Amqp implements INodeType {
const sink = this.getNodeParameter('sink', '') as string; const sink = this.getNodeParameter('sink', '') as string;
const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object; const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object;
const options = this.getNodeParameter('options', {}) as IDataObject;
let headerProperties = applicationProperties; let headerProperties = applicationProperties;
if (typeof applicationProperties === 'string' && applicationProperties !== '') { if (typeof applicationProperties === 'string' && applicationProperties !== '') {
@ -81,11 +99,17 @@ export class Amqp implements INodeType {
const allSent = new Promise(( resolve ) => { const allSent = new Promise(( resolve ) => {
container.on('sendable', (context: any) => { // tslint:disable-line:no-any container.on('sendable', (context: any) => { // tslint:disable-line:no-any
let body: IDataObject | string = item.json;
if (options.dataAsObject !== true) {
body = JSON.stringify(body);
}
const message = { const message = {
application_properties: headerProperties, application_properties: headerProperties,
body: item.json body
}; };
const sendResult = context.sender.send(message); const sendResult = context.sender.send(message);
resolve(sendResult); resolve(sendResult);

View file

@ -2,10 +2,10 @@ import { ContainerOptions } from 'rhea';
import { ITriggerFunctions } from 'n8n-core'; import { ITriggerFunctions } from 'n8n-core';
import { import {
IDataObject,
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
ITriggerResponse, ITriggerResponse,
IDataObject,
} from 'n8n-workflow'; } from 'n8n-workflow';
@ -54,6 +54,29 @@ export class AmqpTrigger implements INodeType {
placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"', placeholder: 'for durable/persistent topic subscriptions, example: "order-worker"',
description: 'Leave empty for non-durable topic subscriptions or queues', description: 'Leave empty for non-durable topic subscriptions or queues',
}, },
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
options: [
{
displayName: 'Only Body',
name: 'onlyBody',
type: 'boolean',
default: false,
description: 'Returns only the body property.',
},
{
displayName: 'JSON Parse Body',
name: 'jsonParseBody',
type: 'boolean',
default: false,
description: 'Parse the body to an object.',
},
],
},
] ]
}; };
@ -68,7 +91,7 @@ export class AmqpTrigger implements INodeType {
const sink = this.getNodeParameter('sink', '') as string; const sink = this.getNodeParameter('sink', '') as string;
const clientname = this.getNodeParameter('clientname', '') as string; const clientname = this.getNodeParameter('clientname', '') as string;
const subscription = this.getNodeParameter('subscription', '') as string; const subscription = this.getNodeParameter('subscription', '') as string;
const parseJson = this.getNodeParameter('parseJson', '') as boolean; const options = this.getNodeParameter('options', {}) as IDataObject;
if (sink === '') { if (sink === '') {
throw new Error('Queue or Topic required!'); throw new Error('Queue or Topic required!');
@ -103,15 +126,16 @@ export class AmqpTrigger implements INodeType {
return; return;
} }
// Check if the only property present in the message is body let data = context.message;
// in which case we only emit the content of the body property
// otherwise we emit all properties and their content if (options.jsonParseBody === true) {
if (Object.keys(context.message)[0] === 'body' && Object.keys(context.message).length === 1) { data.body = JSON.parse(data.body);
self.emit([self.helpers.returnJsonArray([context.message.body])]);
} else {
self.emit([self.helpers.returnJsonArray([context.message])]);
} }
if (options.onlyBody === true) {
data = data.body;
}
self.emit([self.helpers.returnJsonArray([data])]);
}); });
const connection = container.connect(connectOptions); const connection = container.connect(connectOptions);