diff --git a/packages/nodes-base/nodes/Kafka/GenericFunctions.ts b/packages/nodes-base/nodes/Kafka/GenericFunctions.ts
new file mode 100644
index 0000000000..2e4b36a9ef
--- /dev/null
+++ b/packages/nodes-base/nodes/Kafka/GenericFunctions.ts
@@ -0,0 +1,37 @@
+import { logLevel, SASLOptions, type KafkaConfig } from 'kafkajs';
+import type { KafkaCredential } from './types';
+import {
+	type ICredentialTestFunctions,
+	NodeOperationError,
+	type ITriggerFunctions,
+} from 'n8n-workflow';
+
+export const getConnectionConfig = (
+	context: ITriggerFunctions | ICredentialTestFunctions,
+	credentials: KafkaCredential,
+): KafkaConfig => {
+	const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim());
+
+	const config: KafkaConfig = {
+		brokers,
+		clientId: credentials.clientId,
+		ssl: credentials.ssl,
+		logLevel: logLevel.ERROR,
+	};
+
+	if (credentials.authentication) {
+		if (!(credentials.username && credentials.password)) {
+			throw new NodeOperationError(
+				context.getNode(),
+				'Username and password are required for authentication',
+			);
+		}
+		config.sasl = {
+			username: credentials.username,
+			password: credentials.password,
+			mechanism: credentials.saslMechanism,
+		} as SASLOptions;
+	}
+
+	return config;
+};
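For orientation, a minimal sketch of how the extracted helper behaves. The credential values and the `context` variable (an `ITriggerFunctions` or `ICredentialTestFunctions` instance) are hypothetical:

```ts
// Hypothetical credential values; in the nodes they come from
// context.getCredentials<KafkaCredential>('kafka').
const credentials: KafkaCredential = {
	clientId: 'n8n-kafka',
	brokers: 'broker-1:9092, broker-2:9092',
	ssl: true,
	authentication: true,
	username: 'alice',
	password: 'secret',
	saslMechanism: 'plain',
};

const config = getConnectionConfig(context, credentials);
// config.brokers -> ['broker-1:9092', 'broker-2:9092'] (split on ',' and trimmed)
// config.sasl    -> { username: 'alice', password: 'secret', mechanism: 'plain' }
```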
diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts
index 87e9f95b0d..01f22eadda 100644
--- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts
+++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts
@@ -1,11 +1,8 @@
 import type { KafkaConfig, SASLOptions, TopicMessages } from 'kafkajs';
 import { CompressionTypes, Kafka as apacheKafka } from 'kafkajs';
-
 import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
-
 import type {
 	IExecuteFunctions,
-	ICredentialDataDecryptedObject,
 	ICredentialsDecrypted,
 	ICredentialTestFunctions,
 	IDataObject,
@@ -14,8 +11,11 @@ import type {
 	INodeType,
 	INodeTypeDescription,
 } from 'n8n-workflow';
-import { ApplicationError, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
+import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
+
 import { generatePairedItemData } from '../../utils/utilities';
+import { KafkaCredential } from './types';
+import { getConnectionConfig } from './GenericFunctions';

 export class Kafka implements INodeType {
 	description: INodeTypeDescription = {
@@ -212,34 +212,9 @@
 			this: ICredentialTestFunctions,
 			credential: ICredentialsDecrypted,
 		): Promise<INodeCredentialTestResult> {
-			const credentials = credential.data as ICredentialDataDecryptedObject;
+			const credentials = credential.data as KafkaCredential;
 			try {
-				const brokers = ((credentials.brokers as string) || '')
-					.split(',')
-					.map((item) => item.trim());
-
-				const clientId = credentials.clientId as string;
-
-				const ssl = credentials.ssl as boolean;
-
-				const config: KafkaConfig = {
-					clientId,
-					brokers,
-					ssl,
-				};
-				if (credentials.authentication === true) {
-					if (!(credentials.username && credentials.password)) {
-						throw new ApplicationError('Username and password are required for authentication', {
-							level: 'warning',
-						});
-					}
-					config.sasl = {
-						username: credentials.username as string,
-						password: credentials.password as string,
-						mechanism: credentials.saslMechanism as string,
-					} as SASLOptions;
-				}
-
+				const config = getConnectionConfig(this, credentials);
 				const kafka = new apacheKafka(config);

 				await kafka.admin().connect();
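The credential test keeps the same probe: build a config, then open an admin connection. A standalone sketch of that check, assuming only public kafkajs APIs (`probeCluster` is a hypothetical name; unlike the node, this sketch also disconnects afterwards):

```ts
import { Kafka, type KafkaConfig } from 'kafkajs';

// A rejected connect() (unreachable broker, TLS or SASL failure) is what
// the node surfaces as a failed credential test.
async function probeCluster(config: KafkaConfig): Promise<void> {
	const kafka = new Kafka(config);
	const admin = kafka.admin();
	try {
		await admin.connect();
	} finally {
		await admin.disconnect();
	}
}
```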
diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
index 631566ffbd..3b49c8d5fc 100644
--- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
+++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
@@ -1,19 +1,20 @@
-import type { KafkaConfig, SASLOptions } from 'kafkajs';
-import { Kafka as apacheKafka, logLevel } from 'kafkajs';
-
+import type { KafkaMessage } from 'kafkajs';
+import { Kafka as apacheKafka } from 'kafkajs';
 import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
-
 import type {
 	ITriggerFunctions,
 	IDataObject,
-	INodeType,
 	INodeTypeDescription,
 	ITriggerResponse,
 	IRun,
+	INodeExecutionData,
 } from 'n8n-workflow';
-import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
+import { Node, NodeConnectionType } from 'n8n-workflow';

-export class KafkaTrigger implements INodeType {
+import type { KafkaCredential, TriggerNodeOptions } from './types';
+import { getConnectionConfig } from './GenericFunctions';
+
+export class KafkaTrigger extends Node {
 	description: INodeTypeDescription = {
 		displayName: 'Kafka Trigger',
 		name: 'kafkaTrigger',
@@ -178,139 +179,99 @@
 		],
 	};

-	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
-		const topic = this.getNodeParameter('topic') as string;
+	async parsePayload(
+		message: KafkaMessage,
+		messageTopic: string,
+		options: TriggerNodeOptions,
+		context: ITriggerFunctions,
+	): Promise<INodeExecutionData[][]> {
+		const data: IDataObject = {};
+		let value = message.value?.toString() as string;

-		const groupId = this.getNodeParameter('groupId') as string;
-
-		const credentials = await this.getCredentials('kafka');
-
-		const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim());
-
-		const clientId = credentials.clientId as string;
-
-		const ssl = credentials.ssl as boolean;
-
-		const options = this.getNodeParameter('options', {}) as IDataObject;
-
-		options.nodeVersion = this.getNode().typeVersion;
-
-		const config: KafkaConfig = {
-			clientId,
-			brokers,
-			ssl,
-			logLevel: logLevel.ERROR,
-		};
-
-		if (credentials.authentication === true) {
-			if (!(credentials.username && credentials.password)) {
-				throw new NodeOperationError(
-					this.getNode(),
-					'Username and password are required for authentication',
-				);
-			}
-			config.sasl = {
-				username: credentials.username as string,
-				password: credentials.password as string,
-				mechanism: credentials.saslMechanism as string,
-			} as SASLOptions;
+		if (options.jsonParseMessage) {
+			try {
+				value = JSON.parse(value);
+			} catch (error) {}
 		}

-		const kafka = new apacheKafka(config);
+		const useSchemaRegistry = context.getNodeParameter('useSchemaRegistry', 0) as boolean;
+		if (useSchemaRegistry) {
+			const schemaRegistryUrl = context.getNodeParameter('schemaRegistryUrl', 0) as string;
+			try {
+				const registry = new SchemaRegistry({ host: schemaRegistryUrl });
+				value = await registry.decode(message.value as Buffer);
+			} catch (error) {}
+		}

-		const maxInFlightRequests = (
-			this.getNodeParameter('options.maxInFlightRequests', null) === 0
-				? null
-				: this.getNodeParameter('options.maxInFlightRequests', null)
-		) as number;
+		if (options.onlyMessage) {
+			return [context.helpers.returnJsonArray([value as unknown as IDataObject])];
+		}
+
+		if (options.returnHeaders && message.headers) {
+			const headers: { [key: string]: string } = {};
+			for (const key of Object.keys(message.headers)) {
+				const header = message.headers[key];
+				headers[key] = header?.toString('utf8') || '';
+			}
+
+			data.headers = headers;
+		}
+
+		data.message = value;
+		data.topic = messageTopic;
+
+		return [context.helpers.returnJsonArray([data])];
+	}
+
+	async trigger(context: ITriggerFunctions): Promise<ITriggerResponse> {
+		const topic = context.getNodeParameter('topic') as string;
+		const groupId = context.getNodeParameter('groupId') as string;
+
+		const options = context.getNodeParameter('options', {}) as TriggerNodeOptions;
+		const nodeVersion = context.getNode().typeVersion;
+
+		const credentials = await context.getCredentials<KafkaCredential>('kafka');
+		const config = getConnectionConfig(context, credentials);
+		const kafka = new apacheKafka(config);

 		const consumer = kafka.consumer({
 			groupId,
-			maxInFlightRequests,
-			sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
-			heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
+			maxInFlightRequests: options.maxInFlightRequests,
+			sessionTimeout: options.sessionTimeout ?? 30000,
+			heartbeatInterval: options.heartbeatInterval ?? 3000,
 		});

-		const parallelProcessing = options.parallelProcessing as boolean;
-
-		await consumer.connect();
-
-		await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
-
-		const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
-
-		const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
-
 		const startConsumer = async () => {
+			await consumer.connect();
+			await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
+
 			await consumer.run({
-				autoCommitInterval: (options.autoCommitInterval as number) || null,
-				autoCommitThreshold: (options.autoCommitThreshold as number) || null,
+				autoCommitInterval: options.autoCommitInterval || null,
+				autoCommitThreshold: options.autoCommitThreshold || null,
 				eachMessage: async ({ topic: messageTopic, message }) => {
-					let data: IDataObject = {};
-					let value = message.value?.toString() as string;
-
-					if (options.jsonParseMessage) {
-						try {
-							value = JSON.parse(value);
-						} catch (error) {}
-					}
-
-					if (useSchemaRegistry) {
-						try {
-							const registry = new SchemaRegistry({ host: schemaRegistryUrl });
-							value = await registry.decode(message.value as Buffer);
-						} catch (error) {}
-					}
-
-					if (options.returnHeaders && message.headers) {
-						const headers: { [key: string]: string } = {};
-						for (const key of Object.keys(message.headers)) {
-							const header = message.headers[key];
-							headers[key] = header?.toString('utf8') || '';
-						}
-
-						data.headers = headers;
-					}
-
-					data.message = value;
-					data.topic = messageTopic;
-
-					if (options.onlyMessage) {
-						//@ts-ignore
-						data = value;
-					}
-					let responsePromise = undefined;
-					if (!parallelProcessing && (options.nodeVersion as number) > 1) {
-						responsePromise = this.helpers.createDeferredPromise();
-						this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise);
-					} else {
-						this.emit([this.helpers.returnJsonArray([data])]);
-					}
-					if (responsePromise) {
-						await responsePromise.promise;
-					}
+					const data = await this.parsePayload(message, messageTopic, options, context);
+					const donePromise =
+						!options.parallelProcessing && nodeVersion > 1 && context.getMode() === 'trigger'
+							? context.helpers.createDeferredPromise<IRun>()
+							: undefined;
+					context.emit(data, undefined, donePromise);
+					await donePromise?.promise;
 				},
 			});
 		};

-		await startConsumer();
-
-		// The "closeFunction" function gets called by n8n whenever
-		// the workflow gets deactivated and can so clean up.
-		async function closeFunction() {
-			await consumer.disconnect();
-		}
-
-		// The "manualTriggerFunction" function gets called by n8n
-		// when a user is in the workflow editor and starts the
-		// workflow manually. So the function has to make sure that
-		// the emit() gets called with similar data like when it
-		// would trigger by itself so that the user knows what data
-		// to expect.
 		async function manualTriggerFunction() {
 			await startConsumer();
 		}

+		if (context.getMode() === 'trigger') {
+			await startConsumer();
+		}
+
+		async function closeFunction() {
+			await consumer.disconnect();
+		}
+
 		return {
 			closeFunction,
 			manualTriggerFunction,
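One detail worth calling out: kafkajs only fetches the next message of a partition once the `eachMessage` promise settles, so awaiting `donePromise?.promise` serializes workflow runs when `parallelProcessing` is off on node version 2+. A minimal sketch of the deferred-promise shape assumed from `context.helpers.createDeferredPromise<IRun>()` (`createDeferred` is a hypothetical stand-in):

```ts
// Deferred promise: n8n resolves it when the workflow run triggered by
// emit() finishes, which in turn releases the eachMessage handler above.
function createDeferred<T>() {
	let resolve!: (value: T) => void;
	let reject!: (reason?: unknown) => void;
	const promise = new Promise<T>((res, rej) => {
		resolve = res;
		reject = rej;
	});
	return { promise, resolve, reject };
}
```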
diff --git a/packages/nodes-base/nodes/Kafka/types.ts b/packages/nodes-base/nodes/Kafka/types.ts
new file mode 100644
index 0000000000..911554ab3c
--- /dev/null
+++ b/packages/nodes-base/nodes/Kafka/types.ts
@@ -0,0 +1,32 @@
+import type { SASLMechanism } from 'kafkajs';
+
+export type KafkaCredential = {
+	clientId: string;
+	brokers: string;
+	ssl: boolean;
+	authentication: boolean;
+} & (
+	| {
+			authentication: true;
+			username: string;
+			password: string;
+			saslMechanism: SASLMechanism;
+	  }
+	| {
+			authentication: false;
+	  }
+);
+
+export interface TriggerNodeOptions {
+	allowAutoTopicCreation: boolean;
+	autoCommitThreshold: number;
+	autoCommitInterval: number;
+	heartbeatInterval: number;
+	maxInFlightRequests: number;
+	fromBeginning: boolean;
+	jsonParseMessage: boolean;
+	parallelProcessing: boolean;
+	onlyMessage: boolean;
+	returnHeaders: boolean;
+	sessionTimeout: number;
+}
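Because `KafkaCredential` is a discriminated union on `authentication`, checking the flag narrows the type and exposes the SASL fields without casts, which is what `getConnectionConfig` relies on. For illustration (`describeAuth` is a hypothetical helper):

```ts
function describeAuth(credentials: KafkaCredential): string {
	if (credentials.authentication) {
		// Narrowed to the { authentication: true } branch: username,
		// password and saslMechanism are typed as present here.
		return `SASL (${credentials.saslMechanism}) as ${credentials.username}`;
	}
	// Narrowed to { authentication: false }: no SASL fields exist.
	return 'unauthenticated';
}
```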
diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts
index 81dbe231b0..d29f70bd84 100644
--- a/packages/workflow/src/Interfaces.ts
+++ b/packages/workflow/src/Interfaces.ts
@@ -1661,6 +1661,7 @@ export abstract class Node {
 	execute?(context: IExecuteFunctions): Promise<INodeExecutionData[][]>;
 	webhook?(context: IWebhookFunctions): Promise<IWebhookResponseData>;
 	poll?(context: IPollFunctions): Promise<INodeExecutionData[][] | null>;
+	trigger?(context: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
 }

 export interface IVersionedNodeType {
diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts
index b2eb597e2e..18a78aedfe 100644
--- a/packages/workflow/src/Workflow.ts
+++ b/packages/workflow/src/Workflow.ts
@@ -1148,7 +1148,11 @@
 		if (mode === 'manual') {
 			// In manual mode we do not just start the trigger function we also
 			// want to be able to get informed as soon as the first data got emitted
-			const triggerResponse = await nodeType.trigger.call(triggerFunctions);
+
+			const triggerResponse =
+				nodeType instanceof Node
+					? await nodeType.trigger(triggerFunctions)
+					: await nodeType.trigger.call(triggerFunctions);

 			// Add the manual trigger response which resolves when the first time data got emitted
 			triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => {
@@ -1197,7 +1201,9 @@
 			return triggerResponse;
 		}
 		// In all other modes simply start the trigger
-		return await nodeType.trigger.call(triggerFunctions);
+		return nodeType instanceof Node
+			? await nodeType.trigger(triggerFunctions)
+			: await nodeType.trigger.call(triggerFunctions);
 	}

 	/**
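The Workflow changes boil down to one dispatch rule: class-based nodes (extending `Node`) receive the context as an argument, while legacy object-style nodes still get it bound as `this`. A reduced sketch of that dual dispatch, with hypothetical local stand-ins for the real `Node` class and legacy node type:

```ts
import type { ITriggerFunctions, ITriggerResponse } from 'n8n-workflow';

// Hypothetical stand-ins for illustration only.
abstract class Node {
	trigger?(context: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
}
interface LegacyTriggerNode {
	trigger(this: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
}

async function runTrigger(
	nodeType: Node | LegacyTriggerNode,
	triggerFunctions: ITriggerFunctions,
): Promise<ITriggerResponse | undefined> {
	// New-style nodes take the context as a parameter; legacy nodes
	// expect it as `this`, hence the .call() fallback.
	return nodeType instanceof Node
		? await nodeType.trigger?.(triggerFunctions)
		: await nodeType.trigger.call(triggerFunctions);
}
```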