diff --git a/consumer.go b/consumer.go index b04761ca6..46491fe2c 100644 --- a/consumer.go +++ b/consumer.go @@ -409,14 +409,23 @@ func (child *partitionConsumer) responseFeeder() { // so shut it down and force the user to choose what to do Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err) child.sendError(err) - child.AsyncClose() + select { + case child.dying <- nil: + case <-child.trigger: + } case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: // these three are not fatal errors, but do require redispatching - child.dying <- err + select { + case child.dying <- err: + case <-child.trigger: + } default: // dunno, tell the user and try redispatching child.sendError(err) - child.dying <- err + select { + case child.dying <- err: + case <-child.trigger: + } } child.broker.acks.Done()