Skip to content

Commit

Permalink
Fix consumer race panic on close
Browse files Browse the repository at this point in the history
I discovered a "send on closed channel" panic in the consumer while testing #527
which I was finally able to track down. If a partition takes a long time to
drain to the user, then the responseFeeder reclaims its ownership token from
the broker so that the broker doesn't block its other partitions. However, if
the user closes the PartitionConsumer (closing the dying channel) then the
brokerConsumer will unconditionally return the ownership token to the dispatcher
even if the responseFeeder is holding it. This results in two ownership tokens
for the same partition (one in the feeder, one in the dispatcher) which leads to
all sorts of subtle brokenness. It manifested in at least two different "send on
closed channel" backtraces depending on the exact timing, and possibly more.

To fix, move the check on `child.dying` to the top of the `subscriptionConsumer`
loop where we are guaranteed to have the ownership token. Combine that check
with the 'new subscriptions' check into an `updateSubscriptions` helper method.
The diff is huge because this lets us drop an indentation level in
`handleResponses`, I suggest reviewing with `w=1` to ignore whitespace.
  • Loading branch information
eapache committed Aug 31, 2015
1 parent 9822cf0 commit cdd80eb
Showing 1 changed file with 45 additions and 37 deletions.
82 changes: 45 additions & 37 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,7 @@ func (bc *brokerConsumer) subscriptionConsumer() {

// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}
bc.updateSubscriptions(newSubscriptions)

if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
Expand All @@ -601,46 +598,57 @@ func (bc *brokerConsumer) subscriptionConsumer() {
}
}

func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

for child := range bc.subscriptions {
select {
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
result := child.responseResult
child.responseResult = nil

switch result {
case nil:
break
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
delete(bc.subscriptions, child)
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// shut it down and force the user to choose what to do
child.sendError(result)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
close(child.trigger)
delete(bc.subscriptions, child)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
// dunno, tell the user and try redispatching
child.sendError(result)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, result)
child.trigger <- none{}
delete(bc.subscriptions, child)
}
break
}
}
}

// handleResponses processes the response codes left for us by our
// subscriptions: healthy children are kept, timed-out or redispatchable ones
// are handed back to the dispatcher, and fatally-broken ones are shut down.
func (bc *brokerConsumer) handleResponses() {
	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
	for child := range bc.subscriptions {
		err := child.responseResult
		child.responseResult = nil

		// nothing went wrong for this child; keep it subscribed
		if err == nil {
			continue
		}

		switch err {
		case errTimedOut:
			// the feeder reclaimed the token; hand the child back to the dispatcher
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// retrying would just fail the same way again, so shut the child
			// down and force the user to decide what to do
			child.sendError(err)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// any other result needs redispatching; the recognized leadership
			// errors are expected and not surfaced to the user, everything
			// else is reported before we redispatch
			if err != ErrUnknownTopicOrPartition && err != ErrNotLeaderForPartition && err != ErrLeaderNotAvailable {
				child.sendError(err)
			}
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, err)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}
Expand Down

0 comments on commit cdd80eb

Please sign in to comment.