Skip to content

Commit

Permalink
Re-implemented RPCClientDialer.
Browse files Browse the repository at this point in the history
  • Loading branch information
志宇 committed Feb 25, 2020
1 parent 04ade12 commit 68c8de2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 132 deletions.
1 change: 1 addition & 0 deletions internal/skyenv/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
DmsgSetupPort = uint16(36) // Listening port of a setup node.
DmsgAwaitSetupPort = uint16(136) // Listening port of a visor for setup operations.
DmsgTransportPort = uint16(45) // Listening port of a visor for incoming transports.
DmsgHypervisorPort = uint16(46) // Listening port of a visor for incoming hypervisor connections.
)

// Default dmsgpty constants.
Expand Down
96 changes: 0 additions & 96 deletions pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package visor

import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/rpc"
"sync"
Expand All @@ -19,7 +17,6 @@ import (
"github.com/SkycoinProject/skywire-mainnet/pkg/app"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest"
"github.com/SkycoinProject/skywire-mainnet/pkg/transport"
"github.com/SkycoinProject/skywire-mainnet/pkg/util/buildinfo"
Expand Down Expand Up @@ -246,99 +243,6 @@ func (rc *rpcClient) Loops() ([]LoopInfo, error) {
return loops, err
}

// RPCClientDialer keeps track of an rpc connection and retries to connect if it fails at some point
type RPCClientDialer struct {
dialer *snet.Network
pk cipher.PubKey
port uint16
conn net.Conn
mu sync.Mutex
done chan struct{} // nil: loop is not running, non-nil: loop is running.
}

// NewRPCClientDialer creates a new RPCDialer to the given address
func NewRPCClientDialer(dialer *snet.Network, pk cipher.PubKey, port uint16) *RPCClientDialer {
return &RPCClientDialer{
dialer: dialer,
pk: pk,
port: port,
}
}

// Run repeatedly dials to remote until a successful connection is established.
// It exposes a RPC Server.
// It will return if Close is called or crypto fails.
func (d *RPCClientDialer) Run(srv *rpc.Server, retry time.Duration) error {
if ok := d.setDone(); !ok {
return ErrAlreadyServing
}
for {
if err := d.establishConn(); err != nil {
return err
}
// Only serve when then dial succeeds.
srv.ServeConn(d.conn)
d.setConn(nil)
select {
case <-d.done:
d.clearDone()
return nil
case <-time.After(retry):
}
}
}

// Close closes the handler.
func (d *RPCClientDialer) Close() (err error) {
if d == nil {
return nil
}
d.mu.Lock()
if d.done != nil {
close(d.done)
}
if d.conn != nil {
err = d.conn.Close()
}
d.mu.Unlock()
return
}

// This operation should be atomic, hence protected by mutex.
func (d *RPCClientDialer) establishConn() error {
d.mu.Lock()
defer d.mu.Unlock()

conn, err := d.dialer.Dial(context.Background(), snet.DmsgType, d.pk, d.port)
if err != nil {
return err
}

d.conn = conn
return nil
}

func (d *RPCClientDialer) setConn(conn net.Conn) {
d.mu.Lock()
d.conn = conn
d.mu.Unlock()
}

func (d *RPCClientDialer) setDone() (ok bool) {
d.mu.Lock()
if ok = d.done == nil; ok {
d.done = make(chan struct{})
}
d.mu.Unlock()
return
}

func (d *RPCClientDialer) clearDone() {
d.mu.Lock()
d.done = nil
d.mu.Unlock()
}

// Restart calls Restart.
func (rc *rpcClient) Restart() error {
return rc.Call("Restart", &struct{}{}, &struct{}{})
Expand Down
52 changes: 52 additions & 0 deletions pkg/visor/rpc_client_dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package visor

import (
"context"
"net/rpc"

"github.com/SkycoinProject/dmsg"
"github.com/SkycoinProject/dmsg/netutil"
"github.com/sirupsen/logrus"

"github.com/SkycoinProject/skywire-mainnet/pkg/snet"
)

func isDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

func ServeRPCClient(ctx context.Context, log logrus.FieldLogger, n *snet.Network, rpcS *rpc.Server, rAddr dmsg.Addr, errCh chan<- error) {
for {
var conn *snet.Conn
err := netutil.NewDefaultRetrier(log).Do(ctx, func() (rErr error) {
conn, rErr = n.Dial(ctx, snet.DmsgType, rAddr.PK, rAddr.Port)
return rErr
})
if err != nil {
if errCh != nil {
errCh <- err
}
return
}
if conn == nil {
log.WithField("conn == nil", conn == nil).
Fatal("An unexpected occurrence happened.")
}

connCtx, cancel := context.WithCancel(ctx)
go func() {
rpcS.ServeConn(conn)
cancel()
}()
<-connCtx.Done()

log.WithError(conn.Close()).
WithField("context_done", isDone(ctx)).
Debug("ServeRPCClient: Closed conn.")
}
}
63 changes: 27 additions & 36 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/SkycoinProject/dmsg/dmsgpty"
"github.com/SkycoinProject/skycoin/src/util/logging"

"github.com/SkycoinProject/dmsg/httputil"
"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"

"github.com/SkycoinProject/skywire-mainnet/pkg/app/appcommon"
"github.com/SkycoinProject/skywire-mainnet/pkg/app/appnet"
Expand Down Expand Up @@ -92,8 +92,8 @@ type Visor struct {

pidMu sync.Mutex

rpcListener net.Listener
rpcDialers []*RPCClientDialer
cliL net.Listener
hvE map[cipher.PubKey]chan error

procManager appserver.ProcManager
appRPCServer *appserver.Server
Expand Down Expand Up @@ -212,18 +212,12 @@ func NewVisor(cfg *Config, logger *logging.MasterLogger, restartCtx *restart.Con
if err != nil {
return nil, fmt.Errorf("failed to setup RPC listener: %s", err)
}
visor.rpcListener = l
visor.cliL = l
}

visor.rpcDialers = make([]*RPCClientDialer, len(cfg.Hypervisors))

for i, entry := range cfg.Hypervisors {
_, rpcPort, err := httputil.SplitRPCAddr(entry.Addr)
if err != nil {
return nil, fmt.Errorf("failed to parse rpc port from rpc address: %s", err)
}

visor.rpcDialers[i] = NewRPCClientDialer(visor.n, entry.PubKey, rpcPort)
visor.hvE = make(map[cipher.PubKey]chan error, len(cfg.Hypervisors))
for _, hv := range cfg.Hypervisors {
visor.hvE[hv.PubKey] = make(chan error, 1)
}

visor.appRPCServer = appserver.New(logging.MustGetLogger("app_rpc_server"), visor.conf.AppServerSockFile)
Expand Down Expand Up @@ -298,24 +292,21 @@ func (visor *Visor) Start() error {
}(ac)
}

// CLI and RPC server.
// RPC server for CLI and Hypervisor.
rpcSvr := rpc.NewServer()
if err := rpcSvr.RegisterName(RPCPrefix, &RPC{visor: visor}); err != nil {
return fmt.Errorf("rpc server created failed: %s", err)
}

if visor.rpcListener != nil {
visor.logger.Info("Starting RPC interface on ", visor.rpcListener.Addr())

go rpcSvr.Accept(visor.rpcListener)
if visor.cliL != nil {
visor.logger.Info("Starting RPC interface on ", visor.cliL.Addr())
go rpcSvr.Accept(visor.cliL)
}

for _, dialer := range visor.rpcDialers {
go func(dialer *RPCClientDialer) {
if err := dialer.Run(rpcSvr, time.Second); err != nil {
visor.logger.Errorf("Hypervisor Dmsg Dial exited with error: %v", err)
}
}(dialer)
if visor.hvE != nil {
for hvPK, hvErrs := range visor.hvE {
log := visor.Logger.PackageLogger("hypervisor:" + hvPK.String())
addr := dmsg.Addr{PK: hvPK, Port: skyenv.DmsgHypervisorPort}
go ServeRPCClient(ctx, log, visor.n, rpcSvr, addr, hvErrs)
}
}

visor.logger.Info("Starting packet router")
Expand Down Expand Up @@ -392,19 +383,19 @@ func (visor *Visor) Close() (err error) {
return nil
}

if visor.rpcListener != nil {
if err = visor.rpcListener.Close(); err != nil {
visor.logger.WithError(err).Error("failed to stop RPC interface")
if visor.cliL != nil {
if err = visor.cliL.Close(); err != nil {
visor.logger.WithError(err).Error("failed to close CLI listener")
} else {
visor.logger.Info("RPC interface stopped successfully")
visor.logger.Info("CLI listener closed successfully")
}
}

for i, dialer := range visor.rpcDialers {
if err = dialer.Close(); err != nil {
visor.logger.WithError(err).Errorf("(%d) failed to stop RPC dialer", i)
} else {
visor.logger.Infof("(%d) RPC dialer closed successfully", i)
if visor.hvE != nil {
for hvPK, hvErr := range visor.hvE {
visor.logger.
WithError(<-hvErr).
WithField("hypervisor_pk", hvPK).
Info("Closed hypervisor connection.")
}
}

Expand Down

0 comments on commit 68c8de2

Please sign in to comment.