From 7f8505057ab96daf938460eea8c3dd6f93909518 Mon Sep 17 00:00:00 2001 From: Matt Loring Date: Thu, 16 Jan 2020 13:00:41 -0800 Subject: [PATCH] Fix deadlock in consumer group Close --- consumer_group.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index b974dd9af..56a0c96c7 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -120,9 +120,6 @@ func (c *consumerGroup) Close() (err error) { c.closeOnce.Do(func() { close(c.closed) - c.lock.Lock() - defer c.lock.Unlock() - // leave group if e := c.leave(); e != nil { err = e @@ -384,8 +381,10 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) return strategy.Plan(members, topics) } -// Leaves the cluster, called by Close, protected by lock. +// Leaves the cluster, called by Close. func (c *consumerGroup) leave() error { + c.lock.Lock() + defer c.lock.Unlock() if c.memberID == "" { return nil } @@ -430,9 +429,6 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { return } - c.lock.Lock() - defer c.lock.Unlock() - select { case <-c.closed: //consumer is closed