Skip to content

Commit

Permalink
Fix Sarama client using a stale broker from seed brokers to do metada…
Browse files Browse the repository at this point in the history
…ta refresh IBM#2637

Seed brokers never change after client initialization. If the first seed broker became stale (still online, but moved to other Kafka cluster), Sarama client will use this stale broker to get the wrong metadata. To avoid using the stale broker to do metadata refresh, we will choose the least loaded broker in the cached broker list which is the similar the Java client implementation (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736)
  • Loading branch information
HaoSunUber committed Sep 7, 2023
1 parent 05cb9fa commit 227227d
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 137 deletions.
12 changes: 8 additions & 4 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,11 +1712,15 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
seedBroker1.BrokerID(), b.ID())
}

metadataResponse := NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID())
seedBroker1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
"MetadataRequest": metadataResponse,
})
seedBroker2.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": metadataResponse,
})

if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
Expand Down
70 changes: 40 additions & 30 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
leader.Close() // producer should get EOF
leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
leader.Returns(metadataResponse) // tell it to go to broker 2 again

// Then: a produced message goes through the new broker connection.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
Expand Down Expand Up @@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader2.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down Expand Up @@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader2.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodSuccess)

expectResults(t, producer, 1, 0)
Expand Down Expand Up @@ -733,22 +733,22 @@ func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
}

func TestAsyncProducerBrokerRestart(t *testing.T) {
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

var leaderLock sync.Mutex

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
leaderLock.Lock()
defer leaderLock.Unlock()
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})
}

// The seed broker only handles Metadata request in bootstrap
seedBroker.setHandler(metadataRequestHandlerFunc)

var emptyValues int32 = 0

Expand All @@ -770,14 +770,27 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
}
}

leader.setHandler(func(req *request) (res encoderWithHeader) {
failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

time.Sleep(50 * time.Millisecond)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
return prodSuccess
}

succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
}

leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": failedProduceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

config := NewTestConfig()
Expand Down Expand Up @@ -816,12 +829,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
leaderLock.Lock()
leader = NewMockBroker(t, 2)
leaderLock.Unlock()
leader.setHandler(func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": succeededProduceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

wg.Wait()
Expand Down Expand Up @@ -938,7 +948,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)
leader.Returns(metadataResponse)

// succeed this time
prodSuccess = new(ProduceResponse)
Expand Down Expand Up @@ -994,14 +1004,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {

time.Sleep(50 * time.Millisecond)

leader.SetHandlerByMap(map[string]MockResponse{
"ProduceRequest": NewMockProduceResponse(t).
SetVersion(0).
SetError("my_topic", 0, ErrNoError),
})

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)
leader.Returns(metadataResponse)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

// succeed this time
expectResults(t, producer, 5, 0)
Expand All @@ -1010,6 +1017,9 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectResults(t, producer, 5, 0)

// shutdown
Expand Down Expand Up @@ -1051,7 +1061,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

seedBroker.Returns(metadataLeader)
leader.Returns(metadataLeader)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down
58 changes: 21 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
// FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go?
brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
request := &InitProducerIDRequest{}

if client.conf.Version.IsAtLeast(V2_7_0_0) {
Expand Down Expand Up @@ -763,22 +763,21 @@ func (client *client) registerBroker(broker *Broker) {
}
}

// deregisterBroker removes a broker from the seedsBroker list, and if it's
// not the seedbroker, removes it from brokers map completely.
// deregisterBroker removes a broker from the broker list, and if it's
// not in the broker list, removes it from seedBrokers.
func (client *client) deregisterBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

_, ok := client.brokers[broker.ID()]
if ok {
Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
return
}
if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
client.deadSeeds = append(client.deadSeeds, broker)
client.seedBrokers = client.seedBrokers[1:]
} else {
// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
// but we really shouldn't have to; once that loop is made better this case can be
// removed, and the function generally can be renamed from `deregisterBroker` to
// `nextSeedBroker` or something
DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
}
}

Expand All @@ -791,33 +790,12 @@ func (client *client) resurrectDeadBrokers() {
client.deadSeeds = nil
}

func (client *client) anyBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

// not guaranteed to be random *or* deterministic
for _, broker := range client.brokers {
_ = broker.Open(client.conf)
return broker
}

return nil
}

// LeastLoadedBroker returns the broker with the least pending requests.
// Firstly, pick up the broker from cached broker list. If the broker list is empty, pick up the broker seed brokers.
func (client *client) LeastLoadedBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

var leastLoadedBroker *Broker
pendingRequests := math.MaxInt
for _, broker := range client.brokers {
Expand All @@ -826,10 +804,16 @@ func (client *client) LeastLoadedBroker() *Broker {
leastLoadedBroker = broker
}
}

if leastLoadedBroker != nil {
_ = leastLoadedBroker.Open(client.conf)
return leastLoadedBroker
}

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

return leastLoadedBroker
}

Expand Down Expand Up @@ -1032,9 +1016,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
return err
}

broker := client.anyBroker()
broker := client.LeastLoadedBroker()
brokerErrors := make([]error, 0)
for ; broker != nil && !pastDeadline(0); broker = client.anyBroker() {
for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
Expand Down Expand Up @@ -1212,7 +1196,7 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
}

brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())

request := new(FindCoordinatorRequest)
Expand Down
Loading

0 comments on commit 227227d

Please sign in to comment.