diff --git a/consumer_group.go b/consumer_group.go index 97c08e9cc..fc95cd0df 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -120,9 +120,6 @@ func (c *consumerGroup) Close() (err error) { c.closeOnce.Do(func() { close(c.closed) - c.lock.Lock() - defer c.lock.Unlock() - // leave group if e := c.leave(); e != nil { err = e @@ -385,8 +382,10 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) return strategy.Plan(members, topics) } -// Leaves the cluster, called by Close, protected by lock. +// Leaves the cluster, called by Close. func (c *consumerGroup) leave() error { + c.lock.Lock() + defer c.lock.Unlock() if c.memberID == "" { return nil } @@ -431,9 +430,6 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { return } - c.lock.Lock() - defer c.lock.Unlock() - select { case <-c.closed: //consumer is closed