Fix transport manager hangs and various improvements (#375).
* Restructured (transport.Manager).SaveTransport to improve readability and to ensure we always close the managed transport before calling mTp.wg.Wait(); we now call mTp.Close() directly, which does both (see the sketch after the change summary below).
* The visor's RPC client timeout is now optional: a duration of 0 disables the timeout entirely.
* Disabled the visor RPC client timeout for the visor CLI.
evanlinjin committed May 26, 2020
1 parent 6e19a66 commit 2b7cb63
Showing 3 changed files with 46 additions and 52 deletions.
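The first bullet is the actual hang fix, and it is worth spelling out. Below is a minimal sketch of the failure mode — hypothetical types and names, not code from this repository: when a serve loop exits only once its connection closes, waiting on the WaitGroup before closing deadlocks, so Close must bundle close-then-wait in that order.

    package main

    import (
        "fmt"
        "net"
        "sync"
    )

    // worker stands in for a managed transport: its serve loop only
    // returns once the underlying connection has been closed.
    type worker struct {
        conn net.Conn
        wg   sync.WaitGroup
    }

    func (w *worker) serve() {
        w.wg.Add(1)
        go func() {
            defer w.wg.Done()
            buf := make([]byte, 1024)
            for {
                // Read blocks until data arrives or the conn is closed.
                if _, err := w.conn.Read(buf); err != nil {
                    return
                }
            }
        }()
    }

    // Close closes the connection first, then waits for the serve loop.
    // Waiting first (the old wg.Wait-before-close order) would block
    // forever, since the loop above never exits while conn is open.
    func (w *worker) Close() error {
        err := w.conn.Close()
        w.wg.Wait()
        return err
    }

    func main() {
        local, remote := net.Pipe()
        defer remote.Close()

        w := &worker{conn: local}
        w.serve()

        fmt.Println("close error:", w.Close()) // returns promptly
    }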
14 changes: 3 additions & 11 deletions cmd/skywire-cli/commands/visor/root.go
@@ -4,8 +4,6 @@ import (
 	"net"
 	"time"

-	"github.com/SkycoinProject/skywire-mainnet/pkg/skyenv"
-
 	"github.com/SkycoinProject/skycoin/src/util/logging"
 	"github.com/spf13/cobra"

@@ -27,17 +25,11 @@ var RootCmd = &cobra.Command{
 }

 func rpcClient() visor.RPCClient {
+	const rpcDialTimeout = time.Second * 5
+
 	conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout)
 	if err != nil {
 		logger.Fatal("RPC connection failed:", err)
 	}
-	if err := conn.SetDeadline(time.Now().Add(rpcConnDuration)); err != nil {
-		logger.Fatal("RPC connection failed:", err)
-	}
-	return visor.NewRPCClient(logger, conn, visor.RPCPrefix, skyenv.DefaultRPCTimeout)
+	return visor.NewRPCClient(logger, conn, visor.RPCPrefix, 0)
 }
-
-const (
-	rpcDialTimeout  = time.Second * 5
-	rpcConnDuration = time.Second * 60
-)
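Two separate timeouts appear in this file, and only the dial timeout survives. The removed SetDeadline call used to bound every later read and write on the RPC connection; with it gone and a 0 timeout passed to visor.NewRPCClient, a CLI call now waits for as long as the visor takes to respond. A stdlib sketch of the distinction — the address is illustrative and nothing here is from the repository:

    package main

    import (
        "log"
        "net"
        "time"
    )

    func main() {
        // DialTimeout bounds only connection establishment (the CLI keeps this).
        conn, err := net.DialTimeout("tcp", "localhost:3435", 5*time.Second)
        if err != nil {
            log.Fatal("dial failed:", err)
        }
        defer conn.Close()

        // A connection deadline (what the removed code set) bounds every
        // subsequent Read/Write; once it fires, all I/O on conn fails.
        if err := conn.SetDeadline(time.Now().Add(60 * time.Second)); err != nil {
            log.Fatal(err)
        }

        // The zero time.Time clears the deadline again: unbounded I/O, which
        // is the behaviour the CLI now opts into by passing 0 to NewRPCClient.
        if err := conn.SetDeadline(time.Time{}); err != nil {
            log.Fatal(err)
        }
    }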
69 changes: 33 additions & 36 deletions pkg/transport/manager.go
@@ -216,45 +216,43 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpType string) (*ManagedTransport, error) {
 		return nil, io.ErrClosedPipe
 	}

-	const tries = 2
-
-	var err error
-	for i := 0; i < tries; i++ {
-		mTp, err := tm.saveTransport(remote, tpType)
-		if err != nil {
-			return nil, err
-		}
-
-		if err = mTp.Dial(ctx); err != nil {
-			// TODO(nkryuchkov): Check for an error that underlying connection is not established
-			// and try again in this case. Otherwise, return the error.
-			pkTableErr := fmt.Sprintf("pk table: entry of %s does not exist", remote.String())
-
-			if err.Error() == pkTableErr {
-				mTp.wg.Wait()
-				delete(tm.tps, mTp.Entry.ID)
-
-				return nil, err
-			}
-
-			if err == ErrNotServing {
-				mTp.wg.Wait()
-				delete(tm.tps, mTp.Entry.ID)
-				continue
-			}
-
-			tm.Logger.
-				WithError(err).
-				Warn("Underlying connection is not yet established. Will retry later.")
-		}
-		return mTp, nil
-	}
-
-	tm.Logger.
-		WithError(err).
-		WithField("tries", tries).
-		Error("Failed to serve managed transport. This is unexpected.")
-	return nil, err
+TrySaveTp:
+	mTp, err := tm.saveTransport(remote, tpType)
+	if err != nil {
+		return nil, err
+	}
+
+	if err = mTp.Dial(ctx); err != nil {
+		// This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was just
+		// deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again.
+		if err == ErrNotServing {
+			if closeErr := mTp.Close(); closeErr != nil {
+				tm.Logger.WithError(closeErr).Warn("Closing mTp returns non-nil error.")
+			}
+			delete(tm.tps, mTp.Entry.ID)
+			goto TrySaveTp
+		}
+
+		// This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in the
+		// STCP table. There is no point in retrying as a connection would be impossible, so we just return an error.
+		if isSTCPTableError(remote, err) {
+			if closeErr := mTp.Close(); closeErr != nil {
+				tm.Logger.WithError(closeErr).Warn("Closing mTp returns non-nil error.")
+			}
+			delete(tm.tps, mTp.Entry.ID)
+			return nil, err
+		}
+
+		tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.")
+	}
+	return mTp, nil
+}
+
+// isSTCPTableError returns true if the error is a STCP table error.
+// This occurs when the requested remote public key does not exist in the STCP table.
+func isSTCPTableError(remotePK cipher.PubKey, err error) bool {
+	return err.Error() == fmt.Sprintf("pk table: entry of %s does not exist", remotePK.String())
 }

@@ -263,13 +261,12 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*ManagedTransport, error) {
 	}

 	tpID := tm.tpIDFromPK(remote, netName)
-
 	tm.Logger.Debugf("Initializing TP with ID %s", tpID)

-	tp, ok := tm.tps[tpID]
+	oldMTp, ok := tm.tps[tpID]
 	if ok {
-		tm.Logger.Debugln("Got TP from map")
-		return tp, nil
+		tm.Logger.Debug("Found an old mTp from internal map.")
+		return oldMTp, nil
 	}

 	mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName)
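The goto TrySaveTp construct above is the general "evict a stale cache entry and retry" shape: the stale object must be fully closed before its map slot is reused, or the next lookup hands back the same dying object. A self-contained sketch under assumed names (resource, getOrCreate, ErrClosed, and acquire are all illustrative, not from the codebase):

    package main

    import (
        "errors"
        "fmt"
    )

    // ErrClosed plays the role of ErrNotServing: the cached object exists
    // but can no longer be used.
    var ErrClosed = errors.New("resource is closed")

    type resource struct{ closed bool }

    func (r *resource) Use() error {
        if r.closed {
            return ErrClosed
        }
        return nil
    }

    func (r *resource) Close() { r.closed = true } // idempotent, like mTp.Close()

    var cache = map[string]*resource{}

    // getOrCreate mirrors saveTransport: it happily returns a stale entry.
    func getOrCreate(key string) *resource {
        if r, ok := cache[key]; ok {
            return r
        }
        r := &resource{}
        cache[key] = r
        return r
    }

    // acquire mirrors SaveTransport: on a stale entry, close it, evict it,
    // and jump back to take a fresh one.
    func acquire(key string) (*resource, error) {
    Retry:
        r := getOrCreate(key)
        if err := r.Use(); err != nil {
            if errors.Is(err, ErrClosed) {
                r.Close()
                delete(cache, key)
                goto Retry
            }
            return nil, err
        }
        return r, nil
    }

    func main() {
        first, _ := acquire("remote-pk")
        first.Close() // simulate the transport dying; the entry is now stale

        second, err := acquire("remote-pk") // evicts and recreates
        fmt.Println(second != first, err)   // true <nil>
    }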
15 changes: 10 additions & 5 deletions pkg/visor/rpc_client.go
@@ -1,6 +1,7 @@
 package visor

 import (
+	"context"
 	"encoding/binary"
 	"errors"
 	"fmt"
@@ -98,17 +99,21 @@ func NewRPCClient(log logrus.FieldLogger, conn io.ReadWriteCloser, prefix string, timeout time.Duration) RPCClient {

 // Call calls the internal rpc.Client with the serviceMethod arg prefixed.
 func (rc *rpcClient) Call(method string, args, reply interface{}) error {
-	timer := time.NewTimer(rc.timeout)
-	defer timer.Stop()
+	ctx := context.Background()
+	if rc.timeout != 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithDeadline(ctx, time.Now().Add(rc.timeout))
+		defer cancel()
+	}

 	select {
 	case call := <-rc.client.Go(rc.prefix+"."+method, args, reply, nil).Done:
 		return call.Error
-	case <-timer.C:
+	case <-ctx.Done():
 		if err := rc.conn.Close(); err != nil {
-			rc.log.WithError(err).Warn("failed to close underlying rpc connection after timeout error")
+			rc.log.WithError(err).Warn("Failed to close rpc client after timeout error.")
 		}
-		return ErrTimeout
+		return ctx.Err()
 	}
 }
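The rewritten Call generalizes to an "optional deadline" pattern: with a zero timeout, ctx remains context.Background(), whose Done() returns a nil channel, and receiving from a nil channel in a select blocks forever — exactly "no timeout". A runnable sketch of the same shape (callWithOptionalTimeout and the simulated op are our names, not the repository's):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // callWithOptionalTimeout mirrors rpcClient.Call: timeout == 0 means
    // wait indefinitely, anything else arms a deadline via context.
    func callWithOptionalTimeout(timeout time.Duration, op func() error) error {
        ctx := context.Background()
        if timeout != 0 {
            var cancel context.CancelFunc
            ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
            defer cancel()
        }

        done := make(chan error, 1)
        go func() { done <- op() }()

        select {
        case err := <-done:
            return err
        case <-ctx.Done(): // nil channel when no deadline: never fires
            return ctx.Err()
        }
    }

    func main() {
        slow := func() error { time.Sleep(50 * time.Millisecond); return nil }

        fmt.Println(callWithOptionalTimeout(10*time.Millisecond, slow)) // context deadline exceeded
        fmt.Println(callWithOptionalTimeout(0, slow))                   // <nil>
    }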
