diff --git a/consumer.go b/consumer.go index 436706846..53ba21d15 100644 --- a/consumer.go +++ b/consumer.go @@ -572,10 +572,7 @@ func (bc *brokerConsumer) subscriptionConsumer() { // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available for newSubscriptions := range bc.newSubscriptions { - for _, child := range newSubscriptions { - bc.subscriptions[child] = none{} - Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) - } + bc.updateSubscriptions(newSubscriptions) if len(bc.subscriptions) == 0 { // We're about to be shut down or we're about to receive more subscriptions. @@ -601,8 +598,12 @@ func (bc *brokerConsumer) subscriptionConsumer() { } } -func (bc *brokerConsumer) handleResponses() { - // handles the response codes left for us by our subscriptions, and abandons ones that have been closed +func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) { + for _, child := range newSubscriptions { + bc.subscriptions[child] = none{} + Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) + } + for child := range bc.subscriptions { select { case <-child.dying: @@ -610,37 +611,44 @@ func (bc *brokerConsumer) handleResponses() { close(child.trigger) delete(bc.subscriptions, child) default: - result := child.responseResult - child.responseResult = nil - - switch result { - case nil: - break - case errTimedOut: - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", - bc.broker.ID(), child.topic, child.partition) - delete(bc.subscriptions, child) - case ErrOffsetOutOfRange: - // there's no point in retrying this it will just fail the same way again - // shut it down and force the user to choose what to do - child.sendError(result) - Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) - close(child.trigger) - delete(bc.subscriptions, child) - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: - // not an error, but does need redispatching - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, result) - child.trigger <- none{} - delete(bc.subscriptions, child) - default: - // dunno, tell the user and try redispatching - child.sendError(result) - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, result) - child.trigger <- none{} - delete(bc.subscriptions, child) - } + break + } + } +} + +func (bc *brokerConsumer) handleResponses() { + // handles the response codes left for us by our subscriptions, and abandons ones that have been closed + for child := range bc.subscriptions { + result := child.responseResult + child.responseResult = nil + + switch result { + case nil: + break + case errTimedOut: + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", + bc.broker.ID(), child.topic, child.partition) + delete(bc.subscriptions, child) + case ErrOffsetOutOfRange: + // there's no point in retrying this it will just fail the same way again + // shut it down and force the user to choose what to do + child.sendError(result) + Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) + close(child.trigger) + delete(bc.subscriptions, child) + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: + // not an error, but does need redispatching + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, result) + child.trigger <- none{} + delete(bc.subscriptions, child) + default: + // dunno, tell the user and try redispatching + child.sendError(result) + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, result) + child.trigger <- none{} + delete(bc.subscriptions, child) } } }