diff --git a/packages/nodes-base/nodes/Amqp/Amqp.node.ts b/packages/nodes-base/nodes/Amqp/Amqp.node.ts index 50f6f20a90..3fd8133711 100644 --- a/packages/nodes-base/nodes/Amqp/Amqp.node.ts +++ b/packages/nodes-base/nodes/Amqp/Amqp.node.ts @@ -1,6 +1,6 @@ import { ContainerOptions, Delivery } from 'rhea'; -import { IExecuteSingleFunctions } from 'n8n-core'; +import { IExecuteFunctions } from 'n8n-core'; import { IDataObject, INodeExecutionData, @@ -66,20 +66,18 @@ export class Amqp implements INodeType { }, ], }, - ] + ], }; - async executeSingle(this: IExecuteSingleFunctions): Promise { - const item = this.getInputData(); - + async execute(this: IExecuteFunctions): Promise < INodeExecutionData[][] > { const credentials = this.getCredentials('amqp'); if (!credentials) { throw new Error('Credentials are mandatory!'); } - const sink = this.getNodeParameter('sink', '') as string; - const applicationProperties = this.getNodeParameter('headerParametersJson', {}) as string | object; - const options = this.getNodeParameter('options', {}) as IDataObject; + const sink = this.getNodeParameter('sink', 0, '') as string; + const applicationProperties = this.getNodeParameter('headerParametersJson', 0, {}) as string | object; + const options = this.getNodeParameter('options', 0, {}) as IDataObject; let headerProperties = applicationProperties; if (typeof applicationProperties === 'string' && applicationProperties !== '') { @@ -109,39 +107,43 @@ export class Amqp implements INodeType { connectOptions.transport = credentials.transportType; } - const allSent = new Promise(( resolve ) => { - container.once('sendable', (context: any) => { // tslint:disable-line:no-any - - let body: IDataObject | string = item.json; - const sendOnlyProperty = options.sendOnlyProperty as string; - - if (sendOnlyProperty) { - body = body[sendOnlyProperty] as string; - } - - if (options.dataAsObject !== true) { - body = JSON.stringify(body); - } - - const message = { - application_properties: headerProperties, - body - }; - - const sendResult = context.sender.send(message); - - resolve(sendResult); - }); - }); - const conn = container.connect(connectOptions); const sender = conn.open_sender(sink); - const sendResult: Delivery = await allSent as Delivery; // sendResult has a a property that causes circular reference if returned + const responseData: IDataObject[] = await new Promise((resolve) => { + container.once('sendable', (context: any) => { // tslint:disable-line:no-any + const returnData = []; + + const items = this.getInputData(); + for (let i = 0; i < items.length; i++) { + const item = items[i]; + + let body: IDataObject | string = item.json; + const sendOnlyProperty = options.sendOnlyProperty as string; + + if (sendOnlyProperty) { + body = body[sendOnlyProperty] as string; + } + + if (options.dataAsObject !== true) { + body = JSON.stringify(body); + } + + const result = context.sender.send({ + application_properties: headerProperties, + body, + }); + + returnData.push({ id: result.id }); + } + + resolve(returnData); + }); + }); sender.close(); conn.close(); - return { json: { id: sendResult.id } } as INodeExecutionData; + return [this.helpers.returnJsonArray(responseData)]; } } diff --git a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts index d63d785c24..40af50c4f2 100644 --- a/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts +++ b/packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts @@ -55,20 +55,6 @@ export class AmqpTrigger implements INodeType { description: 'Leave empty for non-durable topic subscriptions or queues', }, { - displayName: 'Pull N Messages per Cicle', - name: 'pullMessagesNumber', - type: 'number', - default: 100, - description: 'Number of messages to pull from the bus for every cicle', - }, - { - displayName: 'Sleep time after cicle', - name: 'sleepTime', - type: 'number', - default: 10, - description: 'Milliseconds to sleep after every cicle', - }, - { displayName: 'Options', name: 'options', type: 'collection', @@ -96,9 +82,23 @@ export class AmqpTrigger implements INodeType { default: false, description: 'Returns only the body property.', }, + { + displayName: 'Messages per Cicle', + name: 'pullMessagesNumber', + type: 'number', + default: 100, + description: 'Number of messages to pull from the bus for every cicle', + }, + { + displayName: 'Sleep Time', + name: 'sleepTime', + type: 'number', + default: 10, + description: 'Milliseconds to sleep after every cicle.', + }, ], }, - ] + ], }; @@ -113,8 +113,7 @@ export class AmqpTrigger implements INodeType { const clientname = this.getNodeParameter('clientname', '') as string; const subscription = this.getNodeParameter('subscription', '') as string; const options = this.getNodeParameter('options', {}) as IDataObject; - const pullMessagesNumber = this.getNodeParameter('pullMessagesNumber', {}) as number; - const sleepTime = this.getNodeParameter('sleepTime', {}) as number; + const pullMessagesNumber = options.pullMessagesNumber || 100; if (sink === '') { throw new Error('Queue or Topic required!'); @@ -133,7 +132,7 @@ export class AmqpTrigger implements INodeType { port: credentials.port, reconnect: true, // this id the default anyway reconnect_limit: 50, // try for max 50 times, based on a back-off algorithm - container_id: (durable ? clientname : null) + container_id: (durable ? clientname : null), }; if (credentials.username || credentials.password) { // Old rhea implementation. not shure if it is neccessary @@ -146,13 +145,10 @@ export class AmqpTrigger implements INodeType { connectOptions.transport = credentials.transportType; } - - let lastMsgId: number | undefined = undefined; const self = this; - container.on('receiver_open', function (context: any) { - console.log("Connection opened"); + container.on('receiver_open', (context: any) => { // tslint:disable-line:no-any context.receiver.add_credit(pullMessagesNumber); }); @@ -164,11 +160,11 @@ export class AmqpTrigger implements INodeType { lastMsgId = context.message.message_id; let data = context.message; - - if(options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { + + if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { // The buffer is not ready... Stringify and parse back to load it. - let cont = JSON.stringify(data.body.content); - data.body = String.fromCharCode.apply(null,JSON.parse(cont).data); + const cont = JSON.stringify(data.body.content); + data.body = String.fromCharCode.apply(null, JSON.parse(cont).data); } if (options.jsonConvertByteArrayToString === true && data.body.content !== undefined) { @@ -187,8 +183,11 @@ export class AmqpTrigger implements INodeType { self.emit([self.helpers.returnJsonArray([data])]); - if(context.receiver.credit ==0) - setTimeout(function(){ context.receiver.add_credit(pullMessagesNumber); }, sleepTime || 0); + if (context.receiver.credit === 0) { + setTimeout(() => { + context.receiver.add_credit(pullMessagesNumber); + }, options.sleepTime as number || 10); + } }); const connection = container.connect(connectOptions); @@ -199,16 +198,16 @@ export class AmqpTrigger implements INodeType { source: { address: sink, durable: 2, - expiry_policy: 'never' + expiry_policy: 'never', }, - credit_window: 0 // prefetch 1 + credit_window: 0, // prefetch 1 }; } else { clientOptions = { source: { address: sink, }, - credit_window: 0 // prefetch 1 + credit_window: 0, // prefetch 1 }; } connection.open_receiver(clientOptions); @@ -217,11 +216,11 @@ export class AmqpTrigger implements INodeType { // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. async function closeFunction() { - container.removeAllListeners("receiver_open"); - container.removeAllListeners("message"); + container.removeAllListeners('receiver_open'); + container.removeAllListeners('message'); connection.close(); } - + // The "manualTriggerFunction" function gets called by n8n // when a user is in the workflow editor and starts the // workflow manually. @@ -231,7 +230,7 @@ export class AmqpTrigger implements INodeType { await new Promise((resolve, reject) => { const timeoutHandler = setTimeout(() => { reject(new Error('Aborted, no message received within 30secs. This 30sec timeout is only set for "manually triggered execution". Active Workflows will listen indefinitely.')); - }, 3000); + }, 30000); container.on('message', (context: any) => { // tslint:disable-line:no-any // Check if the only property present in the message is body // in which case we only emit the content of the body property