mirror of
https://github.com/n8n-io/n8n.git
synced 2025-01-11 21:07:28 -08:00
PR Suggestions
This commit is contained in:
parent
23b9e7bbe1
commit
882d1eca8e
|
@ -81,7 +81,7 @@ export class Kafka implements INodeType {
|
||||||
name: 'useSchemaRegistry',
|
name: 'useSchemaRegistry',
|
||||||
type: 'boolean',
|
type: 'boolean',
|
||||||
default: false,
|
default: false,
|
||||||
description: 'Use Apache Avro serialization format and Confluent\' wire formats.',
|
description: 'Use Confluent Schema Registry.',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
displayName: 'Schema Registry URL',
|
displayName: 'Schema Registry URL',
|
||||||
|
@ -95,7 +95,7 @@ export class Kafka implements INodeType {
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
default: '',
|
default: 'https://schema-registry-domain:8081',
|
||||||
description: 'URL of the schema registry.',
|
description: 'URL of the schema registry.',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -270,7 +270,9 @@ export class Kafka implements INodeType {
|
||||||
const id = await registry.getLatestSchemaId(eventName);
|
const id = await registry.getLatestSchemaId(eventName);
|
||||||
|
|
||||||
message = await registry.encode(id, JSON.parse(message));
|
message = await registry.encode(id, JSON.parse(message));
|
||||||
} catch (error) {}
|
} catch (exception) {
|
||||||
|
throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const topic = this.getNodeParameter('topic', i) as string;
|
const topic = this.getNodeParameter('topic', i) as string;
|
||||||
|
|
|
@ -63,7 +63,7 @@ export class KafkaTrigger implements INodeType {
|
||||||
name: 'useSchemaRegistry',
|
name: 'useSchemaRegistry',
|
||||||
type: 'boolean',
|
type: 'boolean',
|
||||||
default: false,
|
default: false,
|
||||||
description: 'Use Apache Avro serialization format and Confluent\' wire formats.',
|
description: 'Use Confluent Schema Registry.',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
displayName: 'Schema Registry URL',
|
displayName: 'Schema Registry URL',
|
||||||
|
@ -77,7 +77,7 @@ export class KafkaTrigger implements INodeType {
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
default: '',
|
default: 'https://schema-registry-domain:8081',
|
||||||
description: 'URL of the schema registry.',
|
description: 'URL of the schema registry.',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -182,11 +182,6 @@ export class KafkaTrigger implements INodeType {
|
||||||
|
|
||||||
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
|
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
|
||||||
|
|
||||||
let registry: SchemaRegistry;
|
|
||||||
if (useSchemaRegistry) {
|
|
||||||
registry = new SchemaRegistry({ host: schemaRegistryUrl });
|
|
||||||
}
|
|
||||||
|
|
||||||
const startConsumer = async () => {
|
const startConsumer = async () => {
|
||||||
await consumer.run({
|
await consumer.run({
|
||||||
eachMessage: async ({ topic, message }) => {
|
eachMessage: async ({ topic, message }) => {
|
||||||
|
@ -202,6 +197,7 @@ export class KafkaTrigger implements INodeType {
|
||||||
|
|
||||||
if (useSchemaRegistry) {
|
if (useSchemaRegistry) {
|
||||||
try {
|
try {
|
||||||
|
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
|
||||||
value = await registry.decode(message.value as Buffer);
|
value = await registry.decode(message.value as Buffer);
|
||||||
} catch (error) { }
|
} catch (error) { }
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue