n8n/packages/nodes-base/nodes/Kafka/Kafka.node.ts
Iván Ovejero 0448feec56
refactor: Apply eslint-plugin-n8n-nodes-base autofixable rules (#3174)
*  Initial setup

* 👕 Update `.eslintignore`

* 👕 Autofix node-param-default-missing (#3173)

* 🔥 Remove duplicate key

* 👕 Add exceptions

* 📦 Update package-lock.json

* 👕 Apply `node-class-description-inputs-wrong-trigger-node` (#3176)

* 👕 Apply `node-class-description-inputs-wrong-regular-node` (#3177)

* 👕 Apply `node-class-description-outputs-wrong` (#3178)

* 👕 Apply `node-execute-block-double-assertion-for-items` (#3179)

* 👕 Apply `node-param-default-wrong-for-collection` (#3180)

* 👕 Apply node-param-default-wrong-for-boolean (#3181)

* Autofixed default missing

* Autofixed booleans, worked well

*  Fix params

*  Undo exempted autofixes

* 📦 Update package-lock.json

* 👕 Apply node-class-description-missing-subtitle (#3182)

*  Fix missing comma

* 👕 Apply `node-param-default-wrong-for-fixed-collection` (#3184)

* 👕 Add exception for `node-class-description-missing-subtitle`

* 👕 Apply `node-param-default-wrong-for-multi-options` (#3185)

* 👕 Apply `node-param-collection-type-unsorted-items` (#3186)

* Missing coma

* 👕 Apply `node-param-default-wrong-for-simplify` (#3187)

* 👕 Apply `node-param-description-comma-separated-hyphen` (#3190)

* 👕 Apply `node-param-description-empty-string` (#3189)

* 👕 Apply `node-param-description-excess-inner-whitespace` (#3191)

* Rule looks good

* Add whitespace rule in eslint config

* :zao: fix

* 👕 Apply `node-param-description-identical-to-display-name` (#3193)

* 👕 Apply `node-param-description-missing-for-ignore-ssl-issues` (#3195)

*  Revert ":zao: fix"

This reverts commit ef8a76f3df.

* 👕 Apply `node-param-description-missing-for-simplify`  (#3196)

* 👕 Apply `node-param-description-missing-final-period` (#3194)

* Rule working as intended

* Add rule to eslint

* 👕 Apply node-param-description-missing-for-return-all (#3197)

*  Restore `lintfix` command

Co-authored-by: agobrech <45268029+agobrech@users.noreply.github.com>
Co-authored-by: Omar Ajoue <krynble@gmail.com>
Co-authored-by: agobrech <ael.gobrecht@gmail.com>
Co-authored-by: Michael Kret <michael.k@radency.com>
2022-04-22 18:29:51 +02:00

340 lines
7.5 KiB
TypeScript

import {
CompressionTypes,
Kafka as apacheKafka,
KafkaConfig,
SASLOptions,
TopicMessages,
} from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import {
IExecuteFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeExecutionData,
INodeType,
INodeTypeDescription,
NodeOperationError,
} from 'n8n-workflow';
export class Kafka implements INodeType {
description: INodeTypeDescription = {
displayName: 'Kafka',
name: 'kafka',
icon: 'file:kafka.svg',
group: ['transform'],
version: 1,
description: 'Sends messages to a Kafka topic',
defaults: {
name: 'Kafka',
},
inputs: ['main'],
outputs: ['main'],
credentials: [
{
name: 'kafka',
required: true,
},
],
properties: [
{
displayName: 'Topic',
name: 'topic',
type: 'string',
default: '',
placeholder: 'topic-name',
description: 'Name of the queue of topic to publish to.',
},
{
displayName: 'Send Input Data',
name: 'sendInputData',
type: 'boolean',
default: true,
description: 'Send the the data the node receives as JSON to Kafka.',
},
{
displayName: 'Message',
name: 'message',
type: 'string',
displayOptions: {
show: {
sendInputData: [
false,
],
},
},
default: '',
description: 'The message to be sent.',
},
{
displayName: 'JSON Parameters',
name: 'jsonParameters',
type: 'boolean',
default: false,
},
{
displayName: 'Use Schema Registry',
name: 'useSchemaRegistry',
type: 'boolean',
default: false,
description: 'Use Confluent Schema Registry.',
},
{
displayName: 'Schema Registry URL',
name: 'schemaRegistryUrl',
type: 'string',
required: true,
displayOptions: {
show: {
useSchemaRegistry: [
true,
],
},
},
placeholder: 'https://schema-registry-domain:8081',
default: '',
description: 'URL of the schema registry.',
},
{
displayName: 'Event Name',
name: 'eventName',
type: 'string',
required: true,
displayOptions: {
show: {
useSchemaRegistry: [
true,
],
},
},
default: '',
description: 'Namespace and Name of Schema in Schema Registry (namespace.name).',
},
{
displayName: 'Headers',
name: 'headersUi',
placeholder: 'Add Header',
type: 'fixedCollection',
displayOptions: {
show: {
jsonParameters: [
false,
],
},
},
typeOptions: {
multipleValues: true,
},
default: {},
options: [
{
name: 'headerValues',
displayName: 'Header',
values: [
{
displayName: 'Key',
name: 'key',
type: 'string',
default: '',
},
{
displayName: 'Value',
name: 'value',
type: 'string',
default: '',
},
],
},
],
},
{
displayName: 'Headers (JSON)',
name: 'headerParametersJson',
type: 'json',
displayOptions: {
show: {
jsonParameters: [
true,
],
},
},
default: '',
description: 'Header parameters as JSON (flat object).',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
default: {},
placeholder: 'Add Option',
options: [
{
displayName: 'Acks',
name: 'acks',
type: 'boolean',
default: false,
description: 'Whether or not producer must wait for acknowledgement from all replicas.',
},
{
displayName: 'Compression',
name: 'compression',
type: 'boolean',
default: false,
description: 'Send the data in a compressed format using the GZIP codec.',
},
{
displayName: 'Timeout',
name: 'timeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms.',
},
],
},
],
};
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
const items = this.getInputData();
const length = items.length;
const topicMessages: TopicMessages[] = [];
let responseData: IDataObject[];
try {
const options = this.getNodeParameter('options', 0) as IDataObject;
const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
const useSchemaRegistry = this.getNodeParameter('useSchemaRegistry', 0) as boolean;
const timeout = options.timeout as number;
let compression = CompressionTypes.None;
const acks = (options.acks === true) ? 1 : 0;
if (options.compression === true) {
compression = CompressionTypes.GZIP;
}
const credentials = await this.getCredentials('kafka');
const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[];
const clientId = credentials.clientId as string;
const ssl = credentials.ssl as boolean;
const config: KafkaConfig = {
clientId,
brokers,
ssl,
};
if (credentials.authentication === true) {
if(!(credentials.username && credentials.password)) {
throw new NodeOperationError(this.getNode(), 'Username and password are required for authentication');
}
config.sasl = {
username: credentials.username as string,
password: credentials.password as string,
mechanism: credentials.saslMechanism as string,
} as SASLOptions;
}
const kafka = new apacheKafka(config);
const producer = kafka.producer();
await producer.connect();
let message: string | Buffer;
for (let i = 0; i < length; i++) {
if (sendInputData === true) {
message = JSON.stringify(items[i].json);
} else {
message = this.getNodeParameter('message', i) as string;
}
if (useSchemaRegistry) {
try {
const schemaRegistryUrl = this.getNodeParameter('schemaRegistryUrl', 0) as string;
const eventName = this.getNodeParameter('eventName', 0) as string;
const registry = new SchemaRegistry({ host: schemaRegistryUrl });
const id = await registry.getLatestSchemaId(eventName);
message = await registry.encode(id, JSON.parse(message));
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Verify your Schema Registry configuration');
}
}
const topic = this.getNodeParameter('topic', i) as string;
const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean;
let headers;
if (jsonParameters === true) {
headers = this.getNodeParameter('headerParametersJson', i) as string;
try {
headers = JSON.parse(headers);
} catch (exception) {
throw new NodeOperationError(this.getNode(), 'Headers must be a valid json');
}
} else {
const values = (this.getNodeParameter('headersUi', i) as IDataObject).headerValues as IDataObject[];
headers = {};
if (values !== undefined) {
for (const value of values) {
//@ts-ignore
headers[value.key] = value.value;
}
}
}
topicMessages.push(
{
topic,
messages: [{
value: message,
headers,
}],
});
}
responseData = await producer.sendBatch(
{
topicMessages,
timeout,
compression,
acks,
},
);
if (responseData.length === 0) {
responseData.push({
success: true,
});
}
await producer.disconnect();
return [this.helpers.returnJsonArray(responseData)];
} catch (error) {
if (this.continueOnFail()) {
return [this.helpers.returnJsonArray({ error: error.message })];
} else {
throw error;
}
}
}
}