diff --git a/admin_test.go b/admin_test.go
index 3387a7270b..2b70aa9bbc 100644
--- a/admin_test.go
+++ b/admin_test.go
@@ -1712,11 +1712,15 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
 			seedBroker1.BrokerID(), b.ID())
 	}
 
+	metadataResponse := NewMockMetadataResponse(t).
+		SetController(seedBroker2.BrokerID()).
+		SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
+		SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID())
 	seedBroker1.SetHandlerByMap(map[string]MockResponse{
-		"MetadataRequest": NewMockMetadataResponse(t).
-			SetController(seedBroker2.BrokerID()).
-			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
-			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
+		"MetadataRequest": metadataResponse,
+	})
+	seedBroker2.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": metadataResponse,
 	})
 
 	if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
diff --git a/async_producer_test.go b/async_producer_test.go
index b3eebe099d..c192235cc0 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
 	// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
 	leader.Close()                               // producer should get EOF
 	leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
-	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
+	leader.Returns(metadataResponse)             // tell it to go to broker 2 again
 
 	// Then: a produced message goes through the new broker connection.
 	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
@@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
 	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 
-	seedBroker.Returns(metadataLeader2)
+	leader1.Returns(metadataLeader2)
 	leader2.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader1)
+	leader2.Returns(metadataLeader1)
 	leader1.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader1)
+	leader1.Returns(metadataLeader1)
 	leader1.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader2)
+	leader1.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
@@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
 	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 
 	leader1.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader2)
+	leader1.Returns(metadataLeader2)
 	leader2.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader1)
+	leader2.Returns(metadataLeader1)
 	leader1.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader1)
+	leader1.Returns(metadataLeader1)
 	leader1.Returns(prodNotLeader)
-	seedBroker.Returns(metadataLeader2)
+	leader1.Returns(metadataLeader2)
 	leader2.Returns(prodSuccess)
 
 	expectResults(t, producer, 1, 0)
@@ -739,16 +739,17 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
 	leader := NewMockBroker(t, 2)
 
 	var leaderLock sync.Mutex
-
-	// The seed broker only handles Metadata request
-	seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
+	metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
 		leaderLock.Lock()
 		defer leaderLock.Unlock()
 		metadataLeader := new(MetadataResponse)
 		metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
 		metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 		return metadataLeader
-	})
+	}
+
+	// The seed broker only handles Metadata requests during bootstrap
+	seedBroker.setHandler(metadataRequestHandlerFunc)
 
 	var emptyValues int32 = 0
@@ -770,7 +771,7 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
 		}
 	}
 
-	leader.setHandler(func(req *request) (res encoderWithHeader) {
+	failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
 		countRecordsWithEmptyValue(req)
 
 		time.Sleep(50 * time.Millisecond)
@@ -778,6 +779,19 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
 		prodSuccess := new(ProduceResponse)
 		prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
 		return prodSuccess
+	}
+
+	succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
+		countRecordsWithEmptyValue(req)
+
+		prodSuccess := new(ProduceResponse)
+		prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+		return prodSuccess
+	}
+
+	leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
+		"ProduceRequest":  failedProduceRequestHandlerFunc,
+		"MetadataRequest": metadataRequestHandlerFunc,
 	})
 
 	config := NewTestConfig()
@@ -816,12 +830,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
 	leaderLock.Lock()
 	leader = NewMockBroker(t, 2)
 	leaderLock.Unlock()
-	leader.setHandler(func(req *request) (res encoderWithHeader) {
-		countRecordsWithEmptyValue(req)
-
-		prodSuccess := new(ProduceResponse)
-		prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
-		return prodSuccess
+	leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
+		"ProduceRequest":  succeededProduceRequestHandlerFunc,
+		"MetadataRequest": metadataRequestHandlerFunc,
 	})
 
 	wg.Wait()
@@ -938,7 +949,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 
 	// tell partition 0 to go to that broker again
-	seedBroker.Returns(metadataResponse)
+	leader.Returns(metadataResponse)
 
 	// succeed this time
 	prodSuccess = new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
@@ -994,14 +1005,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 
 	time.Sleep(50 * time.Millisecond)
 
-	leader.SetHandlerByMap(map[string]MockResponse{
-		"ProduceRequest": NewMockProduceResponse(t).
-			SetVersion(0).
-			SetError("my_topic", 0, ErrNoError),
-	})
-
 	// tell partition 0 to go to that broker again
-	seedBroker.Returns(metadataResponse)
+	leader.Returns(metadataResponse)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
 
 	// succeed this time
 	expectResults(t, producer, 5, 0)
@@ -1010,6 +1018,9 @@
 	for i := 0; i < 5; i++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 	}
+	prodSuccess = new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
 	expectResults(t, producer, 5, 0)
 
 	// shutdown
@@ -1051,7 +1062,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
 	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
 	leader.Returns(prodNotLeader)
 
-	seedBroker.Returns(metadataLeader)
+	leader.Returns(metadataLeader)
 
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
diff --git a/client.go b/client.go
index bbba585678..5e665eaeff 100644
--- a/client.go
+++ b/client.go
@@ -260,7 +260,7 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
 func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
 	// FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go?
 	brokerErrors := make([]error, 0)
-	for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
+	for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
 		request := &InitProducerIDRequest{}
 
 		if client.conf.Version.IsAtLeast(V2_7_0_0) {
@@ -763,22 +763,21 @@ func (client *client) registerBroker(broker *Broker) {
 	}
 }
 
-// deregisterBroker removes a broker from the seedsBroker list, and if it's
-// not the seedbroker, removes it from brokers map completely.
+// deregisterBroker removes a broker from the brokers map, and if it is
+// not in that map, removes it from the seed broker list instead.
 func (client *client) deregisterBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
+	_, ok := client.brokers[broker.ID()]
+	if ok {
+		Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
+		delete(client.brokers, broker.ID())
+		return
+	}
 	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
 		client.deadSeeds = append(client.deadSeeds, broker)
 		client.seedBrokers = client.seedBrokers[1:]
-	} else {
-		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
-		// but we really shouldn't have to; once that loop is made better this case can be
-		// removed, and the function generally can be renamed from `deregisterBroker` to
-		// `nextSeedBroker` or something
-		DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
-		delete(client.brokers, broker.ID())
 	}
 }
@@ -791,33 +790,12 @@ func (client *client) resurrectDeadBrokers() {
 	client.deadSeeds = nil
 }
 
-func (client *client) anyBroker() *Broker {
-	client.lock.RLock()
-	defer client.lock.RUnlock()
-
-	if len(client.seedBrokers) > 0 {
-		_ = client.seedBrokers[0].Open(client.conf)
-		return client.seedBrokers[0]
-	}
-
-	// not guaranteed to be random *or* deterministic
-	for _, broker := range client.brokers {
-		_ = broker.Open(client.conf)
-		return broker
-	}
-
-	return nil
-}
-
+// LeastLoadedBroker returns the broker with the fewest pending requests.
+// It chooses from the cached broker list first; if that list is empty, it falls back to the seed brokers.
 func (client *client) LeastLoadedBroker() *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
-	if len(client.seedBrokers) > 0 {
-		_ = client.seedBrokers[0].Open(client.conf)
-		return client.seedBrokers[0]
-	}
-
 	var leastLoadedBroker *Broker
 	pendingRequests := math.MaxInt
 	for _, broker := range client.brokers {
@@ -826,10 +804,16 @@ func (client *client) LeastLoadedBroker() *Broker {
 			leastLoadedBroker = broker
 		}
 	}
-
 	if leastLoadedBroker != nil {
 		_ = leastLoadedBroker.Open(client.conf)
+		return leastLoadedBroker
 	}
+
+	if len(client.seedBrokers) > 0 {
+		_ = client.seedBrokers[0].Open(client.conf)
+		return client.seedBrokers[0]
+	}
+
 	return leastLoadedBroker
 }
@@ -1032,9 +1016,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
 		return err
 	}
 
-	broker := client.anyBroker()
+	broker := client.LeastLoadedBroker()
 	brokerErrors := make([]error, 0)
-	for ; broker != nil && !pastDeadline(0); broker = client.anyBroker() {
+	for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
 		allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
 		if len(topics) > 0 {
 			DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
@@ -1212,7 +1196,7 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
 	}
 
 	brokerErrors := make([]error, 0)
-	for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
+	for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
 		DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())
 
 		request := new(FindCoordinatorRequest)
diff --git a/client_test.go b/client_test.go
index c9d5b56ada..78243bce05 100644
--- a/client_test.go
+++ b/client_test.go
@@ -334,11 +334,11 @@ func TestClientGetOffset(t *testing.T) {
 	}
 
 	leader.Close()
-	seedBroker.Returns(metadata)
 
 	leader = NewMockBrokerAddr(t, 2, leaderAddr)
 	offsetResponse = new(OffsetResponse)
 	offsetResponse.AddTopicPartition("foo", 0, 456)
+	leader.Returns(metadata)
 	leader.Returns(offsetResponse)
 
 	offset, err = client.GetOffset("foo", 0, OffsetNewest)
@@ -445,12 +445,11 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
 	replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
 
 	metadataPartial := new(MetadataResponse)
-	metadataPartial.AddBroker(seedBroker.Addr(), 1)
 	metadataPartial.AddBroker(leader.Addr(), 5)
 	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
 	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
 	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
-	seedBroker.Returns(metadataPartial)
+	leader.Returns(metadataPartial)
 
 	if err := client.RefreshMetadata("new_topic"); err != nil {
 		t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
@@ -469,7 +468,7 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
 
 	// If we are asking for the leader of a partition that didn't have a leader before,
 	// we will do another metadata request.
-	seedBroker.Returns(metadataPartial)
+	leader.Returns(metadataPartial)
 
 	// Still no leader for the partition, so asking for it should return an error.
 	_, err = client.Leader("new_topic", 1)
@@ -493,7 +492,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	metadataResponse2 := new(MetadataResponse)
 	metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
 	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse2)
+	leader.Returns(metadataResponse2)
 
 	client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
 	if err != nil {
@@ -573,9 +572,13 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
 		t.Error("Meta broker is not 2")
 	}
 
-	metadataResponse2 := new(MetadataResponse)
-	metadataResponse2.AddBroker(leader.Addr(), leader.BrokerID())
-	seedBroker.Returns(metadataResponse2)
+	metadataResponse2 := NewMockMetadataResponse(t).SetBroker(leader.Addr(), leader.BrokerID())
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": metadataResponse2,
+	})
+	leader.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": metadataResponse2,
+	})
 
 	if err := client.RefreshMetadata(); err != nil {
 		t.Error(err)
@@ -611,9 +614,13 @@ func TestClientGetBroker(t *testing.T) {
 		t.Errorf("Expected broker to have address %s, found %s", leader.Addr(), broker.Addr())
 	}
 
-	metadataResponse2 := new(MetadataResponse)
-	metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
-	seedBroker.Returns(metadataResponse2)
+	metadataResponse2 := NewMockMetadataResponse(t).SetBroker(seedBroker.Addr(), seedBroker.BrokerID())
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": metadataResponse2,
+	})
+	leader.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": metadataResponse2,
+	})
 
 	if err := client.RefreshMetadata(); err != nil {
 		t.Error(err)
@@ -856,13 +863,11 @@ func TestClientUpdateMetadataErrorAndRetry(t *testing.T) {
 
 func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
-	staleCoordinator := NewMockBroker(t, 2)
-	freshCoordinator := NewMockBroker(t, 3)
+	coordinator := NewMockBroker(t, 2)
 
-	replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
+	replicas := []int32{coordinator.BrokerID()}
 	metadataResponse1 := new(MetadataResponse)
-	metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
-	metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
+	metadataResponse1.AddBroker(coordinator.Addr(), coordinator.BrokerID())
 	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
 	seedBroker.Returns(metadataResponse1)
 
@@ -873,20 +878,72 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 	coordinatorResponse1 := new(ConsumerMetadataResponse)
 	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
 
-	seedBroker.Returns(coordinatorResponse1)
+	coordinator.Returns(coordinatorResponse1)
 
 	coordinatorResponse2 := new(ConsumerMetadataResponse)
-	coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
+	coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
 	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
-	coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
+	coordinatorResponse2.CoordinatorPort = coordinator.Port()
 
-	seedBroker.Returns(coordinatorResponse2)
+	coordinator.Returns(coordinatorResponse2)
 
 	broker, err := client.Coordinator("my_group")
 	if err != nil {
 		t.Error(err)
 	}
 
+	if coordinator.Addr() != broker.Addr() {
+		t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
+	}
+
+	if coordinator.BrokerID() != broker.ID() {
+		t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
+	}
+
+	// Grab the cached value
+	broker2, err := client.Coordinator("my_group")
+	if err != nil {
+		t.Error(err)
+	}
+
+	if broker2.Addr() != broker.Addr() {
+		t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
+	}
+
+	coordinator.Close()
+	seedBroker.Close()
+	safeClose(t, client)
+}
+
+func TestClientCoordinatorChangeWithConsumerOffsetsTopic(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	staleCoordinator := NewMockBroker(t, 2)
+	freshCoordinator := NewMockBroker(t, 3)
+
+	replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
+	metadataResponse1 := new(MetadataResponse)
+	metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
+	metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
+	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
+	seedBroker.Returns(metadataResponse1)
+
+	client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	findCoordinatorResponse := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", staleCoordinator)
+	staleCoordinator.SetHandlerByMap(map[string]MockResponse{
+		"FindCoordinatorRequest": findCoordinatorResponse,
+	})
+	freshCoordinator.SetHandlerByMap(map[string]MockResponse{
+		"FindCoordinatorRequest": findCoordinatorResponse,
+	})
+	broker, err := client.Coordinator("my_group")
+	if err != nil {
+		t.Error(err)
+	}
+
 	if staleCoordinator.Addr() != broker.Addr() {
 		t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
 	}
@@ -905,12 +962,13 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 		t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
%s", broker2.Addr(), broker.Addr()) } - coordinatorResponse3 := new(ConsumerMetadataResponse) - coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID() - coordinatorResponse3.CoordinatorHost = "127.0.0.1" - coordinatorResponse3.CoordinatorPort = freshCoordinator.Port() - - seedBroker.Returns(coordinatorResponse3) + findCoordinatorResponse2 := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", freshCoordinator) + staleCoordinator.SetHandlerByMap(map[string]MockResponse{ + "FindCoordinatorRequest": findCoordinatorResponse2, + }) + freshCoordinator.SetHandlerByMap(map[string]MockResponse{ + "FindCoordinatorRequest": findCoordinatorResponse2, + }) // Refresh the locally cached value because it's stale if err := client.RefreshCoordinator("my_group"); err != nil { diff --git a/consumer_test.go b/consumer_test.go index 4096bdd734..126cef6c3e 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1142,6 +1142,10 @@ func TestConsumeMessagesTrackLeader(t *testing.T) { SetMessage("my_topic", 0, 2, testMsg), }) + leader2.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": mockMetadataResponse1, + }) + client, err := NewClient([]string{leader1.Addr()}, cfg) if err != nil { t.Fatal(err) @@ -1362,21 +1366,23 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { seedBroker will give leader1 as serving my_topic/0 now * my_topic/1 -> leader1 will serve 0 messages`) - // seed broker tells that the new partition 0 leader is leader1 - seedBroker.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetLeader("my_topic", 0, leader1.BrokerID()). - SetLeader("my_topic", 1, leader1.BrokerID()). - SetBroker(leader0.Addr(), leader0.BrokerID()). - SetBroker(leader1.Addr(), leader1.BrokerID()). - SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), - }) - // leader0 says no longer leader of partition 0 fetchResponse := new(FetchResponse) fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition) + metadataResponse := NewMockMetadataResponse(t). + SetLeader("my_topic", 0, leader1.BrokerID()). + SetLeader("my_topic", 1, leader1.BrokerID()). + SetBroker(leader0.Addr(), leader0.BrokerID()). + SetBroker(leader1.Addr(), leader1.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()) + leader0.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": NewMockWrapper(fetchResponse), + "FetchRequest": NewMockWrapper(fetchResponse), + "MetadataRequest": metadataResponse, + }) + leader1.SetHandlerByMap(map[string]MockResponse{ + "FetchRequest": NewMockFetchResponse(t, 1), + "MetadataRequest": metadataResponse, }) time.Sleep(50 * time.Millisecond) @@ -1393,7 +1399,8 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg) } leader1.SetHandlerByMap(map[string]MockResponse{ - "FetchRequest": mockFetchResponse2, + "FetchRequest": mockFetchResponse2, + "MetadataRequest": metadataResponse, }) for i := 0; i < 8; i++ { @@ -1409,6 +1416,12 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { * my_topic/1 -> leader1 will return NotLeaderForPartition seedBroker will give leader0 as serving my_topic/1 now`) + metadataResponse2 := NewMockMetadataResponse(t). + SetLeader("my_topic", 0, leader1.BrokerID()). + SetLeader("my_topic", 1, leader0.BrokerID()). + SetBroker(leader0.Addr(), leader0.BrokerID()). + SetBroker(leader1.Addr(), leader1.BrokerID()). 
+		SetBroker(seedBroker.Addr(), seedBroker.BrokerID())
 	leader0.SetHandlerByMap(map[string]MockResponse{
 		"FetchRequest": NewMockFetchResponse(t, 1),
 	})
@@ -1416,16 +1429,6 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		"FetchRequest": NewMockFetchResponse(t, 1),
 	})
 
-	// metadata assigns 0 to leader1 and 1 to leader0
-	seedBroker.SetHandlerByMap(map[string]MockResponse{
-		"MetadataRequest": NewMockMetadataResponse(t).
-			SetLeader("my_topic", 0, leader1.BrokerID()).
-			SetLeader("my_topic", 1, leader0.BrokerID()).
-			SetBroker(leader0.Addr(), leader0.BrokerID()).
-			SetBroker(leader1.Addr(), leader1.BrokerID()).
-			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
-	})
-
 	// leader1 provides three more messages on partition0, says no longer leader of partition1
 	mockFetchResponse3 := NewMockFetchResponse(t, 3).
 		SetMessage("my_topic", 0, int64(7), testMsg).
@@ -1435,7 +1438,12 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	fetchResponse4.AddError("my_topic", 0, ErrNoError)
 	fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
 	leader1.SetHandlerByMap(map[string]MockResponse{
-		"FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
+		"FetchRequest":    NewMockSequence(mockFetchResponse3, fetchResponse4),
+		"MetadataRequest": metadataResponse2,
+	})
+	leader0.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest":    NewMockFetchResponse(t, 1),
+		"MetadataRequest": metadataResponse2,
 	})
 
 	t.Log(`
 	STAGE 5:
@@ -1448,7 +1456,8 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
 	}
 	leader0.SetHandlerByMap(map[string]MockResponse{
-		"FetchRequest": mockFetchResponse4,
+		"FetchRequest":    mockFetchResponse4,
+		"MetadataRequest": metadataResponse2,
 	})
 
 	for i := 7; i < 10; i++ {
@@ -1593,7 +1602,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	// Bring broker0 back to service.
 	broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
 	broker0.SetHandlerByMap(map[string]MockResponse{
-		"FetchRequest": mockFetchResponse,
+		"MetadataRequest": mockMetadataResponse,
+		"FetchRequest":    mockFetchResponse,
 	})
 
 	// Read the rest of messages from both partitions.
diff --git a/functional_test.go b/functional_test.go
index 3607fd145a..fd953ec7cc 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -228,7 +228,7 @@ mainLoop:
 		}
 		for _, broker := range brokers {
 			err := broker.Open(client.Config())
-			if err != nil {
+			if err != nil && !errors.Is(err, ErrAlreadyConnected) {
 				client.Close()
 				continue retryLoop
 			}
diff --git a/mockbroker.go b/mockbroker.go
index 8b73074fb3..6e5d90608a 100644
--- a/mockbroker.go
+++ b/mockbroker.go
@@ -98,6 +98,20 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
 	})
 }
 
+// SetHandlerFuncByMap defines a mapping of request types to requestHandlerFunc. When a
+// request is received by the broker, it looks up the request type in the map
+// and invokes the matching requestHandlerFunc to generate an appropriate reply.
+func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc) {
+	fnMap := make(map[string]requestHandlerFunc)
+	for k, v := range handlerMap {
+		fnMap[k] = v
+	}
+	b.setHandler(func(req *request) (res encoderWithHeader) {
+		reqTypeName := reflect.TypeOf(req.body).Elem().Name()
+		return fnMap[reqTypeName](req)
+	})
+}
+
 // SetNotifier set a function that will get invoked whenever a request has been
 // processed successfully and will provide the number of bytes read and written
 func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
diff --git a/offset_manager_test.go b/offset_manager_test.go
index af95bc9e4c..c3ac33641f 100644
--- a/offset_manager_test.go
+++ b/offset_manager_test.go
@@ -37,7 +37,7 @@ func initOffsetManagerWithBackoffFunc(
 		t.Fatal(err)
 	}
 
-	broker.Returns(&ConsumerMetadataResponse{
+	coordinator.Returns(&ConsumerMetadataResponse{
 		CoordinatorID:   coordinator.BrokerID(),
 		CoordinatorHost: "127.0.0.1",
 		CoordinatorPort: coordinator.Port(),
@@ -251,7 +251,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
 	// Refresh coordinator
 	newCoordinator := NewMockBroker(t, 3)
 	defer newCoordinator.Close()
-	broker.Returns(&ConsumerMetadataResponse{
+	coordinator.Returns(&ConsumerMetadataResponse{
 		CoordinatorID:   newCoordinator.BrokerID(),
 		CoordinatorHost: "127.0.0.1",
 		CoordinatorPort: newCoordinator.Port(),
@@ -492,36 +492,34 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	ocResponse.AddError("my_topic", 1, ErrNoError)
 	coordinator.Returns(ocResponse)
 
-	newCoordinator := NewMockBroker(t, 3)
-	defer newCoordinator.Close()
-
 	// For RefreshCoordinator()
-	broker.Returns(&ConsumerMetadataResponse{
-		CoordinatorID:   newCoordinator.BrokerID(),
+	coordinator.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   coordinator.BrokerID(),
 		CoordinatorHost: "127.0.0.1",
-		CoordinatorPort: newCoordinator.Port(),
+		CoordinatorPort: coordinator.Port(),
 	})
 
 	// Nothing in response.Errors at all
 	ocResponse2 := new(OffsetCommitResponse)
-	newCoordinator.Returns(ocResponse2)
+	coordinator.Returns(ocResponse2)
 
 	// No error, no need to refresh coordinator
 
 	// Error on the wrong partition for this pom
 	ocResponse3 := new(OffsetCommitResponse)
 	ocResponse3.AddError("my_topic", 1, ErrNoError)
-	newCoordinator.Returns(ocResponse3)
-
-	// No error, no need to refresh coordinator
+	coordinator.Returns(ocResponse3)
 
 	// ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
 	ocResponse4 := new(OffsetCommitResponse)
 	ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
-	newCoordinator.Returns(ocResponse4)
+	coordinator.Returns(ocResponse4)
+
+	newCoordinator := NewMockBroker(t, 3)
+	defer newCoordinator.Close()
 
 	// For RefreshCoordinator()
-	broker.Returns(&ConsumerMetadataResponse{
+	coordinator.Returns(&ConsumerMetadataResponse{
 		CoordinatorID:   newCoordinator.BrokerID(),
 		CoordinatorHost: "127.0.0.1",
 		CoordinatorPort: newCoordinator.Port(),
diff --git a/sync_producer_test.go b/sync_producer_test.go
index d554448583..8d366b011b 100644
--- a/sync_producer_test.go
+++ b/sync_producer_test.go
@@ -89,7 +89,7 @@ func TestSyncProducerTransactional(t *testing.T) {
 	findCoordinatorResponse := new(FindCoordinatorResponse)
 	findCoordinatorResponse.Coordinator = client.Brokers()[0]
 	findCoordinatorResponse.Version = 1
-	seedBroker.Returns(findCoordinatorResponse)
+	leader.Returns(findCoordinatorResponse)
 
 	initProducerIdResponse := new(InitProducerIDResponse)
 	leader.Returns(initProducerIdResponse)
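Reviewer note: the sketch below is not part of the patch. It is a minimal illustration of how the new MockBroker.SetHandlerFuncByMap helper added in mockbroker.go might be used from a test in this package. The test name, topic, and response values are illustrative assumptions; the types and methods (NewMockBroker, request, encoderWithHeader, requestHandlerFunc, MetadataResponse, ProduceResponse) are the ones introduced or already used in this diff.

func TestMockBrokerSetHandlerFuncByMapSketch(t *testing.T) {
	broker := NewMockBroker(t, 1)
	defer broker.Close()

	// Handler funcs can close over test state, unlike the canned MockResponse
	// values used with SetHandlerByMap.
	metadataHandler := func(req *request) (res encoderWithHeader) {
		metadata := new(MetadataResponse)
		metadata.AddBroker(broker.Addr(), broker.BrokerID())
		metadata.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
		return metadata
	}
	produceHandler := func(req *request) (res encoderWithHeader) {
		prodSuccess := new(ProduceResponse)
		prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
		return prodSuccess
	}

	// Route each request type to its handler func; every request type the
	// client will send must be present in the map, since an unknown type
	// would look up a nil function.
	broker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
		"MetadataRequest": metadataHandler,
		"ProduceRequest":  produceHandler,
	})
}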