feat: Enable parallel processing on multiple queue nodes (#6295)

* Add non-parallel execution

* Add parallel processing for MQTT

* Fix logic expression for trigger

* fixes

* remove unused import

* fix MQTT parallel processing

* fix AMQPTrigger node parallelProcessing

* MQTTTrigger node default parallelProcessing to true

* add AMQP credential test

* improve error handling

---------

Co-authored-by: Marcus <marcus@n8n.io>
This commit is contained in:
agobrech 2023-08-16 13:06:47 +02:00 committed by GitHub
parent 198a977f57
commit 44afcff959
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 6 deletions

View file

@ -40,7 +40,7 @@ export class Amqp implements ICredentialType {
name: 'transportType', name: 'transportType',
type: 'string', type: 'string',
default: '', default: '',
description: 'Optional Transport Type to use', description: 'Optional Transport Type to use. Either tcp or tls.',
}, },
]; ];
} }

View file

@ -1,4 +1,4 @@
import type { ContainerOptions, Dictionary, EventContext } from 'rhea'; import type { Connection, ContainerOptions, Dictionary, EventContext } from 'rhea';
import { create_container } from 'rhea'; import { create_container } from 'rhea';
import type { import type {
@ -7,6 +7,10 @@ import type {
INodeExecutionData, INodeExecutionData,
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
ICredentialTestFunctions,
INodeCredentialTestResult,
ICredentialsDecrypted,
ICredentialDataDecryptedObject,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow';
@ -28,6 +32,7 @@ export class Amqp implements INodeType {
{ {
name: 'amqp', name: 'amqp',
required: true, required: true,
testedBy: 'amqpConnectionTest',
}, },
], ],
properties: [ properties: [
@ -95,6 +100,52 @@ export class Amqp implements INodeType {
], ],
}; };
methods = {
credentialTest: {
async amqpConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as ICredentialDataDecryptedObject;
const connectOptions: ContainerOptions = {
reconnect: false,
host: credentials.hostname as string,
hostname: credentials.hostname as string,
port: credentials.port as number,
username: credentials.username ? (credentials.username as string) : undefined,
password: credentials.password ? (credentials.password as string) : undefined,
transport: credentials.transportType ? (credentials.transportType as string) : undefined,
};
let conn: Connection | undefined = undefined;
try {
const container = create_container();
await new Promise<void>((resolve, reject) => {
container.on('connection_open', function (_contex: EventContext) {
resolve();
});
container.on('disconnected', function (context: EventContext) {
reject(context.error ?? new Error('unknown error'));
});
conn = container.connect(connectOptions);
});
} catch (error) {
return {
status: 'Error',
message: (error as Error).message,
};
} finally {
if (conn) (conn as Connection).close();
}
return {
status: 'OK',
message: 'Connection successful!',
};
},
},
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
try { try {
const credentials = await this.getCredentials('amqp'); const credentials = await this.getCredentials('amqp');

View file

@ -7,6 +7,8 @@ import type {
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
ITriggerResponse, ITriggerResponse,
IDeferredPromise,
IRun,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { deepCopy, jsonParse, NodeOperationError } from 'n8n-workflow'; import { deepCopy, jsonParse, NodeOperationError } from 'n8n-workflow';
@ -100,6 +102,13 @@ export class AmqpTrigger implements INodeType {
default: false, default: false,
description: 'Whether to return only the body property', description: 'Whether to return only the body property',
}, },
{
displayName: 'Parallel Processing',
name: 'parallelProcessing',
type: 'boolean',
default: true,
description: 'Whether to process messages in parallel',
},
{ {
displayName: 'Reconnect', displayName: 'Reconnect',
name: 'reconnect', name: 'reconnect',
@ -133,6 +142,7 @@ export class AmqpTrigger implements INodeType {
const clientname = this.getNodeParameter('clientname', '') as string; const clientname = this.getNodeParameter('clientname', '') as string;
const subscription = this.getNodeParameter('subscription', '') as string; const subscription = this.getNodeParameter('subscription', '') as string;
const options = this.getNodeParameter('options', {}) as IDataObject; const options = this.getNodeParameter('options', {}) as IDataObject;
const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;
const pullMessagesNumber = (options.pullMessagesNumber as number) || 100; const pullMessagesNumber = (options.pullMessagesNumber as number) || 100;
const containerId = options.containerId as string; const containerId = options.containerId as string;
const containerReconnect = (options.reconnect as boolean) || true; const containerReconnect = (options.reconnect as boolean) || true;
@ -156,7 +166,7 @@ export class AmqpTrigger implements INodeType {
context.receiver?.add_credit(pullMessagesNumber); context.receiver?.add_credit(pullMessagesNumber);
}); });
container.on('message', (context: EventContext) => { container.on('message', async (context: EventContext) => {
// No message in the context // No message in the context
if (!context.message) { if (!context.message) {
return; return;
@ -195,7 +205,16 @@ export class AmqpTrigger implements INodeType {
data = data.body; data = data.body;
} }
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
if (!parallelProcessing) {
responsePromise = await this.helpers.createDeferredPromise();
}
if (responsePromise) {
this.emit([this.helpers.returnJsonArray([data as any])], undefined, responsePromise);
await responsePromise.promise();
} else {
this.emit([this.helpers.returnJsonArray([data as any])]); this.emit([this.helpers.returnJsonArray([data as any])]);
}
if (!context.receiver?.has_credit()) { if (!context.receiver?.has_credit()) {
setTimeout( setTimeout(

View file

@ -4,6 +4,8 @@ import type {
INodeType, INodeType,
INodeTypeDescription, INodeTypeDescription,
ITriggerResponse, ITriggerResponse,
IDeferredPromise,
IRun,
} from 'n8n-workflow'; } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow';
@ -71,6 +73,14 @@ export class MqttTrigger implements INodeType {
default: false, default: false,
description: 'Whether to return only the message property', description: 'Whether to return only the message property',
}, },
{
displayName: 'Parallel Processing',
name: 'parallelProcessing',
type: 'boolean',
default: true,
description:
'Whether to process messages in parallel or by keeping the message in order',
},
], ],
}, },
], ],
@ -89,6 +99,7 @@ export class MqttTrigger implements INodeType {
} }
const options = this.getNodeParameter('options') as IDataObject; const options = this.getNodeParameter('options') as IDataObject;
const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;
if (!topics) { if (!topics) {
throw new NodeOperationError(this.getNode(), 'Topics are mandatory!'); throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
@ -147,7 +158,7 @@ export class MqttTrigger implements INodeType {
if (error) { if (error) {
reject(error); reject(error);
} }
client.on('message', (topic: string, message: Buffer | string) => { client.on('message', async (topic: string, message: Buffer | string) => {
let result: IDataObject = {}; let result: IDataObject = {};
message = message.toString(); message = message.toString();
@ -165,7 +176,15 @@ export class MqttTrigger implements INodeType {
//@ts-ignore //@ts-ignore
result = [message as string]; result = [message as string];
} }
this.emit([this.helpers.returnJsonArray(result)]);
let responsePromise: IDeferredPromise<IRun> | undefined;
if (!parallelProcessing) {
responsePromise = await this.helpers.createDeferredPromise();
}
this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise);
if (responsePromise) {
await responsePromise.promise();
}
resolve(true); resolve(true);
}); });
}); });