diff --git a/cmd/skywire-cli/commands/visor/root.go b/cmd/skywire-cli/commands/visor/root.go index 87825fd8e6..5caa98ea44 100644 --- a/cmd/skywire-cli/commands/visor/root.go +++ b/cmd/skywire-cli/commands/visor/root.go @@ -4,8 +4,6 @@ import ( "net" "time" - "github.com/SkycoinProject/skywire-mainnet/pkg/skyenv" - "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/spf13/cobra" @@ -27,17 +25,11 @@ var RootCmd = &cobra.Command{ } func rpcClient() visor.RPCClient { + const rpcDialTimeout = time.Second * 5 + conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout) if err != nil { logger.Fatal("RPC connection failed:", err) } - if err := conn.SetDeadline(time.Now().Add(rpcConnDuration)); err != nil { - logger.Fatal("RPC connection failed:", err) - } - return visor.NewRPCClient(logger, conn, visor.RPCPrefix, skyenv.DefaultRPCTimeout) + return visor.NewRPCClient(logger, conn, visor.RPCPrefix, 0) } - -const ( - rpcDialTimeout = time.Second * 5 - rpcConnDuration = time.Second * 60 -) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 11848023c8..42302d088c 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -216,45 +216,43 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy return nil, io.ErrClosedPipe } - const tries = 2 - - var err error - for i := 0; i < tries; i++ { - mTp, err := tm.saveTransport(remote, tpType) - if err != nil { - return nil, err - } - - if err = mTp.Dial(ctx); err != nil { - // TODO(nkryuchkov): Check for an error that underlying connection is not established - // and try again in this case. Otherwise, return the error. - pkTableErr := fmt.Sprintf("pk table: entry of %s does not exist", remote.String()) - - if err.Error() == pkTableErr { - mTp.wg.Wait() - delete(tm.tps, mTp.Entry.ID) +TrySaveTp: + mTp, err := tm.saveTransport(remote, tpType) + if err != nil { + return nil, err + } - return nil, err + if err = mTp.Dial(ctx); err != nil { + // This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was just + // deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again. + if err == ErrNotServing { + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") } + delete(tm.tps, mTp.Entry.ID) + goto TrySaveTp + } - if err == ErrNotServing { - mTp.wg.Wait() - delete(tm.tps, mTp.Entry.ID) - continue + // This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in the + // STCP table. There is no point in retrying as a connection would be impossible, so we just return an error. + if isSTCPTableError(remote, err) { + if closeErr := mTp.Close(); closeErr != nil { + tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.") } - - tm.Logger. - WithError(err). - Warn("Underlying connection is not yet established. Will retry later.") + delete(tm.tps, mTp.Entry.ID) + return nil, err } - return mTp, nil + + tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.") } - tm.Logger. - WithError(err). - WithField("tries", tries). - Error("Failed to serve managed transport. This is unexpected.") - return nil, err + return mTp, nil +} + +// isSTCPPKError returns true if the error is a STCP table error. +// This occurs the requested remote public key does not exist in the STCP table. +func isSTCPTableError(remotePK cipher.PubKey, err error) bool { + return err.Error() == fmt.Sprintf("pk table: entry of %s does not exist", remotePK.String()) } func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*ManagedTransport, error) { @@ -263,13 +261,12 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed } tpID := tm.tpIDFromPK(remote, netName) - tm.Logger.Debugf("Initializing TP with ID %s", tpID) - tp, ok := tm.tps[tpID] + oldMTp, ok := tm.tps[tpID] if ok { - tm.Logger.Debugln("Got TP from map") - return tp, nil + tm.Logger.Debug("Found an old mTp from internal map.") + return oldMTp, nil } mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName) diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4c0492ae6c..6869424868 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -1,6 +1,7 @@ package visor import ( + "context" "encoding/binary" "errors" "fmt" @@ -98,17 +99,21 @@ func NewRPCClient(log logrus.FieldLogger, conn io.ReadWriteCloser, prefix string // Call calls the internal rpc.Client with the serviceMethod arg prefixed. func (rc *rpcClient) Call(method string, args, reply interface{}) error { - timer := time.NewTimer(rc.timeout) - defer timer.Stop() + ctx := context.Background() + if rc.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithDeadline(ctx, time.Now().Add(rc.timeout)) + defer cancel() + } select { case call := <-rc.client.Go(rc.prefix+"."+method, args, reply, nil).Done: return call.Error - case <-timer.C: + case <-ctx.Done(): if err := rc.conn.Close(); err != nil { - rc.log.WithError(err).Warn("failed to close underlying rpc connection after timeout error") + rc.log.WithError(err).Warn("Failed to close rpc client after timeout error.") } - return ErrTimeout + return ctx.Err() } }