From 93f4001ec386a68fee0ebd8380416d9cf19ff17a Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Sat, 2 Nov 2019 22:44:38 +0100 Subject: [PATCH] fixed race conditions and nil panic on calling registerBroker after close --- client.go | 8 ++++++++ consumer_group.go | 28 ++++++++++++++++------------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index 7833d3f2a..e9bdecf03 100644 --- a/client.go +++ b/client.go @@ -242,6 +242,9 @@ func (client *client) Close() error { } func (client *client) Closed() bool { + client.lock.RLock() + defer client.lock.RUnlock() + return client.brokers == nil } @@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error { // in the brokers map. It returns the broker that is registered, which may be the provided broker, // or a previously registered Broker instance. You must hold the write lock before calling this function. func (client *client) registerBroker(broker *Broker) { + if client.brokers == nil { + Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr()) + return + } + if client.brokers[broker.ID()] == nil { client.brokers[broker.ID()] = broker Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) diff --git a/consumer_group.go b/consumer_group.go index da99e8811..90ff14ee0 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -417,12 +417,6 @@ func (c *consumerGroup) leave() error { } func (c *consumerGroup) handleError(err error, topic string, partition int32) { - select { - case <-c.closed: - return - default: - } - if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 { err = &ConsumerError{ Topic: topic, @@ -431,13 +425,23 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { } } - if c.config.Consumer.Return.Errors { - select { - case c.errors <- err: - default: - } - } else { + if !c.config.Consumer.Return.Errors { Logger.Println(err) + return + } + + c.lock.Lock() + defer c.lock.Unlock() + + select { + case <-c.closed: + return + default: + } + + select { + case c.errors <- err: + default: } }