From a0e96da4d1e88008ec49c7ea190544c7268f1b14 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 13 Aug 2013 21:45:48 -0400 Subject: [PATCH 1/4] Rework how the client connects to brokers. Fixes #9. This ended up being more complicated than I had hoped and touched several different areas. TL;DR is that we now connect to the other brokers in the cluster asynchronously. Errors connecting only show up when somebody tries to use that broker. This is better than the old behaviour since it means that if some brokers in a cluster go down but the topics we care about are still available, we just keep going instead of blowing up for no reason. The complicated part is that simply calling `go broker.Connect()` doesn't do what we want, so I had to write a `broker.AsyncConnect()`. The problem occurs if you've got code like this: go broker.Connect() // do some stuff broker.SendSomeMessage() What can happen is that SendSomeMessage can be run before the Connect() goroutine ever gets scheduled, in which case SendSomeMessage will simply return NotConnected. The desired behaviour is that SendSomeMessage waits for Connect() to finish, which means Connect() has to *synchronously* take the broker lock before it launches the asynchronous connect call. Lots of fun. And bonus change in this commit: rather than special-casing leader == -1 in `client.cachedLeader` and adding a big long comment to the LEADER_NOT_AVAILABLE case explaining the fallthrough statement, just delete that partition from the hash. So much easier to follow, I must have been on crack when I wrote the old way. --- broker.go | 32 ++++++++++++++++++++++++++++++-- client.go | 32 ++++++++++++-------------------- 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/broker.go b/broker.go index 5958476a7..f422d2234 100644 --- a/broker.go +++ b/broker.go @@ -14,6 +14,7 @@ type Broker struct { correlation_id int32 conn net.Conn + conn_err error lock sync.Mutex responses chan responsePromise @@ -27,7 +28,7 @@ type responsePromise struct { } // NewBroker creates and returns a Broker targetting the given host:port address. -// This does not attempt to actually connect, you have to call Connect() for that. +// This does not attempt to actually connect, you have to call Connect() or AsyncConnect() for that. func NewBroker(host string, port int32) *Broker { b := new(Broker) b.id = -1 // don't know it yet @@ -40,17 +41,39 @@ func (b *Broker) Connect() error { b.lock.Lock() defer b.lock.Unlock() + return b.connect() +} + +// AsyncConnect tries to connect to the Broker in a non-blocking way. Calling `broker.AsyncConnect()` is +// *NOT* the same as calling `go broker.Connect()` - AsyncConnect takes the broker lock synchronously before +// launching its goroutine, so that subsequent operations on the broker are guaranteed to block waiting for +// the connection instead of simply returning NotConnected. This does mean that if someone is already operating +// on the broker, AsyncConnect may not be truly asynchronous while it waits for the lock. +func (b *Broker) AsyncConnect() { + b.lock.Lock() + + go func() { + defer b.lock.Unlock() + b.connect() + }() + +} + +func (b *Broker) connect() error { if b.conn != nil { return AlreadyConnected } + b.conn_err = nil addr, err := net.ResolveIPAddr("ip", b.host) if err != nil { + b.conn_err = err return err } b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)}) if err != nil { + b.conn_err = err return err } @@ -78,6 +101,7 @@ func (b *Broker) Close() error { err := b.conn.Close() b.conn = nil + b.conn_err = nil b.done = nil b.responses = nil @@ -184,7 +208,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) defer b.lock.Unlock() if b.conn == nil { - return nil, NotConnected + if b.conn_err != nil { + return nil, b.conn_err + } else { + return nil, NotConnected + } } fullRequest := request{b.correlation_id, clientID, req} diff --git a/client.go b/client.go index 80d70df95..c6020dcb1 100644 --- a/client.go +++ b/client.go @@ -177,7 +177,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker { partitions := client.leaders[topic] if partitions != nil { leader, ok := partitions[partition_id] - if ok && leader != -1 { + if ok { return client.brokers[leader] } } @@ -205,34 +205,29 @@ func (client *Client) cachedPartitions(topic string) []int32 { // if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE func (client *Client) update(data *MetadataResponse) ([]string, error) { + client.lock.Lock() + defer client.lock.Unlock() + // First discard brokers that we already know about. This avoids bouncing TCP connections, // and especially avoids closing valid connections out from under other code which may be trying - // to use them. We only need a read-lock for this. + // to use them. var newBrokers []*Broker - client.lock.RLock() for _, broker := range data.Brokers { if !broker.Equals(client.brokers[broker.ID()]) { newBrokers = append(newBrokers, broker) } } - client.lock.RUnlock() - - // connect to the brokers before taking the write lock, as this can take a while - // to timeout if one of them isn't reachable - for _, broker := range newBrokers { - err := broker.Connect() - if err != nil { - return nil, err - } - } - - client.lock.Lock() - defer client.lock.Unlock() + // Now asynchronously try to open connections to the new brokers. We don't care if they + // fail, since maybe that broker is unreachable but doesn't have a topic we care about. + // If it fails and we do care, whoever tries to use it will get the connection error. + // If we have an old broker with that ID (but a different host/port, since they didn't + // compare as equals above) then close and remove that broker before saving the new one. for _, broker := range newBrokers { if client.brokers[broker.ID()] != nil { go client.brokers[broker.ID()].Close() } + broker.AsyncConnect() client.brokers[broker.ID()] = broker } @@ -251,11 +246,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) { for _, partition := range topic.Partitions { switch partition.Err { case LEADER_NOT_AVAILABLE: - // in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the - // partition is in the middle of leader election, so we fallthrough to save it - // anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID) toRetry[topic.Name] = true - fallthrough + delete(client.leaders[topic.Name], partition.Id) case NO_ERROR: client.leaders[topic.Name][partition.Id] = partition.Leader default: From 46629e316918b69c565db54fa4f709153cbcdf43 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 14 Aug 2013 12:42:22 -0400 Subject: [PATCH 2/4] No need for a separate err variable, thanks Burke --- broker.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/broker.go b/broker.go index f422d2234..99ef93f9d 100644 --- a/broker.go +++ b/broker.go @@ -63,18 +63,16 @@ func (b *Broker) connect() error { if b.conn != nil { return AlreadyConnected } - b.conn_err = nil - addr, err := net.ResolveIPAddr("ip", b.host) - if err != nil { - b.conn_err = err - return err + var addr *net.IPAddr + addr, b.conn_err = net.ResolveIPAddr("ip", b.host) + if b.conn_err != nil { + return b.conn_err } - b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)}) - if err != nil { - b.conn_err = err - return err + b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)}) + if b.conn_err != nil { + return b.conn_err } b.done = make(chan bool) From 934c5688a1f860ffdcbeaf70c1b651179402fad1 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 14 Aug 2013 12:57:30 -0400 Subject: [PATCH 3/4] Match the golang database pattern Replaces Connect and AsyncConnect with Open and Connected --- broker.go | 66 ++++++++++++++++++++++++-------------------------- broker_test.go | 4 +-- client.go | 8 ++++-- errors.go | 2 +- 4 files changed, 41 insertions(+), 39 deletions(-) diff --git a/broker.go b/broker.go index 99ef93f9d..7a5685a8a 100644 --- a/broker.go +++ b/broker.go @@ -28,7 +28,7 @@ type responsePromise struct { } // NewBroker creates and returns a Broker targetting the given host:port address. -// This does not attempt to actually connect, you have to call Connect() or AsyncConnect() for that. +// This does not attempt to actually connect, you have to call Open() for that. func NewBroker(host string, port int32) *Broker { b := new(Broker) b.id = -1 // don't know it yet @@ -37,52 +37,50 @@ func NewBroker(host string, port int32) *Broker { return b } -func (b *Broker) Connect() error { +// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which +// connects and releases the lock. This means any subsequent operations on the broker will block waiting for +// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). +// The only error Open will return directly is AlreadyConnected. +func (b *Broker) Open() error { b.lock.Lock() - defer b.lock.Unlock() - - return b.connect() -} -// AsyncConnect tries to connect to the Broker in a non-blocking way. Calling `broker.AsyncConnect()` is -// *NOT* the same as calling `go broker.Connect()` - AsyncConnect takes the broker lock synchronously before -// launching its goroutine, so that subsequent operations on the broker are guaranteed to block waiting for -// the connection instead of simply returning NotConnected. This does mean that if someone is already operating -// on the broker, AsyncConnect may not be truly asynchronous while it waits for the lock. -func (b *Broker) AsyncConnect() { - b.lock.Lock() + if b.conn != nil { + b.lock.Unlock() + return AlreadyConnected + } go func() { defer b.lock.Unlock() - b.connect() - }() -} + var addr *net.IPAddr + addr, b.conn_err = net.ResolveIPAddr("ip", b.host) + if b.conn_err != nil { + return + } -func (b *Broker) connect() error { - if b.conn != nil { - return AlreadyConnected - } + b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)}) + if b.conn_err != nil { + return + } - var addr *net.IPAddr - addr, b.conn_err = net.ResolveIPAddr("ip", b.host) - if b.conn_err != nil { - return b.conn_err - } + b.done = make(chan bool) - b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)}) - if b.conn_err != nil { - return b.conn_err - } + // permit a few outstanding requests before we block waiting for responses + b.responses = make(chan responsePromise, 4) - b.done = make(chan bool) + go b.responseReceiver() + }() - // permit a few outstanding requests before we block waiting for responses - b.responses = make(chan responsePromise, 4) + return nil +} - go b.responseReceiver() +// Connected returns true if the broker is connected and false otherwise. If the broker is not +// connected but it had tried to connect, the error from that connection attempt is also returned. +func (b *Broker) Connected() (bool, error) { + b.lock.Lock() + defer b.lock.Unlock() - return nil + return b.conn != nil, b.conn_err } func (b *Broker) Close() error { diff --git a/broker_test.go b/broker_test.go index a63191856..9b2fae08a 100644 --- a/broker_test.go +++ b/broker_test.go @@ -141,7 +141,7 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker { func ExampleBroker() error { broker := NewBroker("localhost", 9092) - err := broker.Connect() + err := broker.Open() if err != nil { return err } @@ -217,7 +217,7 @@ func TestSimpleBrokerCommunication(t *testing.T) { defer mockBroker.Close() broker := NewBroker("localhost", mockBroker.Port()) - err := broker.Connect() + err := broker.Open() if err != nil { t.Fatal(err) } diff --git a/client.go b/client.go index c6020dcb1..691662ed8 100644 --- a/client.go +++ b/client.go @@ -22,7 +22,11 @@ type Client struct { // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created. func NewClient(id string, host string, port int32) (client *Client, err error) { tmp := NewBroker(host, port) - err = tmp.Connect() + err = tmp.Open() + if err != nil { + return nil, err + } + _, err = tmp.Connected() if err != nil { return nil, err } @@ -227,7 +231,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) { if client.brokers[broker.ID()] != nil { go client.brokers[broker.ID()].Close() } - broker.AsyncConnect() + broker.Open() client.brokers[broker.ID()] = broker } diff --git a/errors.go b/errors.go index 818525494..7d2262071 100644 --- a/errors.go +++ b/errors.go @@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp // (meaning one outside of the range [0...numPartitions-1]). var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.") -// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected. +// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected. var AlreadyConnected = errors.New("kafka: broker: already connected") // NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected. From cb8190ccd770bd55e511b9c04f70d508a29f4408 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Wed, 14 Aug 2013 13:03:16 -0400 Subject: [PATCH 4/4] tweak broker example --- broker_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/broker_test.go b/broker_test.go index 9b2fae08a..23dd573f8 100644 --- a/broker_test.go +++ b/broker_test.go @@ -145,14 +145,16 @@ func ExampleBroker() error { if err != nil { return err } + defer broker.Close() request := MetadataRequest{Topics: []string{"myTopic"}} response, err := broker.GetMetadata("myClient", &request) + if err != nil { + return err + } fmt.Println("There are", len(response.Topics), "topics active in the cluster.") - broker.Close() - return nil }