Improvements to Kafka-Node

This commit is contained in:
Jan Oberhauser 2020-10-29 15:44:11 +01:00
parent 12bb00fcd6
commit b806ee3a57
4 changed files with 62 additions and 89 deletions

View file

@ -13,12 +13,14 @@ export class Kafka implements ICredentialType {
name: 'clientId',
type: 'string' as NodePropertyTypes,
default: '',
placeholder: 'my-app',
},
{
displayName: 'Brokers',
name: 'brokers',
type: 'string' as NodePropertyTypes,
default: '',
placeholder: 'kafka1:9092,kafka2:9092',
},
{
displayName: 'SSL',
@ -26,5 +28,22 @@ export class Kafka implements ICredentialType {
type: 'boolean' as NodePropertyTypes,
default: true,
},
{
displayName: 'Username',
name: 'username',
type: 'string' as NodePropertyTypes,
default: '',
description: 'Optional username if authenticated is required.',
},
{
displayName: 'Password',
name: 'password',
type: 'string' as NodePropertyTypes,
typeOptions: {
password: true,
},
default: '',
description: 'Optional password if authenticated is required.',
},
];
}

View file

@ -1,29 +0,0 @@
import {
ICredentialType,
NodePropertyTypes,
} from 'n8n-workflow';
export class KafkaPlain implements ICredentialType {
extends = [
'kafka',
];
name = 'kafkaPlain';
displayName = 'Kafka';
properties = [
{
displayName: 'Username',
name: 'username',
type: 'string' as NodePropertyTypes,
default: '',
},
{
displayName: 'Password',
name: 'password',
type: 'string' as NodePropertyTypes,
typeOptions: {
password: true,
},
default: '',
},
];
}

View file

@ -34,58 +34,38 @@ export class Kafka implements INodeType {
credentials: [
{
name: 'kafka',
displayOptions: {
show: {
authentication: [
'none',
],
},
},
required: true,
},
{
name: 'kafkaPlain',
displayOptions: {
show: {
authentication: [
'plain',
],
},
},
required: true,
},
],
properties: [
{
displayName: 'Authentication',
name: 'authentication',
type: 'options',
options: [
{
name: 'None',
value: 'none',
},
{
name: 'Plain',
value: 'plain',
},
],
default: 'none',
},
{
displayName: 'Topic',
name: 'topic',
type: 'string',
default: '',
placeholder: 'topic-name',
description: 'Name of the queue of topic to publish to',
description: 'Name of the queue of topic to publish to.',
},
{
displayName: 'Send Input Data',
name: 'sendInputData',
type: 'boolean',
default: true,
description: 'Send the the data the node receives as JSON to Kafka.',
},
{
displayName: 'Message',
name: 'message',
type: 'string',
displayOptions: {
show: {
sendInputData: [
false,
],
},
},
default: '',
description: 'The message to be sent',
description: 'The message to be sent.',
},
{
displayName: 'JSON Parameters',
@ -142,7 +122,7 @@ export class Kafka implements INodeType {
},
},
default: '',
description: 'Header parameters as JSON (flat object)',
description: 'Header parameters as JSON (flat object).',
},
{
displayName: 'Options',
@ -156,21 +136,21 @@ export class Kafka implements INodeType {
name: 'acks',
type: 'boolean',
default: false,
description: 'Whether or not producer must wait for acknowledgement from all replicas',
description: 'Whether or not producer must wait for acknowledgement from all replicas.',
},
{
displayName: 'Compression',
name: 'compression',
type: 'boolean',
default: false,
description: 'Send the data in a compressed format using the GZIP codec',
description: 'Send the data in a compressed format using the GZIP codec.',
},
{
displayName: 'Timeout',
name: 'timeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms',
description: 'The time to await a response in ms.',
},
],
},
@ -182,13 +162,12 @@ export class Kafka implements INodeType {
const length = items.length as unknown as number;
const authentication = this.getNodeParameter('authentication', 0) as string;
const topicMessages: TopicMessages[] = [];
let responseData;
let responseData: IDataObject[];
const options = this.getNodeParameter('options', 0) as IDataObject;
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
const timeout = options.timeout as number;
@ -200,11 +179,9 @@ export class Kafka implements INodeType {
compression = CompressionTypes.GZIP;
}
let credentials: IDataObject = {};
const credentials = this.getCredentials('kafka') as IDataObject;
const sasl: SASLOptions | IDataObject = {};
const brokers = (credentials.brokers as string || '').split(',') as string[];
const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[];
const clientId = credentials.clientId as string;
@ -214,18 +191,13 @@ export class Kafka implements INodeType {
clientId,
brokers,
ssl,
//@ts-ignore
sasl,
};
if (authentication === 'plain') {
credentials = this.getCredentials('kafkaPlain') as IDataObject;
sasl.username = credentials.username as string;
sasl.password = credentials.password as string;
sasl.mechanism = 'plain';
} else {
credentials = this.getCredentials('kafka') as IDataObject;
delete config.sasl;
if (credentials.username || credentials.password) {
config.sasl = {
username: credentials.username as string,
password: credentials.password as string,
} as SASLOptions;
}
const kafka = new apacheKafka(config);
@ -234,9 +206,14 @@ export class Kafka implements INodeType {
await producer.connect();
for (let i = 0; i < length; i++) {
let message: string;
const message = this.getNodeParameter('message', i) as string;
for (let i = 0; i < length; i++) {
if (sendInputData === true) {
message = JSON.stringify(items[i].json);
} else {
message = this.getNodeParameter('message', i) as string;
}
const topic = this.getNodeParameter('topic', i) as string;
@ -278,7 +255,14 @@ export class Kafka implements INodeType {
timeout,
compression,
acks,
},
);
if (responseData.length === 0) {
responseData.push({
success: true,
});
}
await producer.disconnect();

View file

@ -102,7 +102,6 @@
"dist/credentials/JiraSoftwareServerApi.credentials.js",
"dist/credentials/JotFormApi.credentials.js",
"dist/credentials/Kafka.credentials.js",
"dist/credentials/KafkaPlain.credentials.js",
"dist/credentials/KeapOAuth2Api.credentials.js",
"dist/credentials/LinkedInOAuth2Api.credentials.js",
"dist/credentials/MailerLiteApi.credentials.js",