From 1aac8e5424614233887c5600e8ab4258c89c6b80 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 6 May 2021 00:44:27 +0100 Subject: [PATCH] fix(consumer): follow preferred broker Historically (before protocol version 11) if we attempted to consume from a follower, we would get a NotLeaderForPartition response and move our consumer to the new leader. However, since v11 the Kafka broker treats us just like any other follower and permits us to consume from any replica and it is up to us to monitor metadata to determine when the leadership has changed. Modifying the handleResponse func to check the topic partition leadership against the current broker (in the absence of a preferredReadReplica) and trigger a re-create of the consumer for that partition Contributes-to: #1927 --- consumer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/consumer.go b/consumer.go index 9bd8d1820..61c9bc57a 100644 --- a/consumer.go +++ b/consumer.go @@ -839,10 +839,12 @@ func (bc *brokerConsumer) handleResponses() { child.responseResult = nil if result == nil { - if child.preferredReadReplica >= 0 && bc.broker.ID() != child.preferredReadReplica { - // not an error but needs redispatching to consume from prefered replica - child.trigger <- none{} - delete(bc.subscriptions, child) + if preferredBroker, err := child.preferredBroker(); err == nil { + if bc.broker.ID() != preferredBroker.ID() { + // not an error but needs redispatching to consume from prefered replica + child.trigger <- none{} + delete(bc.subscriptions, child) + } } continue }