From 4af53d1b951c81e3690d8619c305c73ed44e108d Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Sat, 5 Jun 2021 16:46:06 -0300 Subject: [PATCH] Add Schema Registry to the Kafka producer --- packages/nodes-base/nodes/Kafka/Kafka.node.ts | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 6270da679a..1763d1a85a 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -6,6 +6,8 @@ import { TopicMessages, } from 'kafkajs'; +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; + import { IExecuteFunctions, } from 'n8n-core'; @@ -74,6 +76,43 @@ export class Kafka implements INodeType { type: 'boolean', default: false, }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'URL of the schema registry.', + }, + { + displayName: 'Event Name', + name: 'eventName', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'Namespace and Name of Schema in Schema Registry (namespace.name).', + }, { displayName: 'Headers', name: 'headersUi', @@ -170,6 +209,8 @@ export class Kafka implements INodeType { const options = this.getNodeParameter('options', 0) as IDataObject; const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + const timeout = options.timeout as number; let compression = CompressionTypes.None; @@ -211,7 +252,7 @@ export class Kafka implements INodeType { await producer.connect(); - let message: string; + let message: string | Buffer; for (let i = 0; i < length; i++) { if (sendInputData === true) { @@ -220,6 +261,18 @@ export class Kafka implements INodeType { message = this.getNodeParameter('message', i) as string; } + if (useSchemaRegistry) { + try { + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + const eventName = this.getNodeParameter('eventName', 0) as string; + + const registry = new SchemaRegistry({ host: schemaRegistryUrl }); + const id = await registry.getLatestSchemaId(eventName); + + message = await registry.encode(id, JSON.parse(message)); + } catch (error) {} + } + const topic = this.getNodeParameter('topic', i) as string; const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean;