Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transport manager hangs and various improvements. #376

Merged
merged 4 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions cmd/skywire-cli/commands/visor/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"net"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/skyenv"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/spf13/cobra"

Expand All @@ -27,17 +25,11 @@ var RootCmd = &cobra.Command{
}

func rpcClient() visor.RPCClient {
const rpcDialTimeout = time.Second * 5
nkryuchkov marked this conversation as resolved.
Show resolved Hide resolved

conn, err := net.DialTimeout("tcp", rpcAddr, rpcDialTimeout)
if err != nil {
logger.Fatal("RPC connection failed:", err)
}
if err := conn.SetDeadline(time.Now().Add(rpcConnDuration)); err != nil {
logger.Fatal("RPC connection failed:", err)
}
return visor.NewRPCClient(logger, conn, visor.RPCPrefix, skyenv.DefaultRPCTimeout)
return visor.NewRPCClient(logger, conn, visor.RPCPrefix, 0)
}

const (
rpcDialTimeout = time.Second * 5
rpcConnDuration = time.Second * 60
)
53 changes: 26 additions & 27 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,45 +216,45 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy
return nil, io.ErrClosedPipe
}

const tries = 2

var err error
for i := 0; i < tries; i++ {
for {
mTp, err := tm.saveTransport(remote, tpType)
if err != nil {
return nil, err
}

if err = mTp.Dial(ctx); err != nil {
// TODO(nkryuchkov): Check for an error that underlying connection is not established
// and try again in this case. Otherwise, return the error.
pkTableErr := fmt.Sprintf("pk table: entry of %s does not exist", remote.String())

if err.Error() == pkTableErr {
mTp.wg.Wait()
// This occurs when an old tp is returned by 'tm.saveTransport', meaning a tp of the same transport ID was
// just deleted (and has not yet fully closed). Hence, we should close and delete the old tp and try again.
if err == ErrNotServing {
if closeErr := mTp.Close(); closeErr != nil {
tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.")
}
delete(tm.tps, mTp.Entry.ID)

return nil, err
continue
}

if err == ErrNotServing {
mTp.wg.Wait()
// This occurs when the tp type is STCP and the requested remote PK is not associated with an IP address in
// the STCP table. There is no point in retrying as a connection would be impossible, so we just return an
// error.
if isSTCPTableError(remote, err) {
if closeErr := mTp.Close(); closeErr != nil {
tm.Logger.WithError(err).Warn("Closing mTp returns non-nil error.")
}
delete(tm.tps, mTp.Entry.ID)
continue
return nil, err
}

tm.Logger.
WithError(err).
Warn("Underlying connection is not yet established. Will retry later.")
tm.Logger.WithError(err).Warn("Underlying transport connection is not established, will retry later.")
}

return mTp, nil
}
}

tm.Logger.
WithError(err).
WithField("tries", tries).
Error("Failed to serve managed transport. This is unexpected.")
return nil, err
// isSTCPPKError returns true if the error is a STCP table error.
// This occurs the requested remote public key does not exist in the STCP table.
func isSTCPTableError(remotePK cipher.PubKey, err error) bool {
return err.Error() == fmt.Sprintf("pk table: entry of %s does not exist", remotePK.String())
}

func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*ManagedTransport, error) {
Expand All @@ -263,13 +263,12 @@ func (tm *Manager) saveTransport(remote cipher.PubKey, netName string) (*Managed
}

tpID := tm.tpIDFromPK(remote, netName)

tm.Logger.Debugf("Initializing TP with ID %s", tpID)

tp, ok := tm.tps[tpID]
oldMTp, ok := tm.tps[tpID]
if ok {
tm.Logger.Debugln("Got TP from map")
return tp, nil
tm.Logger.Debug("Found an old mTp from internal map.")
return oldMTp, nil
}

mTp := NewManagedTransport(tm.n, tm.Conf.DiscoveryClient, tm.Conf.LogStore, remote, netName)
Expand Down
12 changes: 12 additions & 0 deletions pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func initSNet(v *Visor) bool {
return report(n.Close())
})

if dmsgC := n.Dmsg(); dmsgC != nil {
const dmsgTimeout = time.Second * 20
log := dmsgC.Logger().WithField("timeout", dmsgTimeout)
log.Info("Connecting to the dmsg network...")
select {
case <-time.After(dmsgTimeout):
log.Warn("Failed to connect to the dmsg network, will try again later.")
case <-n.Dmsg().Ready():
log.Info("Connected to the dmsg network.")
}
}

v.net = n
return report(nil)
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package visor

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -98,17 +99,21 @@ func NewRPCClient(log logrus.FieldLogger, conn io.ReadWriteCloser, prefix string

// Call calls the internal rpc.Client with the serviceMethod arg prefixed.
func (rc *rpcClient) Call(method string, args, reply interface{}) error {
timer := time.NewTimer(rc.timeout)
defer timer.Stop()
ctx := context.Background()
if rc.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(rc.timeout))
defer cancel()
}

select {
case call := <-rc.client.Go(rc.prefix+"."+method, args, reply, nil).Done:
return call.Error
case <-timer.C:
case <-ctx.Done():
if err := rc.conn.Close(); err != nil {
rc.log.WithError(err).Warn("failed to close underlying rpc connection after timeout error")
rc.log.WithError(err).Warn("Failed to close rpc client after timeout error.")
}
return ErrTimeout
return ctx.Err()
}
}

Expand Down