From 58f0d7cffcd16d1c669046dbc00712aa18ca417f Mon Sep 17 00:00:00 2001 From: Pierre Lanvin Date: Wed, 14 Jul 2021 20:31:43 +0200 Subject: [PATCH] :zap: Allow fromBeginning config in kafka trigger node (#1958) * Allow fromBeginning config in kafka trigger node * make sure options in defined --- .../nodes-base/nodes/Kafka/KafkaTrigger.node.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts index 32f3b7a9c..d83538b3d 100644 --- a/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts +++ b/packages/nodes-base/nodes/Kafka/KafkaTrigger.node.ts @@ -70,6 +70,13 @@ export class KafkaTrigger implements INodeType { default: false, description: 'Allow sending message to a previously non exisiting topic .', }, + { + displayName: 'Read messages from beginning', + name: 'fromBeginning', + type: 'boolean', + default: true, + description: 'Read message from beginning .', + }, { displayName: 'JSON Parse Message', name: 'jsonParseMessage', @@ -140,13 +147,13 @@ export class KafkaTrigger implements INodeType { const consumer = kafka.consumer({ groupId }); await consumer.connect(); + + const options = this.getNodeParameter('options', {}) as IDataObject; - await consumer.subscribe({ topic, fromBeginning: true }); + await consumer.subscribe({ topic, fromBeginning: (options.fromBeginning)? true : false }); const self = this; - const options = this.getNodeParameter('options', {}) as IDataObject; - const startConsumer = async () => { await consumer.run({ eachMessage: async ({ topic, message }) => {