Skip to content

Commit

Permalink
Allow disconnection of nsq hosts or lookupds
Browse files Browse the repository at this point in the history
  • Loading branch information
Asim Aslam committed Oct 29, 2014
1 parent fd5892a commit 1fd1128
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func (q *Reader) ConnectToNSQ(addr string) error {
q.onConnectionIOError(c, err)
}
conn.CloseCB = func(c *Conn) {
q.onConnectionClosed(c)
q.onConnectionClosed(c, true)
}

cleanupConnection := func() {
Expand Down Expand Up @@ -519,6 +519,54 @@ func (q *Reader) ConnectToNSQ(addr string) error {
return nil
}

func (q *Reader) DisconnectFromLookupd(addr string) error {
var addrs []string
var seen bool

q.Lock()

for _, x := range q.lookupdHTTPAddrs {
if x == addr {
seen = true
continue
}
addrs = append(addrs, x)
}

if seen {
q.lookupdHTTPAddrs = addrs
num := len(q.lookupdHTTPAddrs)
q.lookupdQueryIndex = (q.lookupdQueryIndex + 1) % num
}

q.Unlock()

return nil
}

func (q *Reader) DisconnectFromNSQ(addr string) error {
q.Lock()

c, ok := q.connections[addr]
_, pOk := q.pendingConnections[addr]

if ok {
c.CloseCB = func(c *Conn) {

This comment has been minimized.

Copy link
@mreiferson

mreiferson Nov 2, 2014

I think this assignment is racey

q.onConnectionClosed(c, false)
}
c.Stop()
delete(q.connections, addr)
}

if pOk {
delete(q.pendingConnections, addr)
}

q.Unlock()

return nil
}

func (q *Reader) onConnectionMessage(c *Conn, msg *Message) {
atomic.AddInt64(&q.totalRdyCount, -1)
atomic.AddUint64(&q.MessagesReceived, 1)
Expand Down Expand Up @@ -565,7 +613,7 @@ func (q *Reader) onConnectionIOError(c *Conn, err error) {
c.Stop()
}

func (q *Reader) onConnectionClosed(c *Conn) {
func (q *Reader) onConnectionClosed(c *Conn, reconnect bool) {
var hasRDYRetryTimer bool

// remove this connections RDY count from the reader's total
Expand Down Expand Up @@ -615,6 +663,10 @@ func (q *Reader) onConnectionClosed(c *Conn) {
}
} else if numLookupd == 0 && atomic.LoadInt32(&q.stopFlag) == 0 {
// there are no lookupd, try to reconnect after a bit
if !reconnect {
return
}

go func(addr string) {
for {
log.Debugf("[%s] re-connecting in 15 seconds...", addr)
Expand Down

2 comments on commit 1fd1128

@mreiferson
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than the potential race condition, this seems OK.

But 😁 - I really think we should try to get onto the latest version of go-nsq before making functional changes like this.

In the newer go-nsq codebase, I think the approach we should take is to maintain a list of nsqd addresses like we maintain for nsqlookupd - and then we won't need to add the reconnect flag because we'll be able to remove the nsqd address from the list, close the connection, and the "reconnect logic" in the close handler will check our list to see if it's still there, otherwise drop it on the floor. This also means we don't have the potential racey overwrite of the CloseCB.

I can whip up a PR that adds this functionality on top of go-nsq master taking this approach.

Thoughts?

@mreiferson
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pssst nsqio#91

Please sign in to comment.