feat(Kafka Trigger Node): Add non-parallel execution (#6175)

* Fix typo, add v1.1

* Add parallel processing

* Add versioning

* Improve description for maximum inflight requests

---------

Co-authored-by: Michael Kret <michael.k@radency.com>
This commit is contained in:
agobrech 2023-05-04 17:26:56 +02:00 committed by GitHub
parent 0eb4d9fc16
commit 814ea5185c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -9,8 +9,9 @@ import type {
INodeType,
INodeTypeDescription,
ITriggerResponse,
IRun,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';
import { createDeferredPromise, NodeOperationError } from 'n8n-workflow';
export class KafkaTrigger implements INodeType {
description: INodeTypeDescription = {
@ -18,7 +19,7 @@ export class KafkaTrigger implements INodeType {
name: 'kafkaTrigger',
icon: 'file:kafka.svg',
group: ['trigger'],
version: 1,
version: [1, 1.1],
description: 'Consume messages from a Kafka topic',
defaults: {
name: 'Kafka Trigger',
@ -116,7 +117,7 @@ export class KafkaTrigger implements INodeType {
type: 'number',
default: 1,
description:
'Max number of requests that may be in progress at any time. If falsey then no limit.',
'The maximum number of unacknowledged requests the client will send on a single connection',
},
{
displayName: 'Read Messages From Beginning',
@ -132,6 +133,19 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to try to parse the message to an object',
},
{
displayName: 'Parallel Processing',
name: 'parallelProcessing',
type: 'boolean',
default: true,
displayOptions: {
hide: {
'@version': [1],
},
},
description:
'Whether to process messages in parallel or by keeping the message in order',
},
{
displayName: 'Only Message',
name: 'onlyMessage',
@ -177,6 +191,10 @@ export class KafkaTrigger implements INodeType {
const ssl = credentials.ssl as boolean;
const options = this.getNodeParameter('options', {}) as IDataObject;
options.nodeVersion = this.getNode().typeVersion;
const config: KafkaConfig = {
clientId,
brokers,
@ -213,9 +231,9 @@ export class KafkaTrigger implements INodeType {
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});
await consumer.connect();
const parallelProcessing = options.parallelProcessing as boolean;
const options = this.getNodeParameter('options', {}) as IDataObject;
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
@ -261,8 +279,16 @@ export class KafkaTrigger implements INodeType {
//@ts-ignore
data = value;
}
this.emit([this.helpers.returnJsonArray([data])]);
let responsePromise = undefined;
if (!parallelProcessing && (options.nodeVersion as number) > 1) {
responsePromise = await createDeferredPromise<IRun>();
this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise);
} else {
this.emit([this.helpers.returnJsonArray([data])]);
}
if (responsePromise) {
await responsePromise.promise();
}
},
});
};