Skip to content

Commit

Permalink
Merge pull request #440 from evanlinjin/bug/fix-lingering-transports-…
Browse files Browse the repository at this point in the history
…after-dmsg-server-disconnect

Fix lingering transports after dmsg.Server disconnects.
  • Loading branch information
志宇 authored Jun 26, 2019
2 parents fe028a2 + cd64bb6 commit c3e882c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
21 changes: 13 additions & 8 deletions pkg/dmsg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,11 @@ func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err e
log := c.log.WithField("remoteServer", c.remoteSrv)
log.WithField("connCount", incrementServeCount()).Infoln("ServingConn")
defer func() {
c.close()
log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ConnectionClosed")
c.wg.Done()
}()

closeConn := func(log *logrus.Entry) {
log.WithError(c.Close()).Warn("ClosingConnection")
}

for {
f, err := readFrame(c.Conn)
if err != nil {
Expand Down Expand Up @@ -224,7 +221,8 @@ func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err e
if err != nil {
log.WithField("remoteClient", initPK).WithError(err).Infoln("Rejected [REQUEST]")
if isWriteError(err) || err == ErrClientClosed {
closeConn(log)
err := c.Close()
log.WithError(err).Warn("ClosingConnection")
}
return
}
Expand Down Expand Up @@ -258,9 +256,9 @@ func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey)
return tp, nil
}

// Close closes the connection to dms_server.
func (c *ClientConn) Close() error {
func (c *ClientConn) close() (closed bool) {
c.once.Do(func() {
closed = true
c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection")
close(c.done)
c.mx.Lock()
Expand All @@ -271,8 +269,15 @@ func (c *ClientConn) Close() error {
}
_ = c.Conn.Close() //nolint:errcheck
c.mx.Unlock()
c.wg.Wait()
})
return closed
}

// Close closes the connection to dms_server.
func (c *ClientConn) Close() error {
if c.close() {
c.wg.Wait()
}
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/dmsg/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"testing"
"time"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/internal/noise"
"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/messaging-discovery/client"
Expand Down Expand Up @@ -909,7 +909,7 @@ func TestServer_Serve(t *testing.T) {
}

func testReconnect(t *testing.T, randomAddr bool) {
const smallDelay = 100 * time.Millisecond
const smallDelay = time.Second * 5
ctx := context.TODO()

serverPK, serverSK := cipher.GenerateKeyPair()
Expand Down Expand Up @@ -1127,6 +1127,7 @@ func testWithTimeout(timeout time.Duration, run func() error) error {
case <-timer.C:
return err
default:
time.Sleep(time.Millisecond * 5)
continue
}
}
Expand Down

0 comments on commit c3e882c

Please sign in to comment.