diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 22c26a163a..a1bea2f47e 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -94,6 +94,36 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Whether to allow sending message to a previously non exisiting topic', }, + { + displayName: 'Auto Commit Threshold', + name: 'autoCommitThreshold', + type: 'number', + default: 0, + description: 'The consumer will commit offsets after resolving a given number of messages', + }, + { + displayName: 'Auto Commit Interval', + name: 'autoCommitInterval', + type: 'number', + default: 0, + description: 'The consumer will commit offsets after a given period, for example, five seconds', + hint: 'Value in milliseconds', + }, + { + displayName: 'Heartbeat Interval', + name: 'heartbeatInterval', + type: 'number', + default: 3000, + description: 'Heartbeats are used to ensure that the consumer\'s session stays active', + hint: 'The value must be set lower than Session Timeout', + }, + { + displayName: 'Max Number of Requests', + name: 'maxInFlightRequests', + type: 'number', + default: 0, + description: 'Max number of requests that may be in progress at any time. If falsey then no limit.', + }, { displayName: 'Read Messages From Beginning', name: 'fromBeginning', @@ -122,13 +152,6 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Whether to return only the message property', }, - { - displayName: 'Session Timeout', - name: 'sessionTimeout', - type: 'number', - default: 30000, - description: 'The time to await a response in ms', - }, { displayName: 'Return Headers', name: 'returnHeaders', @@ -136,6 +159,14 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Whether to return the headers received from Kafka', }, + { + displayName: 'Session Timeout', + name: 'sessionTimeout', + type: 'number', + default: 30000, + description: 'The time to await a response in ms', + hint: 'Value in milliseconds', + }, ], }, ], @@ -175,7 +206,12 @@ export class KafkaTrigger implements INodeType { const kafka = new apacheKafka(config); - const consumer = kafka.consumer({ groupId }); + const consumer = kafka.consumer({ + groupId, + maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number, + sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number, + heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number, + }); await consumer.connect(); @@ -191,6 +227,8 @@ export class KafkaTrigger implements INodeType { const startConsumer = async () => { await consumer.run({ + autoCommitInterval: options.autoCommitInterval as number || null, + autoCommitThreshold: options.autoCommitThreshold as number || null, eachMessage: async ({ topic, message }) => { let data: IDataObject = {};