n8n/packages/nodes-base/nodes/Kafka/Kafka.node.ts
Omar Ajoue 7ce7285f7a
Load credentials from the database (#1741)
* Changes to types so that credentials can be always loaded from DB

This first commit changes all return types from the execute functions
and calls to get credentials to be async so we can use await.

This is a first step as previously credentials were loaded in memory and
always available. We will now be loading them from the DB which requires
turning the whole call chain async.

* Fix updated files

* Removed unnecessary credential loading to improve performance

* Fix typo

*  Fix issue

* Updated new nodes to load credentials async

*  Remove not needed comment

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-08-20 18:57:30 +02:00

285 lines
6 KiB
TypeScript

import {
CompressionTypes,
Kafka as apacheKafka,
KafkaConfig,
SASLOptions,
TopicMessages,
} from 'kafkajs';
import {
IExecuteFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeExecutionData,
INodeType,
INodeTypeDescription,
NodeOperationError,
} from 'n8n-workflow';
/**
 * n8n node that publishes messages to Kafka topics through a kafkajs producer.
 *
 * `description` declares the node's credentials and UI parameters; `execute`
 * turns every input item into one Kafka message and sends all of them in a
 * single `sendBatch` call.
 */
export class Kafka implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'Kafka',
		name: 'kafka',
		icon: 'file:kafka.svg',
		group: ['transform'],
		version: 1,
		description: 'Sends messages to a Kafka topic',
		defaults: {
			name: 'Kafka',
			color: '#000000',
		},
		inputs: ['main'],
		outputs: ['main'],
		credentials: [
			{
				name: 'kafka',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Topic',
				name: 'topic',
				type: 'string',
				default: '',
				placeholder: 'topic-name',
				description: 'Name of the queue of topic to publish to.',
			},
			{
				displayName: 'Send Input Data',
				name: 'sendInputData',
				type: 'boolean',
				default: true,
				description: 'Send the the data the node receives as JSON to Kafka.',
			},
			// Only shown when the input data is not forwarded as the message body.
			{
				displayName: 'Message',
				name: 'message',
				type: 'string',
				displayOptions: {
					show: {
						sendInputData: [
							false,
						],
					},
				},
				default: '',
				description: 'The message to be sent.',
			},
			// Switches the header input between the key/value UI and raw JSON.
			{
				displayName: 'JSON Parameters',
				name: 'jsonParameters',
				type: 'boolean',
				default: false,
			},
			{
				displayName: 'Headers',
				name: 'headersUi',
				placeholder: 'Add Header',
				type: 'fixedCollection',
				displayOptions: {
					show: {
						jsonParameters: [
							false,
						],
					},
				},
				typeOptions: {
					multipleValues: true,
				},
				default: {},
				options: [
					{
						name: 'headerValues',
						displayName: 'Header',
						values: [
							{
								displayName: 'Key',
								name: 'key',
								type: 'string',
								default: '',
							},
							{
								displayName: 'Value',
								name: 'value',
								type: 'string',
								default: '',
							},
						],
					},
				],
			},
			{
				displayName: 'Headers (JSON)',
				name: 'headerParametersJson',
				type: 'json',
				displayOptions: {
					show: {
						jsonParameters: [
							true,
						],
					},
				},
				default: '',
				description: 'Header parameters as JSON (flat object).',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				default: {},
				placeholder: 'Add Option',
				options: [
					{
						displayName: 'Acks',
						name: 'acks',
						type: 'boolean',
						default: false,
						description: 'Whether or not producer must wait for acknowledgement from all replicas.',
					},
					{
						displayName: 'Compression',
						name: 'compression',
						type: 'boolean',
						default: false,
						description: 'Send the data in a compressed format using the GZIP codec.',
					},
					{
						displayName: 'Timeout',
						name: 'timeout',
						type: 'number',
						default: 30000,
						description: 'The time to await a response in ms.',
					},
				],
			},
		],
	};

	/**
	 * Builds one Kafka message per input item and sends them in one batch.
	 *
	 * `options` and `sendInputData` are read once (item index 0); `topic`,
	 * `message` and the header parameters are read per item.
	 *
	 * @returns A single output branch holding the array returned by
	 * `sendBatch`, or `{ success: true }` when that array is empty. When
	 * `continueOnFail()` is true and anything throws, a single
	 * `{ error: <message> }` item is returned instead.
	 * @throws NodeOperationError when authentication is enabled without both
	 * username and password, or when the JSON headers cannot be parsed.
	 * Any other error is rethrown unchanged unless `continueOnFail()` is true.
	 */
	async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
		const items = this.getInputData();
		const topicMessages: TopicMessages[] = [];

		try {
			const options = this.getNodeParameter('options', 0) as IDataObject;
			const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;
			const timeout = options.timeout as number;
			const compression = (options.compression === true) ? CompressionTypes.GZIP : CompressionTypes.None;
			// NOTE(review): the option is described as waiting for "all replicas",
			// but 1 is passed here; kafkajs documents -1 as "all in-sync replicas"
			// and 1 as "leader only". Left unchanged to keep delivery semantics.
			const acks = (options.acks === true) ? 1 : 0;

			const credentials = await this.getCredentials('kafka') as IDataObject;
			const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[];
			const clientId = credentials.clientId as string;
			const ssl = credentials.ssl as boolean;

			const config: KafkaConfig = {
				clientId,
				brokers,
				ssl,
			};

			if (credentials.authentication === true) {
				if (!(credentials.username && credentials.password)) {
					throw new NodeOperationError(this.getNode(), 'Username and password are required for authentication');
				}
				config.sasl = {
					username: credentials.username as string,
					password: credentials.password as string,
					mechanism: credentials.saslMechanism as string,
				} as SASLOptions;
			}

			const kafka = new apacheKafka(config);
			const producer = kafka.producer();
			let responseData: IDataObject[];

			await producer.connect();

			try {
				for (let i = 0; i < items.length; i++) {
					const message = (sendInputData === true)
						? JSON.stringify(items[i].json)
						: this.getNodeParameter('message', i) as string;

					const topic = this.getNodeParameter('topic', i) as string;
					const jsonParameters = this.getNodeParameter('jsonParameters', i) as boolean;

					let headers: Record<string, string>;

					if (jsonParameters === true) {
						const headersJson = this.getNodeParameter('headerParametersJson', i) as string;
						try {
							headers = JSON.parse(headersJson);
						} catch (exception) {
							throw new NodeOperationError(this.getNode(), 'Headers must be a valid json');
						}
					} else {
						// The fixedCollection has no `headerValues` entry when no header was added.
						const values = (this.getNodeParameter('headersUi', i) as IDataObject).headerValues as IDataObject[] | undefined;
						headers = {};
						if (values !== undefined) {
							for (const value of values) {
								headers[value.key as string] = value.value as string;
							}
						}
					}

					topicMessages.push(
						{
							topic,
							messages: [{
								value: message,
								headers,
							}],
						});
				}

				responseData = await producer.sendBatch(
					{
						topicMessages,
						timeout,
						compression,
						acks,
					},
				);
			} finally {
				// Always release the broker connection, also when building or sending
				// the batch fails; otherwise the connected producer would leak.
				await producer.disconnect();
			}

			// Report success explicitly when the send returned no metadata.
			if (responseData.length === 0) {
				responseData.push({
					success: true,
				});
			}

			return [this.helpers.returnJsonArray(responseData)];
		} catch (error) {
			if (this.continueOnFail()) {
				return [this.helpers.returnJsonArray({ error: error.message })];
			} else {
				throw error;
			}
		}
	}
}