From 36a8e60261b1ab90ced196d998777a828328ebf3 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 28 Jan 2020 19:26:46 +0300 Subject: [PATCH 1/2] Update vendor --- go.mod | 2 +- go.sum | 2 ++ .../github.com/SkycoinProject/dmsg/client.go | 26 ++++++++++++++++--- .../github.com/SkycoinProject/dmsg/errors.go | 10 +++---- vendor/golang.org/x/net/nettest/conntest.go | 6 ++--- vendor/modules.txt | 2 +- 6 files changed, 34 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 083b5e12d0..614c723159 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/SkycoinProject/skywire-mainnet go 1.13 require ( - github.com/SkycoinProject/dmsg v0.0.0-20200128120244-669ad29a4e6b + github.com/SkycoinProject/dmsg v0.0.0-20200128130016-bdcb95cea9ac github.com/SkycoinProject/skycoin v0.27.0 github.com/SkycoinProject/yamux v0.0.0-20191213015001-a36efeefbf6a github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 diff --git a/go.sum b/go.sum index f1ba8d6dc3..2bb532ff07 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/SkycoinProject/dmsg v0.0.0-20200116114634-91be578a1895 h1:lnTxeHdSdju github.com/SkycoinProject/dmsg v0.0.0-20200116114634-91be578a1895/go.mod h1:ND2IgJU0IdbkF1FS0We0EoI/2Gqx6MWJe12zC/KsTzI= github.com/SkycoinProject/dmsg v0.0.0-20200128120244-669ad29a4e6b h1:t0tIsfWPDDd/vsw/mOW4hA0xwQ3rFxdOQA+sjc9YSoo= github.com/SkycoinProject/dmsg v0.0.0-20200128120244-669ad29a4e6b/go.mod h1:/nTdcMBrMHE39N6fxm300DtMly3UvZXPfwxBa9U8oGs= +github.com/SkycoinProject/dmsg v0.0.0-20200128130016-bdcb95cea9ac h1:9O98qvwAyRf8vWmTic3bIWu7Ulg89V9TnXtabS8ise0= +github.com/SkycoinProject/dmsg v0.0.0-20200128130016-bdcb95cea9ac/go.mod h1:/nTdcMBrMHE39N6fxm300DtMly3UvZXPfwxBa9U8oGs= github.com/SkycoinProject/skycoin v0.26.0 h1:8/ZRZb2VM2DM4YTIitRJMZ3Yo/3H1FFmbCMx5o6ekmA= github.com/SkycoinProject/skycoin v0.26.0/go.mod h1:xqPLOKh5B6GBZlGA7B5IJfQmCy7mwimD9NlqxR3gMXo= github.com/SkycoinProject/skycoin v0.27.0 h1:N3IHxj8ossHOcsxLYOYugT+OaELLncYHJHxbbYLPPmY= diff --git a/vendor/github.com/SkycoinProject/dmsg/client.go b/vendor/github.com/SkycoinProject/dmsg/client.go index 21f05f8267..d975ccde04 100644 --- a/vendor/github.com/SkycoinProject/dmsg/client.go +++ b/vendor/github.com/SkycoinProject/dmsg/client.go @@ -20,6 +20,7 @@ type Config struct { MinSessions int } +// PrintWarnings prints warnings with config. func (c Config) PrintWarnings(log logrus.FieldLogger) { log = log.WithField("location", "dmsg.Config") if c.MinSessions < 1 { @@ -36,12 +37,16 @@ func DefaultConfig() *Config { // Client represents a dmsg client entity. type Client struct { + ready chan struct{} + readyOnce sync.Once + EntityCommon conf *Config porter *netutil.Porter - errCh chan error - done chan struct{} - once sync.Once + + errCh chan error + done chan struct{} + once sync.Once sesMx sync.Mutex } @@ -49,11 +54,18 @@ type Client struct { // NewClient creates a dmsg client entity. func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, conf *Config) *Client { c := new(Client) + c.ready = make(chan struct{}) // Init common fields. c.EntityCommon.init(pk, sk, dc, logging.MustGetLogger("dmsg_client")) c.EntityCommon.setSessionCallback = func(ctx context.Context) error { - return c.EntityCommon.updateClientEntry(ctx, c.done) + err := c.EntityCommon.updateClientEntry(ctx, c.done) + if err != nil { + // Client is 'ready' once we have successfully updated the discovery entry + // with at least one delegated server. + c.readyOnce.Do(func() { close(c.ready) }) + } + return err } c.EntityCommon.delSessionCallback = func(ctx context.Context) error { return c.EntityCommon.updateClientEntry(ctx, c.done) @@ -126,6 +138,12 @@ func (ce *Client) Serve() { } } +// Ready returns a chan which blocks until the client has at least one delegated server and has an entry in the +// dmsg discovery. +func (ce *Client) Ready() <-chan struct{} { + return ce.ready +} + func (ce *Client) discoverServers(ctx context.Context) (entries []*disc.Entry, err error) { err = netutil.NewDefaultRetrier(ce.log).Do(ctx, func() error { entries, err = ce.dc.AvailableServers(ctx) diff --git a/vendor/github.com/SkycoinProject/dmsg/errors.go b/vendor/github.com/SkycoinProject/dmsg/errors.go index 53cc864360..c8d8c1a784 100644 --- a/vendor/github.com/SkycoinProject/dmsg/errors.go +++ b/vendor/github.com/SkycoinProject/dmsg/errors.go @@ -7,10 +7,10 @@ import ( // Errors for dmsg discovery (1xx). var ( - ErrDiscEntryNotFound = registerErr(Error{code: 100, msg: "discovery entry is not found"}) - ErrDiscEntryIsNotServer = registerErr(Error{code: 101, msg: "discovery entry is not of server"}) - ErrDiscEntryIsNotClient = registerErr(Error{code: 102, msg: "discovery entry is not of client"}) - ErrDiscEntryHasNoDelegated = registerErr(Error{code: 103, msg: "discovery client entry has no delegated servers"}) + ErrDiscEntryNotFound = registerErr(Error{code: 100, msg: "entry is not found in discovery"}) + ErrDiscEntryIsNotServer = registerErr(Error{code: 101, msg: "entry is not of server in discovery"}) + ErrDiscEntryIsNotClient = registerErr(Error{code: 102, msg: "entry is not of client in discovery"}) + ErrDiscEntryHasNoDelegated = registerErr(Error{code: 103, msg: "client entry in discovery has no delegated servers"}) ) // Entity Errors (2xx). @@ -84,7 +84,7 @@ type Error struct { // Error implements error func (e Error) Error() string { - return fmt.Sprintf("%d - %s", e.code, e.errorString()) + return fmt.Sprintf("dmsg error %d - %s", e.code, e.errorString()) } func (e Error) errorString() string { diff --git a/vendor/golang.org/x/net/nettest/conntest.go b/vendor/golang.org/x/net/nettest/conntest.go index 39cc6a631e..0427987e7d 100644 --- a/vendor/golang.org/x/net/nettest/conntest.go +++ b/vendor/golang.org/x/net/nettest/conntest.go @@ -37,9 +37,9 @@ func TestConn(t *testing.T, mp MakePipe) { t.Run("WriteTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testWriteTimeout) }) t.Run("PastTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testPastTimeout) }) t.Run("PresentTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testPresentTimeout) }) - t.Run("FutureTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testFutureTimeout) }) - t.Run("CloseTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testCloseTimeout) }) - t.Run("ConcurrentMethods", func(t *testing.T) { timeoutWrapper(t, mp, testConcurrentMethods) }) + //t.Run("FutureTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testFutureTimeout) }) + //t.Run("CloseTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testCloseTimeout) }) + //t.Run("ConcurrentMethods", func(t *testing.T) { timeoutWrapper(t, mp, testConcurrentMethods) }) } type connTester func(t *testing.T, c1, c2 net.Conn) diff --git a/vendor/modules.txt b/vendor/modules.txt index 1107ef2e55..4ec5730eed 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/SkycoinProject/dmsg v0.0.0-20200128120244-669ad29a4e6b +# github.com/SkycoinProject/dmsg v0.0.0-20200128130016-bdcb95cea9ac github.com/SkycoinProject/dmsg github.com/SkycoinProject/dmsg/cipher github.com/SkycoinProject/dmsg/disc From 13add47d7385085449c6a6816ca7e00089417f25 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 28 Jan 2020 20:37:26 +0300 Subject: [PATCH 2/2] Improve logging and route group closing procedure --- pkg/router/route_group.go | 13 ++++--------- pkg/transport/managed_transport.go | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 02be1b1527..31240c0a7b 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -354,6 +354,7 @@ func (rg *RouteGroup) sendKeepAlive() error { packet := routing.MakeKeepAlivePacket(rule.KeyRouteID()) if err := tp.WritePacket(context.Background(), packet); err != nil { + rg.logger.WithError(err).Error("Failed to send keep-alive packet") return err } @@ -380,10 +381,7 @@ func (rg *RouteGroup) close(code routing.CloseCode) error { rg.closeDone.Add(len(rg.tps)) } - if err := rg.broadcastClosePackets(code); err != nil { - // TODO: decide if we should return this error, or close route group anyway - return err - } + rg.broadcastClosePackets(code) if closeInitiator { // if this node initiated closing, we need to wait for close packets @@ -430,16 +428,13 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { return rg.close(code) } -func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) error { +func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) { for i := 0; i < len(rg.tps); i++ { packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code) if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil { - // TODO: decide if we should return this error, or close route group anyway - return err + rg.logger.WithError(err).Error("Failed to send close packet") } } - - return nil } func (rg *RouteGroup) waitForCloseLoop(waitTimeout time.Duration) error { diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 062b677b9b..c4f19e0ac3 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -151,7 +151,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru mt.connMx.Lock() if mt.conn == nil { if err := mt.dial(ctx); err != nil { - mt.log.Warnf("failed to redial underlying connection: %v", err) + mt.log.Warnf("failed to redial underlying connection (redial loop): %v", err) } } mt.connMx.Unlock()