Skip to content

Commit

Permalink
fix(consumer): follow preferred broker
Browse files Browse the repository at this point in the history
Historically (before protocol version 11) if we attempted to consume
from a follower, we would get a NotLeaderForPartition response and move
our consumer to the new leader. However, since v11 the Kafka broker
treats us just like any other follower and permits us to consume from
any replica and it is up to us to monitor metadata to determine when the
leadership has changed.

Modifying the handleResponse func to check the topic partition
leadership against the current broker (in the absence of a
preferredReadReplica) and trigger a re-create of the consumer for that
partition

Contributes-to: #1927
  • Loading branch information
dnwe committed May 5, 2021
1 parent 8dbbfb5 commit 1aac8e5
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,12 @@ func (bc *brokerConsumer) handleResponses() {
child.responseResult = nil

if result == nil {
if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica {
// not an error but needs redispatching to consume from prefered replica
child.trigger <- none{}
delete(bc.subscriptions, child)
if preferredBroker, err := child.preferredBroker(); err == nil {
if bc.broker.ID() != preferredBroker.ID() {
// not an error but needs redispatching to consume from prefered replica
child.trigger <- none{}
delete(bc.subscriptions, child)
}
}
continue
}
Expand Down

1 comment on commit 1aac8e5

@lizthegrey
Copy link
Contributor

@lizthegrey lizthegrey commented on 1aac8e5 Nov 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introduces regression, see #2071

Please sign in to comment.