diff --git a/cmd/skywire-cli/commands/visor/root.go b/cmd/skywire-cli/commands/visor/root.go index 87825fd8e6..5caa98ea44 100644 --- a/cmd/skywire-cli/commands/visor/root.go +++ b/cmd/skywire-cli/commands/visor/root.go @@ -4,8 +4,6 @@ import ( "net" "time" - "github.com/SkycoinProject/skywire-mainnet/pkg/skyenv" - "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/spf13/cobra" @@ -27,17 +25,11 @@ var RootCmd = &cobra.Command{ } func rpcClient() visor.RPCClient { + const rpcDialTimeout = time.Second * 5 + conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout) if err != nil { logger.Fatal("RPC connection failed:", err) } - if err := conn.SetDeadline(time.Now().Add(rpcConnDuration)); err != nil { - logger.Fatal("RPC connection failed:", err) - } - return visor.NewRPCClient(logger, conn, visor.RPCPrefix, skyenv.DefaultRPCTimeout) + return visor.NewRPCClient(logger, conn, visor.RPCPrefix, 0) } - -const ( - rpcDialTimeout = time.Second * 5 - rpcConnDuration = time.Second * 60 -) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 11848023c8..0bcf1a67ac 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -216,45 +216,45 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy return nil, io.ErrClosedPipe } - const tries = 2 - - var err error - for i := 0; i < tries; i++ { + for { mTp, err := tm.saveTransport(remote, tpType) if err != nil { return nil, err } if err = mTp.Dial(ctx); err != nil { - // TODO(nkryuchkov): Check for an error that underlying connection is not established - // and try again in this case. Otherwise, return the error. - pkTableErr := fmt.Sprintf("pk table: entry of %s does not exist", remote.String()) - - if err.Error() == pkTableErr { - mTp.wg.Wait() + // This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was + // just deleted (and has not yet fully closed). 
+		// Hence, we should close and delete the old tp and try again.
+		if err == ErrNotServing {
+			if closeErr := mTp.Close(); closeErr != nil {
+				tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.")
+			}
 			delete(tm.tps, mTp.Entry.ID)
-
-			return nil, err
+			continue
 		}
 
-		if err == ErrNotServing {
-			mTp.wg.Wait()
+		// This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in
+		// the STCP table. There is no point in retrying as a connection would be impossible, so we just return an
+		// error.
+		if isSTCPTableError(remote, err) {
+			if closeErr := mTp.Close(); closeErr != nil {
+				tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.")
+			}
 			delete(tm.tps, mTp.Entry.ID)
-			continue
+			return nil, err
 		}
 
-		tm.Logger.
-			WithError(err).
-			Warn("Underlying connection is not yet established. Will retry later.")
+		tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.")
 		}
+		return mTp, nil
 	}
+}
-
-	tm.Logger.
-		WithError(err).
-		WithField("tries", tries).
-		Error("Failed to serve managed transport. This is unexpected.")
-	return nil, err
+
+// isSTCPTableError returns true if the error is an STCP table error.
+// This occurs when the requested remote public key does not exist in the STCP table.
+func isSTCPTableError(remotePK cipher.PubKey, err error) bool { + return err.Error() == fmt.Sprintf("pk table: entry of %s does not exist", remotePK.String()) } func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*ManagedTransport, error) { @@ -263,13 +263,12 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed } tpID := tm.tpIDFromPK(remote, netName) - tm.Logger.Debugf("Initializing TP with ID %s", tpID) - tp, ok := tm.tps[tpID] + oldMTp, ok := tm.tps[tpID] if ok { - tm.Logger.Debugln("Got TP from map") - return tp, nil + tm.Logger.Debug("Found an old mTp from internal map.") + return oldMTp, nil } mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName) diff --git a/pkg/visor/init.go b/pkg/visor/init.go index 01eafcbf3c..1833c16efb 100644 --- a/pkg/visor/init.go +++ b/pkg/visor/init.go @@ -93,6 +93,18 @@ func initSNet(v *Visor) bool { return report(n.Close()) }) + if dmsgC := n.Dmsg(); dmsgC != nil { + const dmsgTimeout = time.Second * 20 + log := dmsgC.Logger().WithField("timeout", dmsgTimeout) + log.Info("Connecting to the dmsg network...") + select { + case <-time.After(dmsgTimeout): + log.Warn("Failed to connect to the dmsg network, will try again later.") + case <-n.Dmsg().Ready(): + log.Info("Connected to the dmsg network.") + } + } + v.net = n return report(nil) } diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4c0492ae6c..6869424868 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -1,6 +1,7 @@ package visor import ( + "context" "encoding/binary" "errors" "fmt" @@ -98,17 +99,21 @@ func NewRPCClient(log logrus.FieldLogger, conn io.ReadWriteCloser, prefix string // Call calls the internal rpc.Client with the serviceMethod arg prefixed. 
func (rc *rpcClient) Call(method string, args, reply interface{}) error { - timer := time.NewTimer(rc.timeout) - defer timer.Stop() + ctx := context.Background() + if rc.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(rc.timeout)) + defer cancel() + } select { case call := <-rc.client.Go(rc.prefix+"."+method, args, reply, nil).Done: return call.Error - case <-timer.C: + case <-ctx.Done(): if err := rc.conn.Close(); err != nil { - rc.log.WithError(err).Warn("failed to close underlying rpc connection after timeout error") + rc.log.WithError(err).Warn("Failed to close rpc client after timeout error.") } - return ErrTimeout + return ctx.Err() } }