Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use least loaded broker to refresh metadata #2645

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,11 +1712,15 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
seedBroker1.BrokerID(), b.ID())
}

metadataResponse := NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID())
seedBroker1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
"MetadataRequest": metadataResponse,
})
seedBroker2.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": metadataResponse,
})

if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
Expand Down
69 changes: 40 additions & 29 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
leader.Close() // producer should get EOF
leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
leader.Returns(metadataResponse) // tell it to go to broker 2 again

// Then: a produced message goes through the new broker connection.
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
Expand Down Expand Up @@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader2.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down Expand Up @@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader2.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader1.Returns(metadataLeader2)
leader2.Returns(prodSuccess)

expectResults(t, producer, 1, 0)
Expand Down Expand Up @@ -739,16 +739,17 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
leader := NewMockBroker(t, 2)

var leaderLock sync.Mutex

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
leaderLock.Lock()
defer leaderLock.Unlock()
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})
}

// The seed broker only handles Metadata request in bootstrap
seedBroker.setHandler(metadataRequestHandlerFunc)

var emptyValues int32 = 0

Expand All @@ -770,14 +771,27 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
}
}

leader.setHandler(func(req *request) (res encoderWithHeader) {
failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

time.Sleep(50 * time.Millisecond)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
return prodSuccess
}

succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
}

leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": failedProduceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

config := NewTestConfig()
Expand Down Expand Up @@ -816,12 +830,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
leaderLock.Lock()
leader = NewMockBroker(t, 2)
leaderLock.Unlock()
leader.setHandler(func(req *request) (res encoderWithHeader) {
countRecordsWithEmptyValue(req)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
"ProduceRequest": succeededProduceRequestHandlerFunc,
"MetadataRequest": metadataRequestHandlerFunc,
})

wg.Wait()
Expand Down Expand Up @@ -938,7 +949,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)
leader.Returns(metadataResponse)

// succeed this time
prodSuccess = new(ProduceResponse)
Expand Down Expand Up @@ -994,14 +1005,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {

time.Sleep(50 * time.Millisecond)

leader.SetHandlerByMap(map[string]MockResponse{
"ProduceRequest": NewMockProduceResponse(t).
SetVersion(0).
SetError("my_topic", 0, ErrNoError),
})

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)
leader.Returns(metadataResponse)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)

// succeed this time
expectResults(t, producer, 5, 0)
Expand All @@ -1010,6 +1018,9 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
for i := 0; i < 5; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectResults(t, producer, 5, 0)

// shutdown
Expand Down Expand Up @@ -1051,7 +1062,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

seedBroker.Returns(metadataLeader)
leader.Returns(metadataLeader)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
Expand Down
58 changes: 21 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
// FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superseded by transaction_manager.go?
brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
request := &InitProducerIDRequest{}

if client.conf.Version.IsAtLeast(V2_7_0_0) {
Expand Down Expand Up @@ -763,22 +763,21 @@ func (client *client) registerBroker(broker *Broker) {
}
}

// deregisterBroker removes a broker from the seedsBroker list, and if it's
// not the seedbroker, removes it from brokers map completely.
// deregisterBroker removes a broker from the cached broker list. If the broker is
// not in that list and it is the first seed broker, it is moved from seedBrokers to deadSeeds.
func (client *client) deregisterBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

_, ok := client.brokers[broker.ID()]
if ok {
Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
return
}
if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
client.deadSeeds = append(client.deadSeeds, broker)
client.seedBrokers = client.seedBrokers[1:]
Comment on lines +766 to 780
Copy link
Collaborator

@dnwe dnwe Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we only seem to deregister the broker from the seedBrokers list if it's the first element in the list (after the most recent shuffle) — is that still the desired behaviour?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this is expected. Seed brokers are only shuffled during client initialization or a hard RefreshBrokers. After that, the cached broker list is empty, and the client will use the first seed broker to fetch metadata. The part of deregisterBroker that deregisters the first seed broker applies at that moment, when this first seed broker is unavailable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know when there will be a new release that includes this change, so that we can use it on our side? @dnwe

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
// but we really shouldn't have to; once that loop is made better this case can be
// removed, and the function generally can be renamed from `deregisterBroker` to
// `nextSeedBroker` or something
DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
delete(client.brokers, broker.ID())
}
}

Expand All @@ -791,33 +790,12 @@ func (client *client) resurrectDeadBrokers() {
client.deadSeeds = nil
}

func (client *client) anyBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

// not guaranteed to be random *or* deterministic
for _, broker := range client.brokers {
_ = broker.Open(client.conf)
return broker
}

return nil
}

// LeastLoadedBroker returns the broker with the least pending requests.
// Firstly, choose the broker from cached broker list. If the broker list is empty, choose from seed brokers.
func (client *client) LeastLoadedBroker() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

var leastLoadedBroker *Broker
pendingRequests := math.MaxInt
for _, broker := range client.brokers {
Expand All @@ -826,10 +804,16 @@ func (client *client) LeastLoadedBroker() *Broker {
leastLoadedBroker = broker
}
}

if leastLoadedBroker != nil {
_ = leastLoadedBroker.Open(client.conf)
return leastLoadedBroker
}

if len(client.seedBrokers) > 0 {
_ = client.seedBrokers[0].Open(client.conf)
return client.seedBrokers[0]
}

return leastLoadedBroker
}

Expand Down Expand Up @@ -1032,9 +1016,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
return err
}

broker := client.anyBroker()
broker := client.LeastLoadedBroker()
brokerErrors := make([]error, 0)
for ; broker != nil && !pastDeadline(0); broker = client.anyBroker() {
for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
Expand Down Expand Up @@ -1212,7 +1196,7 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
}

brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())

request := new(FindCoordinatorRequest)
Expand Down
Loading