Skip to content

Commit

Permalink
Merge pull request #1581 from matthewloring/deadlock-fix
Browse files Browse the repository at this point in the history
Fix deadlock in consumer group handleError
  • Loading branch information
d1egoaz authored Jan 22, 2020
2 parents 2ead77f + 7f85050 commit 33aa349
Showing 1 changed file with 3 additions and 7 deletions.
10 changes: 3 additions & 7 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ func (c *consumerGroup) Close() (err error) {
c.closeOnce.Do(func() {
close(c.closed)

c.lock.Lock()
defer c.lock.Unlock()

// leave group
if e := c.leave(); e != nil {
err = e
Expand Down Expand Up @@ -385,8 +382,10 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.memberID == "" {
return nil
}
Expand Down Expand Up @@ -431,9 +430,6 @@ func (c *consumerGroup) handleError(err error, topic string, partition int32) {
return
}

c.lock.Lock()
defer c.lock.Unlock()

select {
case <-c.closed:
//consumer is closed
Expand Down

0 comments on commit 33aa349

Please sign in to comment.