From 882d1eca8e5087ddab3ce6ef0432f926fa7ac87e Mon Sep 17 00:00:00 2001 From: Ricardo Georgel Date: Fri, 2 Jul 2021 13:24:28 -0300 Subject: [PATCH] PR Suggestions --- packages/nodes-base/nodes/Kafka/Kafka.node.ts | 8 +++++--- packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 10 +++------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/nodes-base/nodes/Kafka/Kafka.node.ts b/packages/nodes-base/nodes/Kafka/Kafka.node.ts index 1763d1a85a..473103d4ec 100644 --- a/packages/nodes-base/nodes/Kafka/Kafka.node.ts +++ b/packages/nodes-base/nodes/Kafka/Kafka.node.ts @@ -81,7 +81,7 @@ export class Kafka implements INodeType { name: 'useSchemaRegistry', type: 'boolean', default: false, - description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + description: 'Use Confluent Schema Registry.', }, { displayName: 'Schema Registry URL', @@ -95,7 +95,7 @@ export class Kafka implements INodeType { ], }, }, - default: '', + default: 'https://schema-registry-domain:8081', description: 'URL of the schema registry.', }, { @@ -270,7 +270,9 @@ export class Kafka implements INodeType { const id = await registry.getLatestSchemaId(eventName); message = await registry.encode(id, JSON.parse(message)); - } catch (error) {} + } catch (exception) { + throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration'); + } } const topic = this.getNodeParameter('topic', i) as string; diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 10a0034223..a6ffb37c96 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -63,7 +63,7 @@ export class KafkaTrigger implements INodeType { name: 'useSchemaRegistry', type: 'boolean', default: false, - description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + description: 'Use Confluent Schema Registry.', }, { displayName: 'Schema Registry URL', @@ -77,7 +77,7 @@ export class KafkaTrigger implements INodeType { ], }, }, - default: '', + default: 'https://schema-registry-domain:8081', description: 'URL of the schema registry.', }, { @@ -182,11 +182,6 @@ export class KafkaTrigger implements INodeType { const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; - let registry: SchemaRegistry; - if (useSchemaRegistry) { - registry = new SchemaRegistry({ host: schemaRegistryUrl }); - } - const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => { @@ -202,6 +197,7 @@ export class KafkaTrigger implements INodeType { if (useSchemaRegistry) { try { + const registry = new SchemaRegistry({ host: schemaRegistryUrl }); value = await registry.decode(message.value as Buffer); } catch (error) { } }