From cdd80eb39afe88abfeed3f0b5971d31e176697b6 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Mon, 31 Aug 2015 14:45:57 -0400 Subject: [PATCH] Fix consumer race panic on close I discovered a "send on closed channel" panic in the consumer while testing #527 which I was finally able to track down. If a partition takes a long time to drain to the user, then the responseFeeder reclaims its ownership token from the broker so that the broker doesn't block its other partitions. However, if the user closes the PartitionConsumer (closing the dying channel) then the brokerConsumer will unconditionally return the ownership token to the dispatcher even if the responseFeeder is holding it. This results in two ownership tokens for the same partition (one in the feeder, one in the dispatcher) which leads to all sorts of subtle brokeness. It manifested in at least two different "send on closed channel" backtraces depending on the exact timing, and possibly more. To fix, move the check on `child.dying` to the top of the `subscriptionConsumer` loop where we are guaranteed to have the ownership token. Combine that check with the 'new subcriptions' check into an `updateSubscriptions` helper method. The diff is huge because this lets us drop an indentation level in `handleResponses`, I suggest reviewing with `w=1` to ignore whitespace. --- consumer.go | 82 +++++++++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/consumer.go b/consumer.go index 436706846..53ba21d15 100644 --- a/consumer.go +++ b/consumer.go @@ -572,10 +572,7 @@ func (bc *brokerConsumer) subscriptionConsumer() { // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available for newSubscriptions := range bc.newSubscriptions { - for _, child := range newSubscriptions { - bc.subscriptions[child] = none{} - Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) - } + bc.updateSubscriptions(newSubscriptions) if len(bc.subscriptions) == 0 { // We're about to be shut down or we're about to receive more subscriptions. @@ -601,8 +598,12 @@ func (bc *brokerConsumer) subscriptionConsumer() { } } -func (bc *brokerConsumer) handleResponses() { - // handles the response codes left for us by our subscriptions, and abandons ones that have been closed +func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) { + for _, child := range newSubscriptions { + bc.subscriptions[child] = none{} + Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) + } + for child := range bc.subscriptions { select { case <-child.dying: @@ -610,37 +611,44 @@ func (bc *brokerConsumer) handleResponses() { close(child.trigger) delete(bc.subscriptions, child) default: - result := child.responseResult - child.responseResult = nil - - switch result { - case nil: - break - case errTimedOut: - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", - bc.broker.ID(), child.topic, child.partition) - delete(bc.subscriptions, child) - case ErrOffsetOutOfRange: - // there's no point in retrying this it will just fail the same way again - // shut it down and force the user to choose what to do - child.sendError(result) - Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) - close(child.trigger) - delete(bc.subscriptions, child) - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: - // not an error, but does need redispatching - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, result) - child.trigger <- none{} - delete(bc.subscriptions, child) - default: - // dunno, tell the user and try redispatching - child.sendError(result) - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, result) - child.trigger <- none{} - delete(bc.subscriptions, child) - } + break + } + } +} + +func (bc *brokerConsumer) handleResponses() { + // handles the response codes left for us by our subscriptions, and abandons ones that have been closed + for child := range bc.subscriptions { + result := child.responseResult + child.responseResult = nil + + switch result { + case nil: + break + case errTimedOut: + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", + bc.broker.ID(), child.topic, child.partition) + delete(bc.subscriptions, child) + case ErrOffsetOutOfRange: + // there's no point in retrying this it will just fail the same way again + // shut it down and force the user to choose what to do + child.sendError(result) + Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) + close(child.trigger) + delete(bc.subscriptions, child) + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: + // not an error, but does need redispatching + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, result) + child.trigger <- none{} + delete(bc.subscriptions, child) + default: + // dunno, tell the user and try redispatching + child.sendError(result) + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, result) + child.trigger <- none{} + delete(bc.subscriptions, child) } } }