Add Kafka Trigger-Node (#1213)

*  Kafka trigger node

*  Small improvements

*  Minor improvements to Kafka Trigger-Node

Co-authored-by: ricardo <ricardoespinoza105@gmail.com>
Co-authored-by: Jan Oberhauser <jan.oberhauser@gmail.com>
This commit is contained in:
Ahsan Virani 2020-12-03 13:02:22 +01:00 committed by GitHub
parent 8fee799d3d
commit 23f088e2e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 195 additions and 0 deletions

View file

@ -0,0 +1,194 @@
import {
Kafka as apacheKafka,
KafkaConfig,
logLevel,
SASLOptions,
} from 'kafkajs';
import {
ITriggerFunctions,
} from 'n8n-core';
import {
IDataObject,
INodeType,
INodeTypeDescription,
ITriggerResponse,
} from 'n8n-workflow';
export class KafkaTrigger implements INodeType {
description: INodeTypeDescription = {
displayName: 'Kafka Trigger',
name: 'kafkaTrigger',
icon: 'file:kafka.svg',
group: ['trigger'],
version: 1,
description: 'Consume messages from a Kafka topic',
defaults: {
name: 'Kafka Trigger',
color: '#000000',
},
inputs: [],
outputs: ['main'],
credentials: [
{
name: 'kafka',
required: true,
},
],
properties: [
{
displayName: 'Topic',
name: 'topic',
type: 'string',
default: '',
required: true,
placeholder: 'topic-name',
description: 'Name of the queue of topic to consume from.',
},
{
displayName: 'Group ID',
name: 'groupId',
type: 'string',
default: '',
required: true,
placeholder: 'n8n-kafka',
description: 'ID of the consumer group.',
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
default: {},
placeholder: 'Add Option',
options: [
{
displayName: 'Allow Topic Creation',
name: 'allowAutoTopicCreation',
type: 'boolean',
default: false,
description: 'Allow sending message to a previously non exisiting topic .',
},
{
displayName: 'JSON Parse Message',
name: 'jsonParseMessage',
type: 'boolean',
default: false,
description: 'Try to parse the message to an object.',
},
{
displayName: 'Only Message',
name: 'onlyMessage',
type: 'boolean',
displayOptions: {
show: {
jsonParseMessage: [
true,
],
},
},
default: false,
description: 'Returns only the message property.',
},
{
displayName: 'Session Timeout',
name: 'sessionTimeout',
type: 'number',
default: 30000,
description: 'The time to await a response in ms.',
},
],
},
],
};
async trigger(this: ITriggerFunctions): Promise<ITriggerResponse> {
const topic = this.getNodeParameter('topic') as string;
const groupId = this.getNodeParameter('groupId') as string;
const credentials = this.getCredentials('kafka') as IDataObject;
const brokers = (credentials.brokers as string || '').split(',').map(item => item.trim()) as string[];
const clientId = credentials.clientId as string;
const ssl = credentials.ssl as boolean;
const config: KafkaConfig = {
clientId,
brokers,
ssl,
logLevel: logLevel.ERROR,
};
if (credentials.username || credentials.password) {
config.sasl = {
username: credentials.username as string,
password: credentials.password as string,
} as SASLOptions;
}
const kafka = new apacheKafka(config);
const consumer = kafka.consumer({ groupId });
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
const self = this;
const options = this.getNodeParameter('options', {}) as IDataObject;
const startConsumer = async () => {
await consumer.run({
eachMessage: async ({ topic, message }) => {
let data: IDataObject = {};
let value = message.value?.toString() as string;
if (options.jsonParseMessage) {
try {
value = JSON.parse(value);
} catch (err) { }
}
data.message = value;
data.topic = topic;
if (options.onlyMessage) {
//@ts-ignore
data = value;
}
self.emit([self.helpers.returnJsonArray([data])]);
},
});
};
startConsumer();
// The "closeFunction" function gets called by n8n whenever
// the workflow gets deactivated and can so clean up.
async function closeFunction() {
await consumer.disconnect();
}
// The "manualTriggerFunction" function gets called by n8n
// when a user is in the workflow editor and starts the
// workflow manually. So the function has to make sure that
// the emit() gets called with similar data like when it
// would trigger by itself so that the user knows what data
// to expect.
async function manualTriggerFunction() {
startConsumer();
}
return {
closeFunction,
manualTriggerFunction,
};
}
}

View file

@ -337,6 +337,7 @@
"dist/nodes/Jira/JiraTrigger.node.js",
"dist/nodes/JotForm/JotFormTrigger.node.js",
"dist/nodes/Kafka/Kafka.node.js",
"dist/nodes/Kafka/KafkaTrigger.node.js",
"dist/nodes/Keap/Keap.node.js",
"dist/nodes/Keap/KeapTrigger.node.js",
"dist/nodes/Line/Line.node.js",