Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: fix another race pointed out by Maxim #494

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 31 additions & 30 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ type partitionConsumer struct {
feeder chan *FetchResponse

trigger, dying chan none
dispatchReason error
responseResult error

fetchSize int32
offset int64
Expand Down Expand Up @@ -402,24 +402,7 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

func (child *partitionConsumer) responseFeeder() {
for response := range child.feeder {
switch err := child.handleResponse(response); err {
case nil:
break
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// so shut it down and force the user to choose what to do
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
child.sendError(err)
child.AsyncClose()
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.dispatchReason = err
default:
// dunno, tell the user and try redispatching
child.sendError(err)
child.dispatchReason = err
}

child.responseResult = child.handleResponse(response)
child.broker.acks.Done()
}

Expand Down Expand Up @@ -569,7 +552,10 @@ func (bc *brokerConsumer) subscriptionConsumer() {

// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptionCache(newSubscriptions)
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

if len(bc.subscriptions) == 0 {
// We're about to be shut down or we're about to receive more subscriptions.
Expand All @@ -591,30 +577,45 @@ func (bc *brokerConsumer) subscriptionConsumer() {
child.feeder <- response
}
bc.acks.Wait()
bc.handleResponses()
}
}

func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
// take new subscriptions, and abandon subscriptions that have been closed
for _, child := range newSubscriptions {
bc.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
}

func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
select {
case <-child.dying:
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
close(child.trigger)
delete(bc.subscriptions, child)
default:
if child.dispatchReason != nil {
switch child.responseResult {
case nil:
break
case ErrOffsetOutOfRange:
// there's no point in retrying this it will just fail the same way again
// shut it down and force the user to choose what to do
child.sendError(child.responseResult)
Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult)
close(child.trigger)
delete(bc.subscriptions, child)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// not an error, but does need redispatching
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, child.responseResult)
child.trigger <- none{}
delete(bc.subscriptions, child)
default:
// dunno, tell the user and try redispatching
child.sendError(child.responseResult)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
child.dispatchReason = nil
bc.broker.ID(), child.topic, child.partition, child.responseResult)
child.trigger <- none{}
delete(bc.subscriptions, child)
}

child.responseResult = nil
}
}
}
Expand Down