Skip to content

Commit

Permalink
Merge pull request #693 from Shopify/consumer/692-tight-loop
Browse files Browse the repository at this point in the history
Consumer: fix possible tight loop
  • Loading branch information
eapache authored Jul 5, 2016
2 parents 1a3d124 + d41f123 commit 15fe77a
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
Expand Down Expand Up @@ -671,8 +671,12 @@ func (bc *brokerConsumer) abort(err error) {
child.trigger <- none{}
}

for newSubscription := range bc.newSubscriptions {
for _, child := range newSubscription {
for newSubscriptions := range bc.newSubscriptions {
if len(newSubscriptions) == 0 {
<-bc.wait
continue
}
for _, child := range newSubscriptions {
child.sendError(err)
child.trigger <- none{}
}
Expand Down

0 comments on commit 15fe77a

Please sign in to comment.