feat(Kafka Node): Overhaul Kafka and KafkaTrigger nodes

कारतोफ्फेलस्क्रिप्ट™ 2024-12-03 12:23:02 +01:00
parent 2e61722002
commit e044b78049
6 changed files with 165 additions and 153 deletions

GenericFunctions.ts (new file)

@@ -0,0 +1,37 @@
import { logLevel, type SASLOptions, type KafkaConfig } from 'kafkajs';
import type { KafkaCredential } from './types';
import {
  type ICredentialTestFunctions,
  NodeOperationError,
  type ITriggerFunctions,
} from 'n8n-workflow';

export const getConnectionConfig = (
  context: ITriggerFunctions | ICredentialTestFunctions,
  credentials: KafkaCredential,
): KafkaConfig => {
  const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim());

  const config: KafkaConfig = {
    brokers,
    clientId: credentials.clientId,
    ssl: credentials.ssl,
    logLevel: logLevel.ERROR,
  };

  if (credentials.authentication) {
    if (!(credentials.username && credentials.password)) {
      throw new NodeOperationError(
        context.getNode(),
        'Username and password are required for authentication',
      );
    }
    config.sasl = {
      username: credentials.username,
      password: credentials.password,
      mechanism: credentials.saslMechanism,
    } as SASLOptions;
  }

  return config;
};
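For reference, a hedged sketch of what this helper yields for a SASL-enabled credential. All values below are made up for illustration, and the snippet assumes it sits next to the new Kafka files so the relative imports resolve:

```ts
import type { KafkaCredential } from './types';
import { getConnectionConfig } from './GenericFunctions';

// Hypothetical credential, shaped as the n8n credential store would supply it:
const credentials: KafkaCredential = {
  clientId: 'n8n-kafka',
  brokers: 'broker-1:9092, broker-2:9092', // comma-separated; padding is trimmed
  ssl: true,
  authentication: true,
  username: 'alice',
  password: 'secret',
  saslMechanism: 'scram-sha-256',
};

// Inside a trigger, getConnectionConfig(context, credentials) would return roughly:
// {
//   brokers: ['broker-1:9092', 'broker-2:9092'],
//   clientId: 'n8n-kafka',
//   ssl: true,
//   logLevel: logLevel.ERROR,
//   sasl: { username: 'alice', password: 'secret', mechanism: 'scram-sha-256' },
// }
```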

Kafka node

@@ -1,11 +1,8 @@
 import type { KafkaConfig, SASLOptions, TopicMessages } from 'kafkajs';
 import { CompressionTypes, Kafka as apacheKafka } from 'kafkajs';
 import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
 import type {
   IExecuteFunctions,
-  ICredentialDataDecryptedObject,
   ICredentialsDecrypted,
   ICredentialTestFunctions,
   IDataObject,
@@ -14,8 +11,11 @@ import type {
   INodeType,
   INodeTypeDescription,
 } from 'n8n-workflow';
-import { ApplicationError, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
+import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';

 import { generatePairedItemData } from '../../utils/utilities';
+import { KafkaCredential } from './types';
+import { getConnectionConfig } from './GenericFunctions';

 export class Kafka implements INodeType {
   description: INodeTypeDescription = {
@@ -212,34 +212,9 @@ export class Kafka implements INodeType {
       this: ICredentialTestFunctions,
       credential: ICredentialsDecrypted,
     ): Promise<INodeCredentialTestResult> {
-      const credentials = credential.data as ICredentialDataDecryptedObject;
+      const credentials = credential.data as KafkaCredential;
       try {
-        const brokers = ((credentials.brokers as string) || '')
-          .split(',')
-          .map((item) => item.trim());
-
-        const clientId = credentials.clientId as string;
-
-        const ssl = credentials.ssl as boolean;
-
-        const config: KafkaConfig = {
-          clientId,
-          brokers,
-          ssl,
-        };
-
-        if (credentials.authentication === true) {
-          if (!(credentials.username && credentials.password)) {
-            throw new ApplicationError('Username and password are required for authentication', {
-              level: 'warning',
-            });
-          }
-          config.sasl = {
-            username: credentials.username as string,
-            password: credentials.password as string,
-            mechanism: credentials.saslMechanism as string,
-          } as SASLOptions;
-        }
-
+        const config = getConnectionConfig(this, credentials);
         const kafka = new apacheKafka(config);

         await kafka.admin().connect();
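The hunk is truncated after the admin connection attempt, but the essence of the credential test is simply "can an admin client connect with this config". A minimal standalone kafkajs sketch of that check, independent of n8n (the broker address is made up):

```ts
import { Kafka, logLevel } from 'kafkajs';

// Resolves if a broker accepts the connection; throws otherwise.
async function checkKafkaConnection(brokers: string[]): Promise<void> {
  const kafka = new Kafka({ clientId: 'credential-test', brokers, logLevel: logLevel.ERROR });
  const admin = kafka.admin();
  try {
    await admin.connect();
  } finally {
    // disconnect() is safe to call even when connect() failed
    await admin.disconnect();
  }
}

// Usage:
// await checkKafkaConnection(['localhost:9092']);
```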

KafkaTrigger node

@@ -1,19 +1,20 @@
-import type { KafkaConfig, SASLOptions } from 'kafkajs';
-import { Kafka as apacheKafka, logLevel } from 'kafkajs';
+import type { KafkaMessage } from 'kafkajs';
+import { Kafka as apacheKafka } from 'kafkajs';
 import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
 import type {
   ITriggerFunctions,
   IDataObject,
-  INodeType,
   INodeTypeDescription,
   ITriggerResponse,
   IRun,
+  INodeExecutionData,
 } from 'n8n-workflow';
-import { NodeConnectionType, NodeOperationError } from 'n8n-workflow';
+import { Node, NodeConnectionType } from 'n8n-workflow';
+
+import type { KafkaCredential, TriggerNodeOptions } from './types';
+import { getConnectionConfig } from './GenericFunctions';

-export class KafkaTrigger implements INodeType {
+export class KafkaTrigger extends Node {
   description: INodeTypeDescription = {
     displayName: 'Kafka Trigger',
     name: 'kafkaTrigger',
@@ -178,139 +179,99 @@ export class KafkaTrigger implements INodeType {
     ],
   };

-  async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
-    const topic = this.getNodeParameter('topic') as string;
-
-    const groupId = this.getNodeParameter('groupId') as string;
-
-    const credentials = await this.getCredentials('kafka');
-
-    const brokers = ((credentials.brokers as string) || '').split(',').map((item) => item.trim());
-
-    const clientId = credentials.clientId as string;
-
-    const ssl = credentials.ssl as boolean;
-
-    const options = this.getNodeParameter('options', {}) as IDataObject;
-
-    options.nodeVersion = this.getNode().typeVersion;
-
-    const config: KafkaConfig = {
-      clientId,
-      brokers,
-      ssl,
-      logLevel: logLevel.ERROR,
-    };
-
-    if (credentials.authentication === true) {
-      if (!(credentials.username && credentials.password)) {
-        throw new NodeOperationError(
-          this.getNode(),
-          'Username and password are required for authentication',
-        );
-      }
-      config.sasl = {
-        username: credentials.username as string,
-        password: credentials.password as string,
-        mechanism: credentials.saslMechanism as string,
-      } as SASLOptions;
-    }
-
-    const kafka = new apacheKafka(config);
-
-    const maxInFlightRequests = (
-      this.getNodeParameter('options.maxInFlightRequests', null) === 0
-        ? null
-        : this.getNodeParameter('options.maxInFlightRequests', null)
-    ) as number;
+  async parsePayload(
+    message: KafkaMessage,
+    messageTopic: string,
+    options: TriggerNodeOptions,
+    context: ITriggerFunctions,
+  ): Promise<INodeExecutionData[][]> {
+    const data: IDataObject = {};
+    let value = message.value?.toString() as string;
+
+    if (options.jsonParseMessage) {
+      try {
+        value = JSON.parse(value);
+      } catch (error) {}
+    }
+
+    const useSchemaRegistry = context.getNodeParameter('useSchemaRegistry', 0) as boolean;
+    if (useSchemaRegistry) {
+      const schemaRegistryUrl = context.getNodeParameter('schemaRegistryUrl', 0) as string;
+      try {
+        const registry = new SchemaRegistry({ host: schemaRegistryUrl });
+        value = await registry.decode(message.value as Buffer);
+      } catch (error) {}
+    }
+
+    if (options.onlyMessage) {
+      return [context.helpers.returnJsonArray([value as unknown as IDataObject])];
+    }
+
+    if (options.returnHeaders && message.headers) {
+      const headers: { [key: string]: string } = {};
+      for (const key of Object.keys(message.headers)) {
+        const header = message.headers[key];
+        headers[key] = header?.toString('utf8') || '';
+      }
+      data.headers = headers;
+    }
+
+    data.message = value;
+    data.topic = messageTopic;
+
+    return [context.helpers.returnJsonArray([data])];
+  }
+
+  async trigger(context: ITriggerFunctions): Promise<ITriggerResponse> {
+    const topic = context.getNodeParameter('topic') as string;
+    const groupId = context.getNodeParameter('groupId') as string;
+    const options = context.getNodeParameter('options', {}) as TriggerNodeOptions;
+    const nodeVersion = context.getNode().typeVersion;
+
+    const credentials = await context.getCredentials<KafkaCredential>('kafka');
+    const config = getConnectionConfig(context, credentials);
+    const kafka = new apacheKafka(config);

     const consumer = kafka.consumer({
       groupId,
-      maxInFlightRequests,
-      sessionTimeout: this.getNodeParameter('options.sessionTimeout', 30000) as number,
-      heartbeatInterval: this.getNodeParameter('options.heartbeatInterval', 3000) as number,
+      maxInFlightRequests: options.maxInFlightRequests,
+      sessionTimeout: options.sessionTimeout ?? 30000,
+      heartbeatInterval: options.heartbeatInterval ?? 3000,
     });

-    const parallelProcessing = options.parallelProcessing as boolean;
-
-    await consumer.connect();
-    await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
-
-    const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
-    const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
-
     const startConsumer = async () => {
+      await consumer.connect();
+      await consumer.subscribe({ topic, fromBeginning: options.fromBeginning ? true : false });
+
       await consumer.run({
-        autoCommitInterval: (options.autoCommitInterval as number) || null,
-        autoCommitThreshold: (options.autoCommitThreshold as number) || null,
+        autoCommitInterval: options.autoCommitInterval || null,
+        autoCommitThreshold: options.autoCommitThreshold || null,
         eachMessage: async ({ topic: messageTopic, message }) => {
-          let data: IDataObject = {};
-          let value = message.value?.toString() as string;
-
-          if (options.jsonParseMessage) {
-            try {
-              value = JSON.parse(value);
-            } catch (error) {}
-          }
-
-          if (useSchemaRegistry) {
-            try {
-              const registry = new SchemaRegistry({ host: schemaRegistryUrl });
-              value = await registry.decode(message.value as Buffer);
-            } catch (error) {}
-          }
-
-          if (options.returnHeaders && message.headers) {
-            const headers: { [key: string]: string } = {};
-            for (const key of Object.keys(message.headers)) {
-              const header = message.headers[key];
-              headers[key] = header?.toString('utf8') || '';
-            }
-            data.headers = headers;
-          }
-
-          data.message = value;
-          data.topic = messageTopic;
-
-          if (options.onlyMessage) {
-            //@ts-ignore
-            data = value;
-          }
-
-          let responsePromise = undefined;
-          if (!parallelProcessing && (options.nodeVersion as number) > 1) {
-            responsePromise = this.helpers.createDeferredPromise<IRun>();
-            this.emit([this.helpers.returnJsonArray([data])], undefined, responsePromise);
-          } else {
-            this.emit([this.helpers.returnJsonArray([data])]);
-          }
-          if (responsePromise) {
-            await responsePromise.promise;
-          }
+          const data = await this.parsePayload(message, messageTopic, options, context);
+          const donePromise =
+            !options.parallelProcessing && nodeVersion > 1 && context.getMode() === 'trigger'
+              ? context.helpers.createDeferredPromise<IRun>()
+              : undefined;
+          context.emit(data, undefined, donePromise);
+          await donePromise?.promise;
         },
       });
     };

-    await startConsumer();
-
-    // The "closeFunction" function gets called by n8n whenever
-    // the workflow gets deactivated and can so clean up.
-    async function closeFunction() {
-      await consumer.disconnect();
-    }
-
-    // The "manualTriggerFunction" function gets called by n8n
-    // when a user is in the workflow editor and starts the
-    // workflow manually. So the function has to make sure that
-    // the emit() gets called with similar data like when it
-    // would trigger by itself so that the user knows what data
-    // to expect.
     async function manualTriggerFunction() {
       await startConsumer();
     }

+    if (context.getMode() === 'trigger') {
+      await startConsumer();
+    }
+
+    async function closeFunction() {
+      await consumer.disconnect();
+    }
+
     return {
       closeFunction,
       manualTriggerFunction,
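The key behavioural piece in the new trigger() is the deferred "done" promise: when parallelProcessing is off (and the node version is above 1, in trigger mode), each emitted message blocks the kafkajs eachMessage callback until the workflow run finishes, so messages are processed strictly one at a time. A standalone sketch of that pattern, independent of n8n's helpers (all names here are illustrative, not n8n APIs):

```ts
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void;
}

function createDeferred<T>(): Deferred<T> {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((res) => (resolve = res));
  return { promise, resolve };
}

async function handleMessage(
  payload: string,
  emit: (data: string, done?: Deferred<void>) => void,
  sequential: boolean,
): Promise<void> {
  // With sequential processing, hand the consumer a deferred promise...
  const done = sequential ? createDeferred<void>() : undefined;
  emit(payload, done);
  // ...and block until the consumer resolves it, so kafkajs only invokes
  // eachMessage for the next record after this one is fully processed.
  await done?.promise;
}
```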

types.ts (new file)

@@ -0,0 +1,32 @@
import type { SASLMechanism } from 'kafkajs';

export type KafkaCredential = {
  clientId: string;
  brokers: string;
  ssl: boolean;
  authentication: boolean;
} & (
  | {
      authentication: true;
      username: string;
      password: string;
      saslMechanism: SASLMechanism;
    }
  | {
      authentication: false;
    }
);

export interface TriggerNodeOptions {
  allowAutoTopicCreation: boolean;
  autoCommitThreshold: number;
  autoCommitInterval: number;
  heartbeatInterval: number;
  maxInFlightRequests: number;
  fromBeginning: boolean;
  jsonParseMessage: boolean;
  parallelProcessing: boolean;
  onlyMessage: boolean;
  returnHeaders: boolean;
  sessionTimeout: number;
}
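KafkaCredential is a discriminated union on the authentication flag, which is what lets getConnectionConfig read username, password, and saslMechanism without casts once the flag has been checked. A small compile-time sketch (assuming it sits next to types.ts):

```ts
import type { KafkaCredential } from './types';

function describeAuth(cred: KafkaCredential): string {
  if (cred.authentication) {
    // Narrowed to the SASL branch: these three fields are available here.
    return `SASL ${cred.saslMechanism} as ${cred.username}`;
  }
  // Narrowed to the no-auth branch: the SASL fields do not exist on this type.
  return 'unauthenticated';
}
```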

n8n-workflow: abstract Node class

@@ -1661,6 +1661,7 @@ export abstract class Node {
   execute?(context: IExecuteFunctions): Promise<INodeExecutionData[][]>;
   webhook?(context: IWebhookFunctions): Promise<IWebhookResponseData>;
   poll?(context: IPollFunctions): Promise<INodeExecutionData[][] | null>;
+  trigger?(context: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
 }

 export interface IVersionedNodeType {
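With trigger? added to the abstract Node class, a trigger node can be a plain class that receives the context as a parameter instead of via this, which is exactly what the KafkaTrigger rewrite above does. A minimal hypothetical subclass as a sketch (it assumes description is the only member a subclass must provide, and the description is stubbed for brevity):

```ts
import { Node } from 'n8n-workflow';
import type {
  INodeTypeDescription,
  ITriggerFunctions,
  ITriggerResponse,
} from 'n8n-workflow';

class IntervalTrigger extends Node {
  description = { displayName: 'Interval Trigger' } as INodeTypeDescription;

  async trigger(context: ITriggerFunctions): Promise<ITriggerResponse> {
    // Emit one item per second until the workflow is deactivated.
    const timer = setInterval(
      () => context.emit([context.helpers.returnJsonArray([{ tick: Date.now() }])]),
      1000,
    );
    return {
      closeFunction: async () => clearInterval(timer),
    };
  }
}
```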

n8n-workflow: Workflow class

@@ -1148,7 +1148,11 @@ export class Workflow {
     if (mode === 'manual') {
       // In manual mode we do not just start the trigger function we also
       // want to be able to get informed as soon as the first data got emitted
-      const triggerResponse = await nodeType.trigger.call(triggerFunctions);
+      const triggerResponse =
+        nodeType instanceof Node
+          ? await nodeType.trigger(triggerFunctions)
+          : await nodeType.trigger.call(triggerFunctions);

       // Add the manual trigger response which resolves when the first time data got emitted
       triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => {
@@ -1197,7 +1201,9 @@ export class Workflow {
       return triggerResponse;
     }
     // In all other modes simply start the trigger
-    return await nodeType.trigger.call(triggerFunctions);
+    return nodeType instanceof Node
+      ? await nodeType.trigger(triggerFunctions)
+      : await nodeType.trigger.call(triggerFunctions);
 	}

 	/**
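The instanceof Node check keeps both calling conventions alive: class-based nodes get the trigger context as an explicit argument, while legacy object-style nodes still receive it bound as this via .call(). A simplified illustration of the dispatch (the LegacyTriggerNode type and startTrigger helper are assumptions for this sketch, not n8n's exact declarations):

```ts
import { Node } from 'n8n-workflow';
import type { ITriggerFunctions, ITriggerResponse } from 'n8n-workflow';

interface LegacyTriggerNode {
  trigger(this: ITriggerFunctions): Promise<ITriggerResponse | undefined>;
}

async function startTrigger(
  nodeType: Node | LegacyTriggerNode,
  triggerFunctions: ITriggerFunctions,
): Promise<ITriggerResponse | undefined> {
  return nodeType instanceof Node
    ? await nodeType.trigger!(triggerFunctions) // new style: context as parameter
    : await nodeType.trigger.call(triggerFunctions); // legacy: context as `this`
}
```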