From cc444e0373880e6546a1ee2f13d3b73dcfbaed6c Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 25 Feb 2015 15:51:44 +0000 Subject: [PATCH] Lazily connect to brokers in the client Instead of opening a connection to all brokers immediately upon receiving their information in metadata, wait until we are asked for them either via a call to `Leader` or a call to `any`. --- client.go | 8 +++----- client_test.go | 12 ++++-------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/client.go b/client.go index 2698ee2558..0dc6f90341 100644 --- a/client.go +++ b/client.go @@ -337,10 +337,12 @@ func (client *Client) any() *Broker { defer client.lock.RUnlock() if client.seedBroker != nil { + _ = client.seedBroker.Open(client.conf) return client.seedBroker } for _, broker := range client.brokers { + _ = broker.Open(client.conf) return broker } @@ -436,6 +438,7 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er if b == nil { return nil, ErrLeaderNotAvailable } + _ = b.Open(client.conf) return b, nil } } @@ -538,17 +541,12 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) { // - if it is a new ID, save it // - if it is an existing ID, but the address we have is stale, discard the old one and save it // - otherwise ignore it, replacing our existing one would just bounce the connection - // We asynchronously try to open connections to the new brokers. We don't care if they - // fail, since maybe that broker is unreachable but doesn't have a topic we care about. - // If it fails and we do care, whoever tries to use it will get the connection error. for _, broker := range data.Brokers { if client.brokers[broker.ID()] == nil { - _ = broker.Open(client.conf) client.brokers[broker.ID()] = broker Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr()) } else if broker.Addr() != client.brokers[broker.ID()].Addr() { safeAsyncClose(client.brokers[broker.ID()]) - _ = broker.Open(client.conf) client.brokers[broker.ID()] = broker Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr()) } diff --git a/client_test.go b/client_test.go index 0caf1bb801..2d3db991bb 100644 --- a/client_test.go +++ b/client_test.go @@ -28,15 +28,14 @@ func TestSimpleClient(t *testing.T) { func TestCachedPartitions(t *testing.T) { seedBroker := newMockBroker(t, 1) - leader := newMockBroker(t, 5) replicas := []int32{3, 1, 5} isr := []int32{5, 1} metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) - metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError) - metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable) + metadataResponse.AddBroker("localhost:12345", 2) + metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, ErrLeaderNotAvailable) seedBroker.Returns(metadataResponse) config := NewConfig() @@ -61,17 +60,15 @@ func TestCachedPartitions(t *testing.T) { t.Fatal("Not using the cache!") } - leader.Close() seedBroker.Close() safeClose(t, client) } func TestClientSeedBrokers(t *testing.T) { seedBroker := newMockBroker(t, 1) - discoveredBroker := newMockBroker(t, 2) metadataResponse := new(MetadataResponse) - metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID()) + metadataResponse.AddBroker("localhost:12345", 2) seedBroker.Returns(metadataResponse) client, err := NewClient([]string{seedBroker.Addr()}, nil) @@ -79,7 +76,6 @@ func TestClientSeedBrokers(t *testing.T) { t.Fatal(err) } - discoveredBroker.Close() seedBroker.Close() safeClose(t, client) }