diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index ffc48ceea1..c2ab8db751 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -58,6 +58,28 @@ export class KafkaTrigger implements INodeType { placeholder: 'n8n-kafka', description: 'ID of the consumer group.', }, + { + displayName: 'Use Schema Registry', + name: 'useSchemaRegistry', + type: 'boolean', + default: false, + description: 'Use Apache Avro serialization format and Confluent\' wire formats.', + }, + { + displayName: 'Schema Registry URL', + name: 'schemaRegistryUrl', + type: 'string', + required: true, + displayOptions: { + show: { + useSchemaRegistry: [ + true, + ], + }, + }, + default: '', + description: 'URL of the schema registry.', + }, { displayName: 'Options', name: 'options', @@ -100,27 +122,6 @@ export class KafkaTrigger implements INodeType { default: 30000, description: 'The time to await a response in ms.', }, - { - displayName: 'Use Schema Registry', - name: 'useSchemaRegistry', - type: 'boolean', - default: false, - description: 'Use Apache Avro serialization format and Confluent\' wire formats.', - }, - { - displayName: 'Schema Registry URL', - name: 'schemaRegistryUrl', - type: 'string', - displayOptions: { - show: { - useSchemaRegistry: [ - true, - ], - }, - }, - default: '', - description: 'URL of the schema registry.', - }, ], }, ], @@ -170,9 +171,13 @@ export class KafkaTrigger implements INodeType { const options = this.getNodeParameter('options', {}) as IDataObject; + const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean; + + const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string; + let registry: SchemaRegistry; - if (options.useSchemaRegistry) { - registry = new SchemaRegistry({ host: options.schemaRegistryUrl as string }); + if (useSchemaRegistry) { + registry = new SchemaRegistry({ host: schemaRegistryUrl }); } const startConsumer = async () => {