From 4a2af77044c934870b0733910cd441f3efa5364d Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 15 Jul 2015 09:34:22 -0400 Subject: [PATCH] Fix hypothetical consumer deadlock In the case when the user calls `AsyncClose` at the same instant as the broker leading the partition returns an error. Fixes bug #475. --- consumer.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/consumer.go b/consumer.go index b04761ca6..46491fe2c 100644 --- a/consumer.go +++ b/consumer.go @@ -409,14 +409,23 @@ func (child *partitionConsumer) responseFeeder() { // so shut it down and force the user to choose what to do Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err) child.sendError(err) - child.AsyncClose() + select { + case child.dying <- nil: + case <-child.trigger: + } case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: // these three are not fatal errors, but do require redispatching - child.dying <- err + select { + case child.dying <- err: + case <-child.trigger: + } default: // dunno, tell the user and try redispatching child.sendError(err) - child.dying <- err + select { + case child.dying <- err: + case <-child.trigger: + } } child.broker.acks.Done()