Skip to content

Commit

Permalink
Merge pull request #484 from Shopify/fix-consumer-deadlock
Browse files Browse the repository at this point in the history
Fix hypothetical consumer deadlock, attempt 2
  • Loading branch information
eapache committed Jul 27, 2015
2 parents 2b0d726 + 4a7c496 commit 527a3d3
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
feeder: make(chan *FetchResponse, 1),
trigger: make(chan none, 1),
dying: make(chan error, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
}

Expand Down Expand Up @@ -269,8 +269,9 @@ type partitionConsumer struct {
messages chan *ConsumerMessage
errors chan *ConsumerError
feeder chan *FetchResponse
trigger chan none
dying chan error

trigger, dying chan none
dispatchReason error

fetchSize int32
offset int64
Expand Down Expand Up @@ -372,7 +373,7 @@ func (child *partitionConsumer) AsyncClose() {
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
// also just close itself)
child.dying <- nil
close(child.dying)
}

func (child *partitionConsumer) Close() error {
Expand Down Expand Up @@ -412,11 +413,11 @@ func (child *partitionConsumer) responseFeeder() {
child.AsyncClose()
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dying <- err
child.dispatchReason = err
default:
// dunno, tell the user and try redispatching
child.sendError(err)
child.dying <- err
child.dispatchReason = err
}

child.broker.acks.Done()
Expand Down Expand Up @@ -602,16 +603,18 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC

for child := range bc.subscriptions {
select {
case err := <-child.dying:
if err == nil {
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
} else {
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
child.trigger <- none{}
}
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
if child.dispatchReason != nil {
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
child.dispatchReason = nil
child.trigger <- none{}
delete(bc.subscriptions, child)
}
}
}
}
Expand Down

0 comments on commit 527a3d3

Please sign in to comment.