diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 1ff27bd302..a3d871eaaa 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -9,8 +9,9 @@ import type { INodeType, INodeTypeDescription, ITriggerResponse, + IRun, } from 'n8n-workflow'; -import { NodeOperationError } from 'n8n-workflow'; +import { createDeferredPromise, NodeOperationError } from 'n8n-workflow'; export class KafkaTrigger implements INodeType { description: INodeTypeDescription = { @@ -18,7 +19,7 @@ export class KafkaTrigger implements INodeType { name: 'kafkaTrigger', icon: 'file:kafka.svg', group: ['trigger'], - version: 1, + version: [1, 1.1], description: 'Consume messages from a Kafka topic', defaults: { name: 'Kafka Trigger', @@ -116,7 +117,7 @@ export class KafkaTrigger implements INodeType { type: 'number', default: 1, description: - 'Max number of requests that may be in progress at any time. If falsey then no limit.', + 'The maximum number of unacknowledged requests the client will send on a single connection', }, { displayName: 'Read Messages From Beginning', @@ -132,6 +133,19 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Whether to try to parse the message to an object', }, + { + displayName: 'Parallel Processing', + name: 'parallelProcessing', + type: 'boolean', + default: true, + displayOptions: { + hide: { + '@version': [1], + }, + }, + description: + 'Whether to process messages in parallel or by keeping the message in order', + }, { displayName: 'Only Message', name: 'onlyMessage', @@ -177,6 +191,10 @@ export class KafkaTrigger implements INodeType { const ssl = credentials.ssl as boolean; + const options = this.getNodeParameter('options', {}) as IDataObject; + + options.nodeVersion = this.getNode().typeVersion; + const config: KafkaConfig = { clientId, brokers, @@ -213,9 +231,9 @@ export class KafkaTrigger implements INodeType { heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number, }); - await consumer.connect(); + const parallelProcessing = options.parallelProcessing as boolean; - const options = this.getNodeParameter('options', {}) as IDataObject; + await consumer.connect(); await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false }); @@ -261,8 +279,16 @@ export class KafkaTrigger implements INodeType { //@ts-ignore data = value; } - - this.emit([this.helpers.returnJsonArray([data])]); + let responsePromise = undefined; + if (!parallelProcessing && (options.nodeVersion as number) > 1) { + responsePromise = await createDeferredPromise(); + this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise); + } else { + this.emit([this.helpers.returnJsonArray([data])]); + } + if (responsePromise) { + await responsePromise.promise(); + } }, }); };