Skip to content

Commit

Permalink
Fix hypothetical consumer deadlock
Browse files Browse the repository at this point in the history
In the case when the user calls `AsyncClose` at the same instant as the broker
leading the partition returns an error.

Fixes bug #475.
  • Loading branch information
eapache committed Jul 15, 2015
1 parent c1d582e commit 4a2af77
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,23 @@ func (child *partitionConsumer) responseFeeder() {
// so shut it down and force the user to choose what to do
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
child.sendError(err)
child.AsyncClose()
select {
case child.dying <- nil:
case <-child.trigger:
}
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dying <- err
select {
case child.dying <- err:
case <-child.trigger:
}
default:
// dunno, tell the user and try redispatching
child.sendError(err)
child.dying <- err
select {
case child.dying <- err:
case <-child.trigger:
}
}

child.broker.acks.Done()
Expand Down

0 comments on commit 4a2af77

Please sign in to comment.