Skip to content

Commit

Permalink
Merge pull request #376 from SkycoinProject/fix/tpm-hangs-#375
Browse files Browse the repository at this point in the history
Fix transport manager hangs and various improvements.

Former-commit-id: 84fd905
  • Loading branch information
evanlinjin authored May 27, 2020
2 parents f08b046 + 54916a4 commit 249ae70
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
14 changes: 3 additions & 11 deletions cmd/skywire-cli/commands/visor/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"net"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/skyenv"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/spf13/cobra"

Expand All @@ -27,17 +25,11 @@ var RootCmd = &cobra.Command{
}

func rpcClient() visor.RPCClient {
const rpcDialTimeout = time.Second * 5

conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout)
if err != nil {
logger.Fatal("RPC connection failed:", err)
}
if err := conn.SetDeadline(time.Now().Add(rpcConnDuration)); err != nil {
logger.Fatal("RPC connection failed:", err)
}
return visor.NewRPCClient(logger, conn, visor.RPCPrefix, skyenv.DefaultRPCTimeout)
return visor.NewRPCClient(logger, conn, visor.RPCPrefix, 0)
}

const (
rpcDialTimeout = time.Second * 5
rpcConnDuration = time.Second * 60
)
53 changes: 26 additions & 27 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,45 +216,45 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy
return nil, io.ErrClosedPipe
}

const tries = 2

var err error
for i := 0; i < tries; i++ {
for {
mTp, err := tm.saveTransport(remote, tpType)
if err != nil {
return nil, err
}

if err = mTp.Dial(ctx); err != nil {
// TODO(nkryuchkov): Check for an error that underlying connection is not established
// and try again in this case. Otherwise, return the error.
pkTableErr := fmt.Sprintf("pk table: entry of %s does not exist", remote.String())

if err.Error() == pkTableErr {
mTp.wg.Wait()
// This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was
// just deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again.
if err == ErrNotServing {
if closeErr := mTp.Close(); closeErr != nil {
tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.")
}
delete(tm.tps, mTp.Entry.ID)

return nil, err
continue
}

if err == ErrNotServing {
mTp.wg.Wait()
// This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in
// the STCP table. There is no point in retrying as a connection would be impossible, so we just return an
// error.
if isSTCPTableError(remote, err) {
if closeErr := mTp.Close(); closeErr != nil {
tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.")
}
delete(tm.tps, mTp.Entry.ID)
continue
return nil, err
}

tm.Logger.
WithError(err).
Warn("Underlying connection is not yet established. Will retry later.")
tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.")
}

return mTp, nil
}
}

tm.Logger.
WithError(err).
WithField("tries", tries).
Error("Failed to serve managed transport. This is unexpected.")
return nil, err
// isSTCPPKError returns true if the error is a STCP table error.
// This occurs the requested remote public key does not exist in the STCP table.
func isSTCPTableError(remotePK cipher.PubKey, err error) bool {
return err.Error() == fmt.Sprintf("pk table: entry of %s does not exist", remotePK.String())
}

func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*ManagedTransport, error) {
Expand All @@ -263,13 +263,12 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed
}

tpID := tm.tpIDFromPK(remote, netName)

tm.Logger.Debugf("Initializing TP with ID %s", tpID)

tp, ok := tm.tps[tpID]
oldMTp, ok := tm.tps[tpID]
if ok {
tm.Logger.Debugln("Got TP from map")
return tp, nil
tm.Logger.Debug("Found an old mTp from internal map.")
return oldMTp, nil
}

mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName)
Expand Down
12 changes: 12 additions & 0 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func initSNet(v *Visor) bool {
return report(n.Close())
})

if dmsgC := n.Dmsg(); dmsgC != nil {
const dmsgTimeout = time.Second * 20
log := dmsgC.Logger().WithField("timeout", dmsgTimeout)
log.Info("Connecting to the dmsg network...")
select {
case <-time.After(dmsgTimeout):
log.Warn("Failed to connect to the dmsg network, will try again later.")
case <-n.Dmsg().Ready():
log.Info("Connected to the dmsg network.")
}
}

v.net = n
return report(nil)
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package visor

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -98,17 +99,21 @@ func NewRPCClient(log logrus.FieldLogger, conn io.ReadWriteCloser, prefix string

// Call calls the internal rpc.Client with the serviceMethod arg prefixed.
func (rc *rpcClient) Call(method string, args, reply interface{}) error {
timer := time.NewTimer(rc.timeout)
defer timer.Stop()
ctx := context.Background()
if rc.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(rc.timeout))
defer cancel()
}

select {
case call := <-rc.client.Go(rc.prefix+"."+method, args, reply, nil).Done:
return call.Error
case <-timer.C:
case <-ctx.Done():
if err := rc.conn.Close(); err != nil {
rc.log.WithError(err).Warn("failed to close underlying rpc connection after timeout error")
rc.log.WithError(err).Warn("Failed to close rpc client after timeout error.")
}
return ErrTimeout
return ctx.Err()
}
}

Expand Down

0 comments on commit 249ae70

Please sign in to comment.