diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 32f3b7a9c9..ffc48ceea1 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -5,6 +5,8 @@ import { SASLOptions, } from 'kafkajs'; +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; + import { ITriggerFunctions, } from 'n8n-core'; @@ -98,6 +100,27 @@ export class KafkaTrigger implements INodeType { default: 30000, description: 'The time to await a response in ms.', }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'URL of the schema registry.', + }, ], }, ], @@ -147,6 +170,11 @@ export class KafkaTrigger implements INodeType { const options = this.getNodeParameter('options', {}) as IDataObject; + let registry: SchemaRegistry; + if (options.useSchemaRegistry) { + registry = new SchemaRegistry({ host: options.schemaRegistryUrl as string }); + } + const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => { @@ -160,6 +188,12 @@ export class KafkaTrigger implements INodeType { } catch (error) { } } + if (options.useSchemaRegistry) { + try { + value = await registry.decode(message.value as Buffer); + } catch (error) { } + } + data.message = value; data.topic = topic; diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index a52a1781e6..5780a74da4 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -607,6 +607,7 @@ "typescript": "~3.9.7" }, "dependencies": { + "@kafkajs/confluent-schema-registry": "1.0.6", "@types/lossless-json": "^1.0.0", "@types/promise-ftp": "^1.3.4", "@types/snowflake-sdk": "^1.5.1", diff --git a/packages/nodes-base/tsconfig.json b/packages/nodes-base/tsconfig.json index cbcbf98937..6edb1080a4 100644 --- a/packages/nodes-base/tsconfig.json +++ b/packages/nodes-base/tsconfig.json @@ -1,6 +1,7 @@ { "compilerOptions": { "lib": [ + "dom", "es2017", "es2019.array" ],