Skip to content

Commit

Permalink
Revise broker management
Browse files Browse the repository at this point in the history
Copy the scala library's idea of seed brokers
Make the producer and consumer request a metadata refresh when they
can't get the leader of a partition they want
Connect to a broker only when it is actually requested, not when it is
known of
Don't request all partition leaders on init
  • Loading branch information
bouk committed Sep 18, 2013
1 parent 901b907 commit 0016f3d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 93 deletions.
137 changes: 52 additions & 85 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"math/rand"
"sort"
"sync"
"time"
Expand All @@ -23,8 +24,7 @@ type Client struct {
// the broker addresses given to us through the constructor are not guaranteed to be returned in
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
// so we store them separately
extraBrokerAddrs []string
extraBroker *Broker
seedBrokers []string

brokers map[int32]*Broker // maps broker ids to brokers
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
Expand All @@ -48,20 +48,11 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
}

client := &Client{
id: id,
config: *config,
extraBrokerAddrs: addrs,
extraBroker: NewBroker(addrs[0]),
brokers: make(map[int32]*Broker),
leaders: make(map[string]map[int32]int32),
}
client.extraBroker.Open()

// do an initial fetch of all cluster metadata by specifing an empty list of topics
err := client.RefreshAllMetadata()
if err != nil {
client.Close() // this closes tmp, since it's still in the brokers hash
return nil, err
id: id,
config: *config,
seedBrokers: addrs,
brokers: make(map[int32]*Broker),
leaders: make(map[string]map[int32]int32),
}

return client, nil
Expand All @@ -80,10 +71,6 @@ func (client *Client) Close() error {
client.brokers = nil
client.leaders = nil

if client.extraBroker != nil {
go client.extraBroker.Close()
}

return nil
}

Expand Down Expand Up @@ -130,12 +117,15 @@ func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
return nil, err
}
leader = client.cachedLeader(topic, partitionID)
}

if leader == nil {
return nil, UnknownTopicOrPartition
if leader == nil {
return nil, UnknownTopicOrPartition
}
}

// Gives an error if it is already connected, ignore that error
leader.Open()

return leader, nil
}

Expand All @@ -153,72 +143,51 @@ func (client *Client) RefreshAllMetadata() error {

// misc private helper functions

// XXX: see https://github.com/Shopify/sarama/issues/15
// and https://github.com/Shopify/sarama/issues/23
// disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with
// something sane and the replacement should be made part of the public Client API
func (client *Client) disconnectBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

if broker == client.extraBroker {
client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
if len(client.extraBrokerAddrs) > 0 {
client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
client.extraBroker.Open()
} else {
client.extraBroker = nil
}
} else {
// we don't need to update the leaders hash, it will automatically get refreshed next time because
// the broker lookup will return nil
delete(client.brokers, broker.ID())
func (client *Client) refreshMetadata(topics []string, retries int) error {
// Shuffle the array http://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
for i := range client.seedBrokers {
j := rand.Intn(i + 1)
client.seedBrokers[i], client.seedBrokers[j] = client.seedBrokers[j], client.seedBrokers[i]
}

go broker.Close()
}
for _, seed := range client.seedBrokers {
broker := NewBroker(seed)
broker.Open()

// Connection failed, try next broker
if ok, _ := broker.Connected(); !ok {
continue
}

func (client *Client) refreshMetadata(topics []string, retries int) error {
for broker := client.any(); broker != nil; broker = client.any() {
response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})

switch err {
case nil:
// valid response, use it
retry, err := client.update(response)
switch {
case err != nil:
return err
case len(retry) == 0:
return nil
default:
if retries <= 0 {
return LeaderNotAvailable
}
time.Sleep(client.config.WaitForElection) // wait for leader election
return client.refreshMetadata(retry, retries-1)
}
case EncodingError:
// didn't even send, return the error
if err != nil {
return err
}

// some other error, remove that broker and try again
client.disconnectBroker(broker)
}
// valid response, use it
retry, err := client.update(response)

return OutOfBrokers
}
if err != nil {
return err
}

func (client *Client) any() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
// all topics have been refreshed, we're done
if len(retry) == 0 {
return nil
}

for _, broker := range client.brokers {
return broker
}
if retries <= 0 {
return LeaderNotAvailable
}

return client.extraBroker
// Back off for a little while before retrying
time.Sleep(client.config.WaitForElection)
return client.refreshMetadata(retry, retries-1)
}
// TODO: if there's still retries left, maybe wait for a certain amount of time and then retry
// the seed brokers
return OutOfBrokers
}

func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
Expand Down Expand Up @@ -261,18 +230,16 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {

// For all the brokers we received:
// - if it is a new ID, save it
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - otherwise ignore it, replacing our existing one would just bounce the connection
// We asynchronously try to open connections to the new brokers. We don't care if they
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
// If it fails and we do care, whoever tries to use it will get the connection error.
// - if it is an existing ID, close the existing connection and reopen it
// (like the Scala library does)
// We don't start opening connections, this happens on demand when the leader is requested
// XXX: it could be the case that a perfectly healthy broker is dropped and that another
// consumer/producer is affected by this, causing it to also request new meta data
for _, broker := range data.Brokers {
if client.brokers[broker.ID()] == nil {
broker.Open()
client.brokers[broker.ID()] = broker
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
} else {
go client.brokers[broker.ID()].Close()
broker.Open()
client.brokers[broker.ID()] = broker
}
}
Expand Down
2 changes: 0 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,4 @@ func TestClientRefreshBehaviour(t *testing.T) {
} else if tst.ID() != 0xaa {
t.Error("Leader for my_topic had incorrect ID.")
}

client.disconnectBroker(tst)
}
29 changes: 24 additions & 5 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,29 @@ func (c *Consumer) fetchMessages() {
return
}
default:
c.client.disconnectBroker(c.broker)
for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
if !c.sendError(err) {
return
// Connection failed, try to get the new leader
// First let the client refresh the topic metadata to make sure it grabs the correct leader
// The client will close the connection to the old (failing) broker when it finds the new leader
// Endlessly try to reconnect
// XXX: if it can't connect to the seed brokers it will keep trying without delay, not a good idea
for {
if err = c.client.RefreshTopicMetadata(c.topic); err != nil {
if c.sendError(err) {
continue
} else {
return
}
}
if c.broker, err = c.client.Leader(c.topic, c.partition); err != nil {
if c.sendError(err) {
continue
} else {
return
}
}

if c.broker != nil {
break
}
}
continue
Expand Down Expand Up @@ -295,7 +314,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
if !retry {
return -1, err
}
c.client.disconnectBroker(c.broker)
c.client.RefreshTopicMetadata(c.topic)
c.broker, err = c.client.Leader(c.topic, c.partition)
if err != nil {
return -1, err
Expand Down
6 changes: 5 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
if !retry {
return err
}
p.client.disconnectBroker(broker)

// Try to reconnect to the broker, failing if it can't
p.client.RefreshTopicMetadata(p.topic)
return p.safeSendMessage(key, value, false)
}

Expand All @@ -133,6 +135,8 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
if !retry {
return block.Err
}

// Try to find the broker
err = p.client.RefreshTopicMetadata(p.topic)
if err != nil {
return err
Expand Down

0 comments on commit 0016f3d

Please sign in to comment.