From f724286798ed1ce5bf9cf90102087ff542eb82a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E5=BF=97=E5=AE=87?= Date: Wed, 26 Jun 2019 19:16:14 +0800 Subject: [PATCH 1/2] Fix lingering transports after dmsg.Server disconnects. --- go.sum | 2 ++ pkg/dmsg/client.go | 38 ++++++++++++++++++++++++++------------ pkg/dmsg/server_test.go | 2 +- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/go.sum b/go.sum index f1a8fd67e..d4042cd97 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,8 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc= +golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index f05990bea..bed326163 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -183,14 +183,11 @@ func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err e log := c.log.WithField("remoteServer", c.remoteSrv) log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") defer func() { + c.close(true) log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ConnectionClosed") c.wg.Done() }() - closeConn := func(log *logrus.Entry) { - log.WithError(c.Close()).Warn("ClosingConnection") - } - for { f, err := readFrame(c.Conn) if err != nil { @@ -224,7 +221,8 @@ func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err e if err != nil { log.WithField("remoteClient", initPK).WithError(err).Infoln("Rejected [REQUEST]") if isWriteError(err) || err == ErrClientClosed { - closeConn(log) + err := c.Close() + log.WithError(err).Warn("ClosingConnection") } return } @@ -258,21 +256,37 @@ func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey) return tp, nil } -// Close closes the connection to dms_server. -func (c *ClientConn) Close() error { +// If 'isSrvDisconnect' is set, we don't need to send CLOSE frames or close the underlying TCP connection. +func (c *ClientConn) close(isSrvDisconnect bool) (closed bool) { c.once.Do(func() { + closed = true c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection") close(c.done) c.mx.Lock() - for _, tp := range c.tps { - if tp != nil { - go tp.Close() //nolint:errcheck + if isSrvDisconnect { + for _, tp := range c.tps { + if tp != nil { + go tp.close() + } + } + } else { + for _, tp := range c.tps { + if tp != nil { + go tp.Close() //nolint:errcheck + } } + _ = c.Conn.Close() //nolint:errcheck } - _ = c.Conn.Close() //nolint:errcheck c.mx.Unlock() - c.wg.Wait() }) + return closed +} + +// Close closes the connection to dms_server. +func (c *ClientConn) Close() error { + if c.close(false) { + c.wg.Wait() + } return nil } diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 9438f5234..23bebe76b 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -14,11 +14,11 @@ import ( "testing" "time" + "github.com/skycoin/skycoin/src/util/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/nettest" - "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/internal/noise" "github.com/skycoin/skywire/pkg/cipher" "github.com/skycoin/skywire/pkg/messaging-discovery/client" From cd64bb60b7b8df956d61ed5db56aae1c57ac3921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E5=BF=97=E5=AE=87?= Date: Wed, 26 Jun 2019 20:25:32 +0800 Subject: [PATCH 2/2] Increase timeout in dmsg tests. --- pkg/dmsg/client.go | 23 +++++++---------------- pkg/dmsg/server_test.go | 3 ++- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/pkg/dmsg/client.go b/pkg/dmsg/client.go index bed326163..0bceec2bb 100644 --- a/pkg/dmsg/client.go +++ b/pkg/dmsg/client.go @@ -183,7 +183,7 @@ func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err e log := c.log.WithField("remoteServer", c.remoteSrv) log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") defer func() { - c.close(true) + c.close() log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ConnectionClosed") c.wg.Done() }() @@ -256,27 +256,18 @@ func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey) return tp, nil } -// If 'isSrvDisconnect' is set, we don't need to send CLOSE frames or close the underlying TCP connection. -func (c *ClientConn) close(isSrvDisconnect bool) (closed bool) { +func (c *ClientConn) close() (closed bool) { c.once.Do(func() { closed = true c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection") close(c.done) c.mx.Lock() - if isSrvDisconnect { - for _, tp := range c.tps { - if tp != nil { - go tp.close() - } - } - } else { - for _, tp := range c.tps { - if tp != nil { - go tp.Close() //nolint:errcheck - } + for _, tp := range c.tps { + if tp != nil { + go tp.Close() //nolint:errcheck } - _ = c.Conn.Close() //nolint:errcheck } + _ = c.Conn.Close() //nolint:errcheck c.mx.Unlock() }) return closed @@ -284,7 +275,7 @@ func (c *ClientConn) close(isSrvDisconnect bool) (closed bool) { // Close closes the connection to dms_server. func (c *ClientConn) Close() error { - if c.close(false) { + if c.close() { c.wg.Wait() } return nil diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 350e8490c..cf270d106 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -909,7 +909,7 @@ func TestServer_Serve(t *testing.T) { } func testReconnect(t *testing.T, randomAddr bool) { - const smallDelay = 100 * time.Millisecond + const smallDelay = time.Second * 5 ctx := context.TODO() serverPK, serverSK := cipher.GenerateKeyPair() @@ -1127,6 +1127,7 @@ func testWithTimeout(timeout time.Duration, run func() error) error { case <-timer.C: return err default: + time.Sleep(time.Millisecond * 5) continue } }