Add Schema Registry to the Kafka consumer

This commit is contained in:
Ricardo Georgel 2021-06-05 14:33:12 -03:00
parent ea9f956f0d
commit 894ed83efc
3 changed files with 36 additions and 0 deletions

View file

@ -5,6 +5,8 @@ import {
SASLOptions,
} from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import {
ITriggerFunctions,
} from 'n8n-core';
@ -98,6 +100,27 @@ export class KafkaTrigger implements INodeType {
default: 30000,
description: 'The time to await a response in ms.',
},
{
displayName: 'Use Schema Registry',
name: 'useSchemaRegistry',
type: 'boolean',
default: false,
description: 'Use Apache Avro serialization format and Confluent\' wire formats.',
},
{
displayName: 'Schema Registry URL',
name: 'schemaRegistryUrl',
type: 'string',
displayOptions: {
show: {
useSchemaRegistry: [
true,
],
},
},
default: '',
description: 'URL of the schema registry.',
},
],
},
],
@ -147,6 +170,11 @@ export class KafkaTrigger implements INodeType {
const options = this.getNodeParameter('options', {}) as IDataObject;
let registry: SchemaRegistry;
if (options.useSchemaRegistry) {
registry = new SchemaRegistry({ host: options.schemaRegistryUrl as string });
}
const startConsumer = async () => {
await consumer.run({
eachMessage: async ({ topic, message }) => {
@ -160,6 +188,12 @@ export class KafkaTrigger implements INodeType {
} catch (error) { }
}
if (options.useSchemaRegistry) {
try {
value = await registry.decode(message.value as Buffer);
} catch (error) { }
}
data.message = value;
data.topic = topic;

View file

@ -607,6 +607,7 @@
"typescript": "~3.9.7"
},
"dependencies": {
"@kafkajs/confluent-schema-registry": "1.0.6",
"@types/lossless-json": "^1.0.0",
"@types/promise-ftp": "^1.3.4",
"@types/snowflake-sdk": "^1.5.1",

View file

@ -1,6 +1,7 @@
{
"compilerOptions": {
"lib": [
"dom",
"es2017",
"es2019.array"
],