From 0016f3de30251aa9143e461ff29ec41b48dccfe0 Mon Sep 17 00:00:00 2001 From: Bouke van der Bijl Date: Wed, 18 Sep 2013 12:19:29 +0200 Subject: [PATCH] Revise broker management Copy the scala library's idea of seed brokers Make the producer and consumer request a metadata refresh when they can't get the leader of a partition they want Connect to a broker only when it is actually requested, not when it is known of Don't request all partition leaders on init --- client.go | 137 +++++++++++++++++++------------------------------ client_test.go | 2 - consumer.go | 29 +++++++++-- producer.go | 6 ++- 4 files changed, 81 insertions(+), 93 deletions(-) diff --git a/client.go b/client.go index c103e88f2..c872280cc 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,7 @@ package sarama import ( + "math/rand" "sort" "sync" "time" @@ -23,8 +24,7 @@ type Client struct { // the broker addresses given to us through the constructor are not guaranteed to be returned in // the cluster metadata (I *think* it only returns brokers who are currently leading partitions?) 
// so we store them separately - extraBrokerAddrs []string - extraBroker *Broker + seedBrokers []string brokers map[int32]*Broker // maps broker ids to brokers leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids @@ -48,20 +48,11 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error) } client := &Client{ - id: id, - config: *config, - extraBrokerAddrs: addrs, - extraBroker: NewBroker(addrs[0]), - brokers: make(map[int32]*Broker), - leaders: make(map[string]map[int32]int32), - } - client.extraBroker.Open() - - // do an initial fetch of all cluster metadata by specifing an empty list of topics - err := client.RefreshAllMetadata() - if err != nil { - client.Close() // this closes tmp, since it's still in the brokers hash - return nil, err + id: id, + config: *config, + seedBrokers: addrs, + brokers: make(map[int32]*Broker), + leaders: make(map[string]map[int32]int32), } return client, nil @@ -80,10 +71,6 @@ func (client *Client) Close() error { client.brokers = nil client.leaders = nil - if client.extraBroker != nil { - go client.extraBroker.Close() - } - return nil } @@ -130,12 +117,15 @@ func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) { return nil, err } leader = client.cachedLeader(topic, partitionID) - } - if leader == nil { - return nil, UnknownTopicOrPartition + if leader == nil { + return nil, UnknownTopicOrPartition + } } + // Gives an error if it is already connected, ignore that error + leader.Open() + return leader, nil } @@ -153,72 +143,51 @@ func (client *Client) RefreshAllMetadata() error { // misc private helper functions -// XXX: see https://github.com/Shopify/sarama/issues/15 -// and https://github.com/Shopify/sarama/issues/23 -// disconnectBroker is a bad hacky way to accomplish broker management. 
It should be replaced with -// something sane and the replacement should be made part of the public Client API -func (client *Client) disconnectBroker(broker *Broker) { - client.lock.Lock() - defer client.lock.Unlock() - - if broker == client.extraBroker { - client.extraBrokerAddrs = client.extraBrokerAddrs[1:] - if len(client.extraBrokerAddrs) > 0 { - client.extraBroker = NewBroker(client.extraBrokerAddrs[0]) - client.extraBroker.Open() - } else { - client.extraBroker = nil - } - } else { - // we don't need to update the leaders hash, it will automatically get refreshed next time because - // the broker lookup will return nil - delete(client.brokers, broker.ID()) +func (client *Client) refreshMetadata(topics []string, retries int) error { + // Shuffle the array http://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle + for i := range client.seedBrokers { + j := rand.Intn(i + 1) + client.seedBrokers[i], client.seedBrokers[j] = client.seedBrokers[j], client.seedBrokers[i] } - go broker.Close() -} + for _, seed := range client.seedBrokers { + broker := NewBroker(seed) + broker.Open() + + // Connection failed, try next broker + if ok, _ := broker.Connected(); !ok { + continue + } -func (client *Client) refreshMetadata(topics []string, retries int) error { - for broker := client.any(); broker != nil; broker = client.any() { response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics}) - switch err { - case nil: - // valid response, use it - retry, err := client.update(response) - switch { - case err != nil: - return err - case len(retry) == 0: - return nil - default: - if retries <= 0 { - return LeaderNotAvailable - } - time.Sleep(client.config.WaitForElection) // wait for leader election - return client.refreshMetadata(retry, retries-1) - } - case EncodingError: - // didn't even send, return the error + if err != nil { return err } - // some other error, remove that broker and try again - client.disconnectBroker(broker) - } + // valid response, use 
it + retry, err := client.update(response) - return OutOfBrokers -} + if err != nil { + return err + } -func (client *Client) any() *Broker { - client.lock.RLock() - defer client.lock.RUnlock() + // all topics have been refreshed, we're done + if len(retry) == 0 { + return nil + } - for _, broker := range client.brokers { - return broker - } + if retries <= 0 { + return LeaderNotAvailable + } - return client.extraBroker + // Back off for a little while before retrying + time.Sleep(client.config.WaitForElection) + return client.refreshMetadata(retry, retries-1) + } + // TODO: if there are still retries left, maybe wait for a certain amount of time and then retry + // the seed brokers + return OutOfBrokers } func (client *Client) cachedLeader(topic string, partitionID int32) *Broker { @@ -261,18 +230,16 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) { // For all the brokers we received: // - if it is a new ID, save it - // - if it is an existing ID, but the address we have is stale, discard the old one and save it - // - otherwise ignore it, replacing our existing one would just bounce the connection - // We asynchronously try to open connections to the new brokers. We don't care if they - // fail, since maybe that broker is unreachable but doesn't have a topic we care about. - // If it fails and we do care, whoever tries to use it will get the connection error. 
+ // - if it is an existing ID, close the existing connection and store the new broker in its place + // (like the Scala library does) + // We don't start opening connections here; that happens on demand when the leader is requested + // XXX: it could be the case that a perfectly healthy broker is dropped and that another + // consumer/producer is affected by this, causing it to also request new metadata for _, broker := range data.Brokers { if client.brokers[broker.ID()] == nil { - broker.Open() client.brokers[broker.ID()] = broker - } else if broker.Addr() != client.brokers[broker.ID()].Addr() { + } else { go client.brokers[broker.ID()].Close() - broker.Open() client.brokers[broker.ID()] = broker } } diff --git a/client_test.go b/client_test.go index ea6da71a3..fea82c7b8 100644 --- a/client_test.go +++ b/client_test.go @@ -157,6 +157,4 @@ func TestClientRefreshBehaviour(t *testing.T) { } else if tst.ID() != 0xaa { t.Error("Leader for my_topic had incorrect ID.") } - - client.disconnectBroker(tst) } diff --git a/consumer.go b/consumer.go index 79386d8ea..788d87851 100644 --- a/consumer.go +++ b/consumer.go @@ -194,10 +194,29 @@ func (c *Consumer) fetchMessages() { return } default: - c.client.disconnectBroker(c.broker) - for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) { - if !c.sendError(err) { - return + // Connection failed, try to get the new leader + // First let the client refresh the topic metadata to make sure it grabs the correct leader + // The client will close the connection to the old (failing) broker when it finds the new leader + // Endlessly try to reconnect + // XXX: if it can't connect to the seed brokers it will keep trying without delay, not a good idea + for { + if err = c.client.RefreshTopicMetadata(c.topic); err != nil { + if c.sendError(err) { + continue + } else { + return + } + } + if c.broker, err = c.client.Leader(c.topic, c.partition); err != nil { + if c.sendError(err) { + continue + } 
else { + return + } + } + + if c.broker != nil { + break } } continue @@ -295,7 +314,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) { if !retry { return -1, err } - c.client.disconnectBroker(c.broker) + c.client.RefreshTopicMetadata(c.topic) c.broker, err = c.client.Leader(c.topic, c.partition) if err != nil { return -1, err diff --git a/producer.go b/producer.go index c759cc4d0..20e8f0dc7 100644 --- a/producer.go +++ b/producer.go @@ -113,7 +113,9 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error { if !retry { return err } - p.client.disconnectBroker(broker) + + // Refresh the topic metadata (its error is ignored here) and resend once without further retries + p.client.RefreshTopicMetadata(p.topic) return p.safeSendMessage(key, value, false) } @@ -133,6 +135,8 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error { if !retry { return block.Err } + + // Try to find the broker err = p.client.RefreshTopicMetadata(p.topic) if err != nil { return err