From e31c0d7b46894ab262c611d1d2552f36d9e73106 Mon Sep 17 00:00:00 2001 From: AmitMendl Date: Fri, 29 Dec 2023 22:58:55 +0200 Subject: [PATCH] fix: added Errors() read from kafkaSensor's producer Signed-off-by: AmitMendl --- eventbus/kafka/sensor/kafka_sensor.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/eventbus/kafka/sensor/kafka_sensor.go b/eventbus/kafka/sensor/kafka_sensor.go index 593419cf8d..1f39403701 100644 --- a/eventbus/kafka/sensor/kafka_sensor.go +++ b/eventbus/kafka/sensor/kafka_sensor.go @@ -132,6 +132,13 @@ func (s *KafkaSensor) Initialize() error { return err } + // producer is at risk of deadlocking if Errors channel isn't read. + go func() { + for err := range producer.Errors() { + s.Logger.Errorf("Kafka producer error", zap.Error(err)) + } + }() + s.client = client s.consumer = consumer s.kafkaHandler = &KafkaHandler{