Skip to content

Commit

Permalink
Match the golang database pattern
Browse files Browse the repository at this point in the history
Replaces Connect and AsyncConnect with Open and Connected
  • Loading branch information
Evan Huus committed Aug 14, 2013
1 parent 46629e3 commit 934c568
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
66 changes: 32 additions & 34 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type responsePromise struct {
}

// NewBroker creates and returns a Broker targetting the given host:port address.
// This does not attempt to actually connect, you have to call Connect() or AsyncConnect() for that.
// This does not attempt to actually connect, you have to call Open() for that.
func NewBroker(host string, port int32) *Broker {
b := new(Broker)
b.id = -1 // don't know it yet
Expand All @@ -37,52 +37,50 @@ func NewBroker(host string, port int32) *Broker {
return b
}

func (b *Broker) Connect() error {
// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
// The only error Open will return directly is AlreadyConnected.
func (b *Broker) Open() error {
b.lock.Lock()
defer b.lock.Unlock()

return b.connect()
}

// AsyncConnect tries to connect to the Broker in a non-blocking way. Calling `broker.AsyncConnect()` is
// *NOT* the same as calling `go broker.Connect()` - AsyncConnect takes the broker lock synchronously before
// launching its goroutine, so that subsequent operations on the broker are guaranteed to block waiting for
// the connection instead of simply returning NotConnected. This does mean that if someone is already operating
// on the broker, AsyncConnect may not be truly asynchronous while it waits for the lock.
func (b *Broker) AsyncConnect() {
b.lock.Lock()
if b.conn != nil {
b.lock.Unlock()
return AlreadyConnected
}

go func() {
defer b.lock.Unlock()
b.connect()
}()

}
var addr *net.IPAddr
addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
if b.conn_err != nil {
return
}

func (b *Broker) connect() error {
if b.conn != nil {
return AlreadyConnected
}
b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if b.conn_err != nil {
return
}

var addr *net.IPAddr
addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
if b.conn_err != nil {
return b.conn_err
}
b.done = make(chan bool)

b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if b.conn_err != nil {
return b.conn_err
}
// permit a few outstanding requests before we block waiting for responses
b.responses = make(chan responsePromise, 4)

b.done = make(chan bool)
go b.responseReceiver()
}()

// permit a few outstanding requests before we block waiting for responses
b.responses = make(chan responsePromise, 4)
return nil
}

go b.responseReceiver()
// Connected returns true if the broker is connected and false otherwise. If the broker is not
// connected but it had tried to connect, the error from that connection attempt is also returned.
func (b *Broker) Connected() (bool, error) {
b.lock.Lock()
defer b.lock.Unlock()

return nil
return b.conn != nil, b.conn_err
}

func (b *Broker) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {

func ExampleBroker() error {
broker := NewBroker("localhost", 9092)
err := broker.Connect()
err := broker.Open()
if err != nil {
return err
}
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
defer mockBroker.Close()

broker := NewBroker("localhost", mockBroker.Port())
err := broker.Connect()
err := broker.Open()
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 6 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ type Client struct {
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
func NewClient(id string, host string, port int32) (client *Client, err error) {
tmp := NewBroker(host, port)
err = tmp.Connect()
err = tmp.Open()
if err != nil {
return nil, err
}
_, err = tmp.Connected()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -227,7 +231,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
if client.brokers[broker.ID()] != nil {
go client.brokers[broker.ID()].Close()
}
broker.AsyncConnect()
broker.Open()
client.brokers[broker.ID()] = broker
}

Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp
// (meaning one outside of the range [0...numPartitions-1]).
var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")

// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
var AlreadyConnected = errors.New("kafka: broker: already connected")

// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
Expand Down

0 comments on commit 934c568

Please sign in to comment.