Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Nov 5, 2014
1 parent e97e872 commit ebdf26c
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,10 +529,10 @@ func indexOf(n string, h []string) int {

func (r *Consumer) DisconnectFromNSQD(addr string) error {
r.mtx.Lock()
defer r.mtx.Unlock()

idx := indexOf(addr, r.nsqdTCPAddrs)
if idx == -1 {
r.mtx.Unlock()
return errors.New(fmt.Sprintf("unknown nsqd TCP address %s", addr))
}

Expand All @@ -542,8 +542,6 @@ func (r *Consumer) DisconnectFromNSQD(addr string) error {
pendingConn, pendingOk := r.pendingConnections[addr]
conn, ok := r.connections[addr]

r.mtx.Unlock()

if ok {
conn.Close()
} else if pendingOk {
Expand All @@ -553,6 +551,24 @@ func (r *Consumer) DisconnectFromNSQD(addr string) error {
return nil
}

func (r *Consumer) DisconnectFromNSQLookupd(addr string) error {
r.mtx.Lock()
defer r.mtx.Unlock()

idx := indexOf(addr, r.lookupdHTTPAddrs)
if idx == -1 {
return errors.New(fmt.Sprintf("unknown nsqlookupd HTTP address %s", addr))
}

if len(r.lookupdHTTPAddrs) == 1 {
return errors.New(fmt.Sprintf("cannot disconnect from only remaining nsqlookupd HTTP address %s", addr))
}

r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs[:idx], r.lookupdHTTPAddrs[idx+1:]...)

return nil
}

func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
Expand Down Expand Up @@ -730,6 +746,13 @@ func (r *Consumer) onConnClose(c *Conn) {
if atomic.LoadInt32(&r.stopFlag) == 1 {
break
}
r.mtx.RLock()
reconnect := indexOf(addr, r.nsqdTCPAddrs) >= 0
r.mtx.RUnlock()
if !reconnect {
r.log(LogLevelWarning, "(%s) skipped reconnect after removal...", addr)
return
}
err := r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
Expand Down

0 comments on commit ebdf26c

Please sign in to comment.