Skip to content

Commit

Permalink
Merge pull request #89 from mreiferson/debug_89
Browse files Browse the repository at this point in the history
various improvements
  • Loading branch information
jehiah committed Nov 7, 2014
2 parents 68ce738 + b7d7e92 commit 6e0548d
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func (c *Conn) auth(secret string) error {
}

func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
if atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
Expand Down Expand Up @@ -494,7 +495,7 @@ func (c *Conn) readLoop() {
c.delegate.OnIOError(c, err)
goto exit
}
msg.Delegate = &connMessageDelegate{c}
msg.Delegate = delegate

atomic.AddInt64(&c.rdyCount, -1)
atomic.AddInt64(&c.messagesInFlight, 1)
Expand Down Expand Up @@ -620,6 +621,7 @@ func (c *Conn) close() {
func (c *Conn) cleanup() {
<-c.drainReady
ticker := time.NewTicker(100 * time.Millisecond)
lastWarning := time.Now()
// writeLoop has exited, drain any remaining in flight messages
for {
// we're racing with readLoop which potentially has a message
Expand All @@ -633,13 +635,19 @@ func (c *Conn) cleanup() {
msgsInFlight = atomic.LoadInt64(&c.messagesInFlight)
}
if msgsInFlight > 0 {
c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
if time.Now().Sub(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... waiting for %d messages in flight", msgsInFlight)
lastWarning = time.Now()
}
continue
}
// until the readLoop has exited we cannot be sure that there
// still won't be a race
if atomic.LoadInt32(&c.readLoopRunning) == 1 {
c.log(LogLevelWarning, "draining... readLoop still running")
if time.Now().Sub(lastWarning) > time.Second {
c.log(LogLevelWarning, "draining... readLoop still running")
lastWarning = time.Now()
}
continue
}
goto exit
Expand Down

0 comments on commit 6e0548d

Please sign in to comment.