diff --git a/consumer_group.go b/consumer_group.go index b974dd9af..56a0c96c7 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -120,9 +120,6 @@ func (c *consumerGroup) Close() (err error) { c.closeOnce.Do(func() { close(c.closed) - c.lock.Lock() - defer c.lock.Unlock() - // leave group if e := c.leave(); e != nil { err = e @@ -384,8 +381,10 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) return strategy.Plan(members, topics) } -// Leaves the cluster, called by Close, protected by lock. +// Leaves the cluster, called by Close. func (c *consumerGroup) leave() error { + c.lock.Lock() + defer c.lock.Unlock() if c.memberID == "" { return nil } @@ -430,9 +429,6 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) { return } - c.lock.Lock() - defer c.lock.Unlock() - select { case <-c.closed: //consumer is closed