diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 32f3b7a9c9..d83538b3d9 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -70,6 +70,13 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Allow sending message to a previously non exisiting topic .', }, + { + displayName: 'Read messages from beginning', + name: 'fromBeginning', + type: 'boolean', + default: true, + description: 'Read message from beginning .', + }, { displayName: 'JSON Parse Message', name: 'jsonParseMessage', @@ -140,13 +147,13 @@ export class KafkaTrigger implements INodeType { const consumer = kafka.consumer({ groupId }); await consumer.connect(); + + const options = this.getNodeParameter('options', {}) as IDataObject; - await consumer.subscribe({ topic, fromBeginning: true }); + await consumer.subscribe({ topic, fromBeginning: (options.fromBeginning)? true : false }); const self = this; - const options = this.getNodeParameter('options', {}) as IDataObject; - const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => {