From 894ed83efcc02cf179862830261e0a458f254561 Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Sat, 5 Jun 2021 14:33:12 -0300 Subject: [PATCH 1/7] Add Schema Registry to the Kafka consumer --- .../nodes/Kafka/KafkaTrigger.node.ts | 34 +++++++++++++++++++ packages/nodes-base/package.json | 1 + packages/nodes-base/tsconfig.json | 1 + 3 files changed, 36 insertions(+) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 32f3b7a9c9..ffc48ceea1 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -5,6 +5,8 @@ import { SASLOptions, } from 'kafkajs'; +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; + import { ITriggerFunctions, } from 'n8n-core'; @@ -98,6 +100,27 @@ export class KafkaTrigger implements INodeType { default: 30000, description: 'The time to await a response in ms.', }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'URL of the schema registry.', + }, ], }, ], @@ -147,6 +170,11 @@ export class KafkaTrigger implements INodeType { const options = this.getNodeParameter('options', {}) as IDataObject; + let registry: SchemaRegistry; + if (options.useSchemaRegistry) { + registry = new SchemaRegistry({ host: options.schemaRegistryUrl as string }); + } + const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => { @@ -160,6 +188,12 @@ export class KafkaTrigger implements INodeType { } catch (error) { } } + if (options.useSchemaRegistry) { + try { + value = await registry.decode(message.value as Buffer); + } catch (error) { } + } + data.message = value; data.topic = topic; diff --git a/packages/nodes-base/package.json b/packages/nodes-base/package.json index a52a1781e6..5780a74da4 100644 --- a/packages/nodes-base/package.json +++ b/packages/nodes-base/package.json @@ -607,6 +607,7 @@ "typescript": "~3.9.7" }, "dependencies": { + "@kafkajs/confluent-schema-registry": "1.0.6", "@types/lossless-json": "^1.0.0", "@types/promise-ftp": "^1.3.4", "@types/snowflake-sdk": "^1.5.1", diff --git a/packages/nodes-base/tsconfig.json b/packages/nodes-base/tsconfig.json index cbcbf98937..6edb1080a4 100644 --- a/packages/nodes-base/tsconfig.json +++ b/packages/nodes-base/tsconfig.json @@ -1,6 +1,7 @@ { "compilerOptions": { "lib": [ + "dom", "es2017", "es2019.array" ], From 4af53d1b951c81e3690d8619c305c73ed44e108d Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Sat, 5 Jun 2021 16:46:06 -0300 Subject: [PATCH 2/7] Add Schema Registry to the Kafka producer --- packages/nodes-base/nodes/Kafka/Kafka.node.ts | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 6270da679a..1763d1a85a 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -6,6 +6,8 @@ import { TopicMessages, } from 'kafkajs'; +import { SchemaRegistry } from '@kafkajs/confluent-schema-registry'; + import { IExecuteFunctions, } from 'n8n-core'; @@ -74,6 +76,43 @@ export class Kafka implements INodeType { type: 'boolean', default: false, }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'URL of the schema registry.', + }, + { + displayName: 'Event Name', + name: 'eventName', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'Namespace and Name of Schema in Schema Registry (namespace.name).', + }, { displayName: 'Headers', name: 'headersUi', @@ -170,6 +209,8 @@ export class Kafka implements INodeType { const options = this.getNodeParameter('options', 0) as IDataObject; const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + const timeout = options.timeout as number; let compression = CompressionTypes.None; @@ -211,7 +252,7 @@ export class Kafka implements INodeType { await producer.connect(); - let message: string; + let message: string | Buffer; for (let i = 0; i < length; i++) { if (sendInputData === true) { @@ -220,6 +261,18 @@ export class Kafka implements INodeType { message = this.getNodeParameter('message', i) as string; } + if (useSchemaRegistry) { + try { + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + const eventName = this.getNodeParameter('eventName', 0) as string; + + const registry = new SchemaRegistry({ host: schemaRegistryUrl }); + const id = await registry.getLatestSchemaId(eventName); + + message = await registry.encode(id, JSON.parse(message)); + } catch (error) {} + } + const topic = this.getNodeParameter('topic', i) as string; const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean; From e5e09293326698db6613b477103a9c0dddf4e4df Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Sat, 5 Jun 2021 18:48:00 -0300 Subject: [PATCH 3/7] Move useSchemaRegistry out of the options to be easier to see the extra fields --- .../nodes/Kafka/KafkaTrigger.node.ts | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index ffc48ceea1..c2ab8db751 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -58,6 +58,28 @@ export class KafkaTrigger implements INodeType { placeholder: 'n8n-kafka', description: 'ID of the consumer group.', }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'URL of the schema registry.', + }, { displayName: 'Options', name: 'options', @@ -100,27 +122,6 @@ export class KafkaTrigger implements INodeType { default: 30000, description: 'The time to await a response in ms.', }, - { - displayName: 'Use Schema Registry', - name: 'useSchemaRegistry', - type: 'boolean', - default: false, - description: 'Use Apache Avro serialization format and Confluent\' wire formats.', - }, - { - displayName: 'Schema Registry URL', - name: 'schemaRegistryUrl', - type: 'string', - displayOptions: { - show: { - useSchemaRegistry: [ - true, - ], - }, - }, - default: '', - description: 'URL of the schema registry.', - }, ], }, ], @@ -170,9 +171,13 @@ export class KafkaTrigger implements INodeType { const options = this.getNodeParameter('options', {}) as IDataObject; + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + let registry: SchemaRegistry; - if (options.useSchemaRegistry) { - registry = new SchemaRegistry({ host: options.schemaRegistryUrl as string }); + if (useSchemaRegistry) { + registry = new SchemaRegistry({ host: schemaRegistryUrl }); } const startConsumer = async () => { From 87b0e9992323dbd259d52f09199624b54aa06797 Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Sat, 5 Jun 2021 21:22:32 -0300 Subject: [PATCH 4/7] fix typo options.useSchemaRegistry -> useSchemaRegistry --- packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index c2ab8db751..c4587beca3 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -193,7 +193,7 @@ export class KafkaTrigger implements INodeType { } catch (error) { } } - if (options.useSchemaRegistry) { + if (useSchemaRegistry) { try { value = await registry.decode(message.value as Buffer); } catch (error) { } From 23b9e7bbe151e2edd9436619c19d3fd11f6d9b80 Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Sun, 27 Jun 2021 15:18:58 -0300 Subject: [PATCH 5/7] Add option to return the headers of the message --- .../nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index c4587beca3..10a0034223 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -122,6 +122,13 @@ export class KafkaTrigger implements INodeType { default: 30000, description: 'The time to await a response in ms.', }, + { + displayName: 'Return headers', + name: 'returnHeaders', + type: 'boolean', + default: false, + description: 'Return the headers received from Kafka', + }, ], }, ], @@ -199,6 +206,16 @@ export class KafkaTrigger implements INodeType { } catch (error) { } } + if (options.returnHeaders) { + const headers: {[key: string]: string} = {}; + for (const key in message.headers) { + const header = message.headers[key]; + headers[key] = header?.toString('utf8') || ''; + } + + data.headers = headers; + } + data.message = value; data.topic = topic; From 882d1eca8e5087ddab3ce6ef0432f926fa7ac87e Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Fri, 2 Jul 2021 13:24:28 -0300 Subject: [PATCH 6/7] PR Suggestions --- packages/nodes-base/nodes/Kafka/Kafka.node.ts | 8 +++++--- packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 10 +++------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 1763d1a85a..473103d4ec 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -81,7 +81,7 @@ export class Kafka implements INodeType { name: 'useSchemaRegistry', type: 'boolean', default: false, - description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + description: 'Use Confluent Schema Registry.', }, { displayName: 'Schema Registry URL', @@ -95,7 +95,7 @@ export class Kafka implements INodeType { ], }, }, - default: '', + default: 'https://schema-registry-domain:8081', description: 'URL of the schema registry.', }, { @@ -270,7 +270,9 @@ export class Kafka implements INodeType { const id = await registry.getLatestSchemaId(eventName); message = await registry.encode(id, JSON.parse(message)); - } catch (error) {} + } catch (exception) { + throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration'); + } } const topic = this.getNodeParameter('topic', i) as string; diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 10a0034223..a6ffb37c96 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -63,7 +63,7 @@ export class KafkaTrigger implements INodeType { name: 'useSchemaRegistry', type: 'boolean', default: false, - description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + description: 'Use Confluent Schema Registry.', }, { displayName: 'Schema Registry URL', @@ -77,7 +77,7 @@ export class KafkaTrigger implements INodeType { ], }, }, - default: '', + default: 'https://schema-registry-domain:8081', description: 'URL of the schema registry.', }, { @@ -182,11 +182,6 @@ export class KafkaTrigger implements INodeType { const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; - let registry: SchemaRegistry; - if (useSchemaRegistry) { - registry = new SchemaRegistry({ host: schemaRegistryUrl }); - } - const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => { @@ -202,6 +197,7 @@ export class KafkaTrigger implements INodeType { if (useSchemaRegistry) { try { + const registry = new SchemaRegistry({ host: schemaRegistryUrl }); value = await registry.decode(message.value as Buffer); } catch (error) { } } From 7239e4f355aaa437aa6bb9a640024d57d2670e5d Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Mon, 5 Jul 2021 11:54:37 -0300 Subject: [PATCH 7/7] PR suggestions - add placeholder --- packages/nodes-base/nodes/Kafka/Kafka.node.ts | 3 ++- packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 473103d4ec..55bd5dc735 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -95,7 +95,8 @@ export class Kafka implements INodeType { ], }, }, - default: 'https://schema-registry-domain:8081', + placeholder: 'https://schema-registry-domain:8081', + default: '', description: 'URL of the schema registry.', }, { diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index a6ffb37c96..7640bcd7fc 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -77,7 +77,8 @@ export class KafkaTrigger implements INodeType { ], }, }, - default: 'https://schema-registry-domain:8081', + placeholder: 'https://schema-registry-domain:8081', + default: '', description: 'URL of the schema registry.', }, {