From a86bb72d4d63f0f0a7c211385f4e5848c96e57e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 27 May 2020 08:12:05 +1200 Subject: [PATCH 1/3] Wait for dmsg network during visor startup. Former-commit-id: 7e2dc121c85b9905d740ea1db69f8962a272e705 --- pkg/visor/init.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 31e4a53919..1f9fb2be33 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -61,6 +61,18 @@ func initSNet(v *Visor) bool { return report(n.Close()) }) + if dmsgC := n.Dmsg(); dmsgC != nil { + const dmsgTimeout = time.Second * 20 + log := dmsgC.Logger().WithField("timeout", dmsgTimeout) + log.Info("Connecting to the dmsg network...") + select { + case <-time.After(dmsgTimeout): + log.Warn("Failed to connect to the dmsg network, will try again later.") + case <-n.Dmsg().Ready(): + log.Info("Connected to the dmsg network.") + } + } + v.net = n return report(nil) } From dd65c06c95bbd8e4dbf6191822e4dddd775ecaf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Wed, 27 May 2020 09:50:06 +1200 Subject: [PATCH 2/3] Fix transport manager hangs and various improvements (#375). * Restructured (transport.Manager).SaveTransport to improve readability and ensure we always close the managed transport before calling mTp.wg.Wait(). In fact, we are now using mTp.Close() directly (which does both). * Visor's RPC client timeout is now optional. Setting a value of 0 duration will result in no timeout. * Disabled visor RPC client timeout for visor CLI. Former-commit-id: 2b7cb63fea3d5070bcd079d52bf5f6d065cacfff --- cmd/skywire-cli/commands/visor/root.go | 14 ++---- pkg/transport/manager.go | 69 ++++++++++++-------------- pkg/visor/rpc_client.go | 15 ++++-- 3 files changed, 46 insertions(+), 52 deletions(-) diff --git a/cmd/skywire-cli/commands/visor/root.go b/cmd/skywire-cli/commands/visor/root.go index 87825fd8e6..5caa98ea44 100644 --- a/cmd/skywire-cli/commands/visor/root.go +++ b/cmd/skywire-cli/commands/visor/root.go @@ -4,8 +4,6 @@ import ( "net" "time" - "github.com/SkycoinProject/skywire-mainnet/pkg/skyenv" - "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/spf13/cobra" @@ -27,17 +25,11 @@ var RootCmd = &cobra.Command{ } func rpcClient() visor.RPCClient { + const rpcDialTimeout = time.Second * 5 + conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout) if err != nil { logger.Fatal("RPC connection failed:", err) } - if err := conn.SetDeadline(time.Now().Add(rpcConnDuration)); err != nil { - logger.Fatal("RPC connection failed:", err) - } - return visor.NewRPCClient(logger, conn, visor.RPCPrefix, skyenv.DefaultRPCTimeout) + return visor.NewRPCClient(logger, conn, visor.RPCPrefix, 0) } - -const ( - rpcDialTimeout = time.Second * 5 - rpcConnDuration = time.Second * 60 -) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 11848023c8..42302d088c 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -216,45 +216,43 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy return nil, io.ErrClosedPipe } - const tries = 2 - - var err error - for i := 0; i < tries; i++ { - mTp, err := tm.saveTransport(remote, tpType) - if err != nil { - return nil, err - } - - if err = mTp.Dial(ctx); err != nil { - // TODO(nkryuchkov): Check for an error that underlying connection is not established - // and try again in this case. Otherwise, return the error. - pkTableErr := fmt.Sprintf("pk table: entry of %s does not exist", remote.String()) - - if err.Error() == pkTableErr { - mTp.wg.Wait() - delete(tm.tps, mTp.Entry.ID) +TrySaveTp: + mTp, err := tm.saveTransport(remote, tpType) + if err != nil { + return nil, err + } - return nil, err + if err = mTp.Dial(ctx); err != nil { + // This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was just + // deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again. + if err == ErrNotServing { + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") } + delete(tm.tps, mTp.Entry.ID) + goto TrySaveTp + } - if err == ErrNotServing { - mTp.wg.Wait() - delete(tm.tps, mTp.Entry.ID) - continue + // This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in the + // STCP table. There is no point in retrying as a connection would be impossible, so we just return an error. + if isSTCPTableError(remote, err) { + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") } - - tm.Logger. - WithError(err). - Warn("Underlying connection is not yet established. Will retry later.") + delete(tm.tps, mTp.Entry.ID) + return nil, err } - return mTp, nil + + tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.") } - tm.Logger. - WithError(err). - WithField("tries", tries). - Error("Failed to serve managed transport. This is unexpected.") - return nil, err + return mTp, nil +} + +// isSTCPPKError returns true if the error is a STCP table error. +// This occurs the requested remote public key does not exist in the STCP table. +func isSTCPTableError(remotePK cipher.PubKey, err error) bool { + return err.Error() == fmt.Sprintf("pk table: entry of %s does not exist", remotePK.String()) } func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*ManagedTransport, error) { @@ -263,13 +261,12 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed } tpID := tm.tpIDFromPK(remote, netName) - tm.Logger.Debugf("Initializing TP with ID %s", tpID) - tp, ok := tm.tps[tpID] + oldMTp, ok := tm.tps[tpID] if ok { - tm.Logger.Debugln("Got TP from map") - return tp, nil + tm.Logger.Debug("Found an old mTp from internal map.") + return oldMTp, nil } mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName) diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4c0492ae6c..6869424868 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -1,6 +1,7 @@ package visor import ( + "context" "encoding/binary" "errors" "fmt" @@ -98,17 +99,21 @@ func NewRPCClient(log logrus.FieldLogger, conn io.ReadWriteCloser, prefix string // Call calls the internal rpc.Client with the serviceMethod arg prefixed. func (rc *rpcClient) Call(method string, args, reply interface{}) error { - timer := time.NewTimer(rc.timeout) - defer timer.Stop() + ctx := context.Background() + if rc.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(rc.timeout)) + defer cancel() + } select { case call := <-rc.client.Go(rc.prefix+"."+method, args, reply, nil).Done: return call.Error - case <-timer.C: + case <-ctx.Done(): if err := rc.conn.Close(); err != nil { - rc.log.WithError(err).Warn("failed to close underlying rpc connection after timeout error") + rc.log.WithError(err).Warn("Failed to close rpc client after timeout error.") } - return ErrTimeout + return ctx.Err() } } From 54916a449db6199efa94dbb40ca96492d8bf2ca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 28 May 2020 02:16:08 +1200 Subject: [PATCH 3/3] Changes as suggested by @Darkren and @nkryuchkov * Replaced goto statement with for loop in transport.Manager Former-commit-id: c84dbe7e9210da76659167c2b69a10cddccb80cb --- pkg/transport/manager.go | 50 +++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 42302d088c..0bcf1a67ac 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -216,37 +216,39 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy return nil, io.ErrClosedPipe } -TrySaveTp: - mTp, err := tm.saveTransport(remote, tpType) - if err != nil { - return nil, err - } + for { + mTp, err := tm.saveTransport(remote, tpType) + if err != nil { + return nil, err + } - if err = mTp.Dial(ctx); err != nil { - // This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was just - // deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again. - if err == ErrNotServing { - if closeErr := mTp.Close(); closeErr != nil { - tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") + if err = mTp.Dial(ctx); err != nil { + // This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was + // just deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again. + if err == ErrNotServing { + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") + } + delete(tm.tps, mTp.Entry.ID) + continue } - delete(tm.tps, mTp.Entry.ID) - goto TrySaveTp - } - // This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in the - // STCP table. There is no point in retrying as a connection would be impossible, so we just return an error. - if isSTCPTableError(remote, err) { - if closeErr := mTp.Close(); closeErr != nil { - tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") + // This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in + // the STCP table. There is no point in retrying as a connection would be impossible, so we just return an + // error. + if isSTCPTableError(remote, err) { + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") + } + delete(tm.tps, mTp.Entry.ID) + return nil, err } - delete(tm.tps, mTp.Entry.ID) - return nil, err + + tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.") } - tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.") + return mTp, nil } - - return mTp, nil } // isSTCPPKError returns true if the error is a STCP table error.