Skip to content

Commit

Permalink
Merge pull request #1645 from Stephan14/bugfix/remove_invalid_broker
Browse files Browse the repository at this point in the history
Remove broker(s) which no longer exist in metadata
  • Loading branch information
dnwe authored Apr 7, 2020
2 parents 7edc5c1 + 26bbb04 commit 2d3d6cd
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 7 deletions.
29 changes: 26 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,30 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {

// private broker management helpers

func (client *client) updateBroker(brokers []*Broker) {
var currentBroker = make(map[int32]*Broker, len(brokers))

for _, broker := range brokers {
currentBroker[broker.ID()] = broker
if client.brokers[broker.ID()] == nil { // add new broker
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
safeAsyncClose(client.brokers[broker.ID()])
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
}

for id, broker := range client.brokers {
if _, exist := currentBroker[id]; !exist { // remove old broker
safeAsyncClose(broker)
delete(client.brokers, id)
Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr())
}
}
}

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance. You must hold the write lock before calling this function.
Expand Down Expand Up @@ -885,10 +909,9 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
// For all the brokers we received:
// - if it is a new ID, save it
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - if some brokers is not exist in it, remove old broker
// - otherwise ignore it, replacing our existing one would just bounce the connection
for _, broker := range data.Brokers {
client.registerBroker(broker)
}
client.updateBroker(data.Brokers)

client.controllerID = data.ControllerID

Expand Down
35 changes: 33 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}

metadataPartial := new(MetadataResponse)
metadataPartial.AddBroker(seedBroker.Addr(), 1)
metadataPartial.AddBroker(leader.Addr(), 5)
metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
Expand Down Expand Up @@ -485,6 +487,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
seedBroker.Returns(metadataResponse1)

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse2)

Expand Down Expand Up @@ -512,6 +515,36 @@ func TestClientRefreshBehaviour(t *testing.T) {
safeClose(t, client)
}

func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 5)

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
seedBroker.Returns(metadataResponse1)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

if len(client.Brokers()) != 2 {
t.Error("Meta broker is not 2")
}

metadataResponse2 := new(MetadataResponse)
metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
seedBroker.Returns(metadataResponse2)

if err := client.RefreshMetadata(); err != nil {
t.Error(err)
}
if len(client.Brokers()) != 1 {
t.Error("Meta broker is not 1")
}
}

func TestClientResurrectDeadSeeds(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
Expand Down Expand Up @@ -656,7 +689,6 @@ func TestClientMetadataTimeout(t *testing.T) {

// Start refreshing metadata in the background
errChan := make(chan error)
start := time.Now()
go func() {
errChan <- c.RefreshMetadata()
}()
Expand All @@ -666,7 +698,6 @@ func TestClientMetadataTimeout(t *testing.T) {
maxRefreshDuration := 2 * timeout
select {
case err := <-errChan:
t.Logf("Got err: %v after waiting for: %v", err, time.Since(start))
if err == nil {
t.Fatal("Expected failed RefreshMetadata, got nil")
}
Expand Down
11 changes: 9 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(leader0.Addr(), leader0.BrokerID()).
SetBroker(leader1.Addr(), leader1.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetLeader("my_topic", 0, leader0.BrokerID()).
SetLeader("my_topic", 1, leader1.BrokerID()),
})
Expand Down Expand Up @@ -720,7 +721,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetLeader("my_topic", 0, leader1.BrokerID()).
SetLeader("my_topic", 1, leader1.BrokerID()),
SetLeader("my_topic", 1, leader1.BrokerID()).
SetBroker(leader0.Addr(), leader0.BrokerID()).
SetBroker(leader1.Addr(), leader1.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

// leader0 says no longer leader of partition 0
Expand Down Expand Up @@ -759,7 +763,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetLeader("my_topic", 0, leader1.BrokerID()).
SetLeader("my_topic", 1, leader0.BrokerID()),
SetLeader("my_topic", 1, leader0.BrokerID()).
SetBroker(leader0.Addr(), leader0.BrokerID()).
SetBroker(leader1.Addr(), leader1.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
})

// leader1 provides three more messages on partition0, says no longer leader of partition1
Expand Down

0 comments on commit 2d3d6cd

Please sign in to comment.