n8n/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts
Omar Ajoue 7ce7285f7a
Load credentials from the database (#1741)
* Changes to types so that credentials can be always loaded from DB

This first commit changes all return types from the execute functions
and calls to get credentials to be async so we can use await.

This is a first step as previously credentials were loaded in memory and
always available. We will now be loading them from the DB which requires
turning the whole call chain async.

* Fix updated files

* Removed unnecessary credential loading to improve performance

* Fix typo

* Fix issue

* Updated new nodes to load credentials async

* Remove unneeded comment

Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
2021-08-20 18:57:30 +02:00

207 lines
4.7 KiB
TypeScript

import {
Kafka as apacheKafka,
KafkaConfig,
logLevel,
SASLOptions,
} from 'kafkajs';
import {
ITriggerFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
NodeOperationError,
} from 'n8n-workflow';
/**
 * Trigger node that joins a Kafka consumer group, subscribes to a single
 * topic and emits every received message into the workflow.
 */
export class KafkaTrigger implements INodeType {
	description: INodeTypeDescription = {
		displayName: 'Kafka Trigger',
		name: 'kafkaTrigger',
		icon: 'file:kafka.svg',
		group: ['trigger'],
		version: 1,
		description: 'Consume messages from a Kafka topic',
		defaults: {
			name: 'Kafka Trigger',
			color: '#000000',
		},
		inputs: [],
		outputs: ['main'],
		credentials: [
			{
				name: 'kafka',
				required: true,
			},
		],
		properties: [
			{
				displayName: 'Topic',
				name: 'topic',
				type: 'string',
				default: '',
				required: true,
				placeholder: 'topic-name',
				description: 'Name of the queue of topic to consume from.',
			},
			{
				displayName: 'Group ID',
				name: 'groupId',
				type: 'string',
				default: '',
				required: true,
				placeholder: 'n8n-kafka',
				description: 'ID of the consumer group.',
			},
			{
				displayName: 'Options',
				name: 'options',
				type: 'collection',
				default: {},
				placeholder: 'Add Option',
				options: [
					{
						displayName: 'Allow Topic Creation',
						name: 'allowAutoTopicCreation',
						type: 'boolean',
						default: false,
						description: 'Allow sending message to a previously non exisiting topic .',
					},
					{
						displayName: 'Read messages from beginning',
						name: 'fromBeginning',
						type: 'boolean',
						default: true,
						description: 'Read message from beginning .',
					},
					{
						displayName: 'JSON Parse Message',
						name: 'jsonParseMessage',
						type: 'boolean',
						default: false,
						description: 'Try to parse the message to an object.',
					},
					{
						displayName: 'Only Message',
						name: 'onlyMessage',
						type: 'boolean',
						displayOptions: {
							show: {
								jsonParseMessage: [
									true,
								],
							},
						},
						default: false,
						description: 'Returns only the message property.',
					},
					{
						displayName: 'Session Timeout',
						name: 'sessionTimeout',
						type: 'number',
						default: 30000,
						description: 'The time to await a response in ms.',
					},
				],
			},
		],
	};

	/**
	 * Connects to the configured brokers, subscribes to the topic and starts
	 * consuming. Every message is emitted as one item holding `message` and
	 * `topic`, or just the message value when the "Only Message" option is set.
	 *
	 * @returns `closeFunction`, which disconnects the consumer, and
	 * `manualTriggerFunction`, which makes sure the consumer is running.
	 * @throws NodeOperationError when no credentials are returned, when the
	 * credentials contain no broker, or when authentication is enabled without
	 * both username and password.
	 */
	async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
		const topic = this.getNodeParameter('topic') as string;
		const groupId = this.getNodeParameter('groupId') as string;
		const options = this.getNodeParameter('options', {}) as IDataObject;

		const credentials = await this.getCredentials('kafka') as IDataObject | undefined;
		if (credentials === undefined) {
			throw new NodeOperationError(this.getNode(), 'No credentials got returned!');
		}

		// Drop blank entries so an empty field or a trailing comma does not
		// produce a broker with an empty address.
		const brokers = (credentials.brokers as string || '')
			.split(',')
			.map(item => item.trim())
			.filter(item => item !== '');
		if (brokers.length === 0) {
			throw new NodeOperationError(this.getNode(), 'At least one broker is required to connect to Kafka');
		}

		const clientId = credentials.clientId as string;
		const ssl = credentials.ssl as boolean;

		const config: KafkaConfig = {
			clientId,
			brokers,
			ssl,
			logLevel: logLevel.ERROR,
		};

		if (credentials.authentication === true) {
			if (!(credentials.username && credentials.password)) {
				throw new NodeOperationError(this.getNode(), 'Username and password are required for authentication');
			}
			config.sasl = {
				username: credentials.username as string,
				password: credentials.password as string,
				mechanism: credentials.saslMechanism as string,
			} as SASLOptions;
		}

		const kafka = new apacheKafka(config);

		// Forward the consumer related options. An option the user did not add
		// is passed as undefined so that the kafkajs default stays in effect.
		const consumer = kafka.consumer({
			groupId,
			allowAutoTopicCreation: options.allowAutoTopicCreation as boolean | undefined,
			sessionTimeout: options.sessionTimeout as number | undefined,
		});

		// Remembers the pending/finished start so that "consumer.run" is
		// invoked at most once, no matter how often a start is requested.
		let consumerRun: Promise<void> | undefined;

		const startConsumer = (): Promise<void> => {
			if (consumerRun === undefined) {
				consumerRun = consumer.run({
					eachMessage: async ({ topic: messageTopic, message }) => {
						const rawValue = message.value?.toString();
						let value: IDataObject | string | undefined = rawValue;

						if (options.jsonParseMessage) {
							try {
								value = JSON.parse(rawValue as string);
							} catch (error) {
								// Best effort only: a value that is not valid JSON
								// is passed on unparsed.
							}
						}

						const data: IDataObject = options.onlyMessage
							? (value as IDataObject)
							: { message: value, topic: messageTopic };

						this.emit([this.helpers.returnJsonArray([data])]);
					},
				});
			}
			return consumerRun;
		};

		await consumer.connect();

		try {
			await consumer.subscribe({ topic, fromBeginning: Boolean(options.fromBeginning) });
			await startConsumer();
		} catch (error) {
			// Release the connection opened above. A failing disconnect must
			// not mask the error that made the start fail.
			await consumer.disconnect().catch(() => undefined);
			throw error;
		}

		// The "closeFunction" function gets called by n8n whenever
		// the workflow gets deactivated, so the consumer can be cleaned up.
		const closeFunction = async (): Promise<void> => {
			await consumer.disconnect();
		};

		// The "manualTriggerFunction" function gets called by n8n
		// when a user is in the workflow editor and starts the
		// workflow manually. The consumer has already been started
		// above, so this only waits for that same start instead of
		// running the consumer a second time.
		const manualTriggerFunction = async (): Promise<void> => {
			await startConsumer();
		};

		return {
			closeFunction,
			manualTriggerFunction,
		};
	}
}