import { logLevel, type SASLOptions, type KafkaConfig } from 'kafkajs';
import type { KafkaCredential } from './types';
import {
	type ICredentialTestFunctions,
	NodeOperationError,
	type ITriggerFunctions,
} from 'n8n-workflow';

/**
 * Builds a kafkajs client configuration from the given Kafka credentials.
 *
 * `credentials.brokers` is treated as a comma-separated string. Each entry is
 * trimmed and blank entries (e.g. from an empty value or a trailing comma) are
 * dropped. The client log level is fixed to `logLevel.ERROR`.
 *
 * When `credentials.authentication` is truthy, a SASL block is attached using
 * the credential's username, password and SASL mechanism.
 *
 * @param context - Execution context; only used to obtain the node for error reporting.
 * @param credentials - Kafka credentials to translate into a client configuration.
 * @returns The configuration object to pass to the kafkajs client.
 * @throws NodeOperationError if authentication is enabled but the username or
 *   password is missing or empty.
 */
export const getConnectionConfig = (
	context: ITriggerFunctions | ICredentialTestFunctions,
	credentials: KafkaCredential,
): KafkaConfig => {
	// A missing/empty value falls back to '' so that `split` is always safe to call.
	const brokers = ((credentials.brokers as string) || '')
		.split(',')
		.map((item) => item.trim())
		// ''.split(',') yields [''], and "a,b," yields a trailing ''; never hand
		// a blank address to the client.
		.filter((item) => item !== '');

	const config: KafkaConfig = {
		brokers,
		clientId: credentials.clientId,
		ssl: credentials.ssl,
		logLevel: logLevel.ERROR,
	};

	if (credentials.authentication) {
		if (!(credentials.username && credentials.password)) {
			throw new NodeOperationError(
				context.getNode(),
				'Username and password are required for authentication',
			);
		}

		// The assertion is kept from the original: the object literal is built from
		// credential fields rather than declared directly as SASLOptions.
		config.sasl = {
			username: credentials.username,
			password: credentials.password,
			mechanism: credentials.saslMechanism,
		} as SASLOptions;
	}

	return config;
};