Add Schema Registry to the Kafka producer

This commit is contained in:
Ricardo Georgel 2021-06-05 16:46:06 -03:00
parent 894ed83efc
commit 4af53d1b95

View file

@ -6,6 +6,8 @@ import {
TopicMessages,
} from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import {
IExecuteFunctions,
} from 'n8n-core';
@ -74,6 +76,43 @@ export class Kafka implements INodeType {
type: 'boolean',
default: false,
},
{
displayName: 'Use Schema Registry',
name: 'useSchemaRegistry',
type: 'boolean',
default: false,
description: 'Use Apache Avro serialization format and Confluent\' wire formats.',
},
{
displayName: 'Schema Registry URL',
name: 'schemaRegistryUrl',
type: 'string',
required: true,
displayOptions: {
show: {
useSchemaRegistry: [
true,
],
},
},
default: '',
description: 'URL of the schema registry.',
},
{
displayName: 'Event Name',
name: 'eventName',
type: 'string',
required: true,
displayOptions: {
show: {
useSchemaRegistry: [
true,
],
},
},
default: '',
description: 'Namespace and Name of Schema in Schema Registry (namespace.name).',
},
{
displayName: 'Headers',
name: 'headersUi',
@ -170,6 +209,8 @@ export class Kafka implements INodeType {
const options = this.getNodeParameter('options', 0) as IDataObject;
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
const timeout = options.timeout as number;
let compression = CompressionTypes.None;
@ -211,7 +252,7 @@ export class Kafka implements INodeType {
await producer.connect();
let message: string;
let message: string | Buffer;
for (let i = 0; i < length; i++) {
if (sendInputData === true) {
@ -220,6 +261,18 @@ export class Kafka implements INodeType {
message = this.getNodeParameter('message', i) as string;
}
if (useSchemaRegistry) {
try {
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
const eventName = this.getNodeParameter('eventName', 0) as string;
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
const id = await registry.getLatestSchemaId(eventName);
message = await registry.encode(id, JSON.parse(message));
} catch (error) {}
}
const topic = this.getNodeParameter('topic', i) as string;
const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean;