From 71cae90679d9d6fe4cf0dc898827cc9cd4457873 Mon Sep 17 00:00:00 2001 From: ruanjiefeng Date: Mon, 5 Sep 2022 19:11:25 +0800 Subject: [PATCH] fix(kafkaTrigger Node): fix kafka trigger not working with default max requests value --- .../credentials/Kafka.credentials.ts | 1 + packages/nodes-base/nodes/Kafka/Kafka.node.ts | 55 +++++++++++++++++++ .../nodes/Kafka/KafkaTrigger.node.ts | 10 +++- 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/packages/nodes-base/credentials/Kafka.credentials.ts b/packages/nodes-base/credentials/Kafka.credentials.ts index 8d1b817d21..a27f008589 100644 --- a/packages/nodes-base/credentials/Kafka.credentials.ts +++ b/packages/nodes-base/credentials/Kafka.credentials.ts @@ -11,6 +11,7 @@ export class Kafka implements ICredentialType { type: 'string', default: '', placeholder: 'my-app', + hint: 'Will not affect the connection, but will be used to identify the client in the Kafka server logs. Read more here', }, { displayName: 'Brokers', diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 96080587d7..15f3327367 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -11,7 +11,11 @@ import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; import { IExecuteFunctions } from 'n8n-core'; import { + ICredentialDataDecryptedObject, + ICredentialsDecrypted, + ICredentialTestFunctions, IDataObject, + INodeCredentialTestResult, INodeExecutionData, INodeType, INodeTypeDescription, @@ -35,6 +39,7 @@ export class Kafka implements INodeType { { name: 'kafka', required: true, + testedBy: 'kafkaConnectionTest', }, ], properties: [ @@ -185,6 +190,56 @@ export class Kafka implements INodeType { ], }; + methods = { + credentialTest: { + async kafkaConnectionTest( + this: ICredentialTestFunctions, + credential: ICredentialsDecrypted, + ): Promise { + const credentials = credential.data as ICredentialDataDecryptedObject; + try { + const brokers = ((credentials.brokers as string) || '') + .split(',') + .map((item) => item.trim()) as string[]; + + const clientId = credentials.clientId as string; + + const ssl = credentials.ssl as boolean; + + const config: KafkaConfig = { + clientId, + brokers, + ssl, + }; + if (credentials.authentication === true) { + if (!(credentials.username && credentials.password)) { + throw Error('Username and password are required for authentication'); + } + config.sasl = { + username: credentials.username as string, + password: credentials.password as string, + mechanism: credentials.saslMechanism as string, + } as SASLOptions; + } + + const kafka = new apacheKafka(config); + + await kafka.admin().connect(); + await kafka.admin().disconnect(); + return { + status: 'OK', + message: 'Authentication successful', + }; + } catch (error) { + return { + status: 'Error', + message: error.message, + }; + } + }, + }, + }; + async execute(this: IExecuteFunctions): Promise { const items = this.getInputData(); diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 69e68a93f3..dcde8ca63a 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -114,7 +114,7 @@ export class KafkaTrigger implements INodeType { displayName: 'Max Number of Requests', name: 'maxInFlightRequests', type: 'number', - default: 0, + default: 1, description: 'Max number of requests that may be in progress at any time. If falsey then no limit.', }, @@ -202,9 +202,15 @@ export class KafkaTrigger implements INodeType { const kafka = new apacheKafka(config); + const maxInFlightRequests = ( + this.getNodeParameter('options.maxInFlightRequests', null) === 0 + ? null + : this.getNodeParameter('options.maxInFlightRequests', null) + ) as number; + const consumer = kafka.consumer({ groupId, - maxInFlightRequests: this.getNodeParameter('options.maxInFlightRequests', 0) as number, + maxInFlightRequests, sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number, heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number, });