feat(Kafka Trigger Node): Add additional options (#3600)

* 🔨 additional options to kafka trigger

*  option for maxInFlightRequests

*  Small change

Co-authored-by: ricardo <ricardoespinoza105@gmail.com>
This commit is contained in:
Michael Kret 2022-07-27 18:00:39 +03:00 committed by GitHub
parent 3ebfa45570
commit 3496a39788
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -94,6 +94,36 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to allow sending message to a previously non exisiting topic',
},
{
displayName: 'Auto Commit Threshold',
name: 'autoCommitThreshold',
type: 'number',
default: 0,
description: 'The consumer will commit offsets after resolving a given number of messages',
},
{
displayName: 'Auto Commit Interval',
name: 'autoCommitInterval',
type: 'number',
default: 0,
description: 'The consumer will commit offsets after a given period, for example, five seconds',
hint: 'Value in milliseconds',
},
{
displayName: 'Heartbeat Interval',
name: 'heartbeatInterval',
type: 'number',
default: 3000,
description: 'Heartbeats are used to ensure that the consumer\'s session stays active',
hint: 'The value must be set lower than Session Timeout',
},
{
displayName: 'Max Number of Requests',
name: 'maxInFlightRequests',
type: 'number',
default: 0,
description: 'Max number of requests that may be in progress at any time. If falsey then no limit.',
},
{
displayName: 'Read Messages From Beginning',
name: 'fromBeginning',
@ -122,13 +152,6 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to return only the message property',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
},
{
displayName: 'Return Headers',
name: 'returnHeaders',
@ -136,6 +159,14 @@ export class KafkaTrigger implements INodeType {
default: false,
description: 'Whether to return the headers received from Kafka',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
hint: 'Value in milliseconds',
},
],
},
],
@ -175,7 +206,12 @@ export class KafkaTrigger implements INodeType {
const kafka = new apacheKafka(config);
const consumer = kafka.consumer({ groupId });
const consumer = kafka.consumer({
groupId,
maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number,
sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
});
await consumer.connect();
@ -191,6 +227,8 @@ export class KafkaTrigger implements INodeType {
const startConsumer = async () => {
await consumer.run({
autoCommitInterval: options.autoCommitInterval as number || null,
autoCommitThreshold: options.autoCommitThreshold as number || null,
eachMessage: async ({ topic, message }) => {
let data: IDataObject = {};