Skip to content

Commit

Permalink
Create setup node client
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Sep 14, 2019
1 parent 5ba16e2 commit e842d62
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 57 deletions.
73 changes: 16 additions & 57 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"sync"
"time"

"github.com/skycoin/skywire/pkg/snet"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app"
routeFinder "github.com/skycoin/skywire/pkg/route-finder/client"
rfclient "github.com/skycoin/skywire/pkg/route-finder/client"
"github.com/skycoin/skywire/pkg/routing"
"github.com/skycoin/skywire/pkg/setup"
rsclient "github.com/skycoin/skywire/pkg/setup/client"
"github.com/skycoin/skywire/pkg/snet"
"github.com/skycoin/skywire/pkg/transport"
)

Expand All @@ -40,7 +40,7 @@ type Config struct {
SecKey cipher.SecKey
TransportManager *transport.Manager
RoutingTable routing.Table
RouteFinder routeFinder.Client
RouteFinder rfclient.Client
SetupNodes []cipher.PubKey
}

Expand All @@ -67,6 +67,8 @@ type Router struct {
sl *snet.Listener
rt routing.Table

rsc rsclient.Client

wg sync.WaitGroup
mx sync.Mutex

Expand Down Expand Up @@ -94,6 +96,7 @@ func New(n *snet.Network, config *Config) (*Router, error) {
staticPorts: make(map[routing.Port]struct{}),
}

r.rsc = rsclient.New(n, sl, config.SetupNodes, r.handleSetupConn)
r.OnConfirmLoop = r.confirmLoop
r.OnLoopClosed = r.loopClosed

Expand Down Expand Up @@ -122,42 +125,14 @@ func (r *Router) Serve(ctx context.Context) error {

r.wg.Add(1)
go func() {
r.ServeConnLoop()
r.wg.Done()
}()

r.tm.Serve(ctx)
return nil
}
defer r.wg.Done()

// ServeConnLoop initiates serving connections by route manager.
func (r *Router) ServeConnLoop() {
// Accept setup node request loop.
for {
if err := r.serveConn(); err != nil {
r.Logger.WithError(err).Warnf("stopped serving")
return
if err := r.rsc.Serve(); err != nil {
r.Logger.WithError(err).Warnf("setup client stopped serving")
}
}
}

func (r *Router) serveConn() error {
conn, err := r.sl.AcceptConn()
if err != nil {
r.Logger.WithError(err).Warnf("stopped serving")
return err
}
if !r.SetupIsTrusted(conn.RemotePK()) {
r.Logger.Warnf("closing conn from untrusted setup node: %v", conn.Close())
return nil
}
go func() {
r.Logger.Infof("handling setup request: setupPK(%s)", conn.RemotePK())
if err := r.handleSetupConn(conn); err != nil {
r.Logger.WithError(err).Warnf("setup request failed: setupPK(%s)", conn.RemotePK())
}
r.Logger.Infof("successfully handled setup request: setupPK(%s)", conn.RemotePK())
}()

r.tm.Serve(ctx)
return nil
}

Expand Down Expand Up @@ -504,10 +479,11 @@ func (r *Router) requestLoop(ctx context.Context, appConn *app.Protocol, raddr r
Reverse: reverseRoute,
}

sConn, err := r.dialSetupConn(ctx)
sConn, err := r.rsc.Dial(ctx)
if err != nil {
return routing.Addr{}, err
}

defer func() {
if err := sConn.Close(); err != nil {
r.Logger.Warnf("Failed to close transport: %s", err)
Expand Down Expand Up @@ -556,7 +532,7 @@ func (r *Router) confirmLoop(l routing.Loop, rule routing.Rule) error {
func (r *Router) closeLoop(ctx context.Context, appConn *app.Protocol, loop routing.Loop) error {
r.destroyLoop(loop)

sConn, err := r.dialSetupConn(ctx)
sConn, err := r.rsc.Dial(ctx)
if err != nil {
return err
}
Expand All @@ -572,18 +548,6 @@ func (r *Router) closeLoop(ctx context.Context, appConn *app.Protocol, loop rout
return nil
}

func (r *Router) dialSetupConn(_ context.Context) (*snet.Conn, error) {
for _, sPK := range r.conf.SetupNodes {
conn, err := r.n.Dial(snet.DmsgType, sPK, snet.SetupPort)
if err != nil {
r.Logger.WithError(err).Warnf("failed to dial to setup node: setupPK(%s)", sPK)
continue
}
return conn, nil
}
return nil, errors.New("failed to dial to a setup node")
}

func (r *Router) loopClosed(loop routing.Loop) error {
b, err := r.pm.Get(loop.Local.Port)
if err != nil {
Expand Down Expand Up @@ -658,10 +622,5 @@ fetchRoutesAgain:

// SetupIsTrusted checks if setup node is trusted.
func (r *Router) SetupIsTrusted(sPK cipher.PubKey) bool {
for _, pk := range r.conf.SetupNodes {
if sPK == pk {
return true
}
}
return false
return r.rsc.IsTrusted(sPK)
}
97 changes: 97 additions & 0 deletions pkg/setup/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package client

import (
"context"
"errors"
"net"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/snet"
)

// Client interacts with setup nodes.
type Client interface {
Dial(context.Context) (*snet.Conn, error)
Serve() error
IsTrusted(sPK cipher.PubKey) bool
}

type handlerFunc func(net.Conn) error

type client struct {
Logger *logging.Logger

network *snet.Network
sl *snet.Listener
nodes []cipher.PubKey
handler handlerFunc
}

// New returns a new setup node client instance.
func New(network *snet.Network, sl *snet.Listener, nodes []cipher.PubKey, handler handlerFunc) Client {
c := &client{
Logger: logging.MustGetLogger("setup-client"),

network: network,
sl: sl,
nodes: nodes,
handler: handler,
}

return c
}

// TODO: use context
func (c *client) Dial(ctx context.Context) (*snet.Conn, error) {
for _, sPK := range c.nodes {
conn, err := c.network.Dial(snet.DmsgType, sPK, snet.SetupPort)
if err != nil {
c.Logger.WithError(err).Warnf("failed to dial to setup node: setupPK(%s)", sPK)
continue
}
return conn, nil
}
return nil, errors.New("failed to dial to a setup node")
}

// ServeConnLoop initiates serving connections by route manager.
func (c *client) Serve() error {
// Accept setup node request loop.
for {
if err := c.serveConn(); err != nil {
return err
}
}
}

func (c *client) serveConn() error {
conn, err := c.sl.AcceptConn()
if err != nil {
c.Logger.WithError(err).Warnf("stopped serving")
return err
}
if !c.IsTrusted(conn.RemotePK()) {
c.Logger.Warnf("closing conn from untrusted setup node: %v", conn.Close())
return nil
}
go func() {
c.Logger.Infof("handling setup request: setupPK(%s)", conn.RemotePK())
if err := c.handler(conn); err != nil {
c.Logger.WithError(err).Warnf("setup request failed: setupPK(%s)", conn.RemotePK())
}
c.Logger.Infof("successfully handled setup request: setupPK(%s)", conn.RemotePK())
}()
return nil
}

// SetupIsTrusted checks if setup node is trusted.
func (c *client) IsTrusted(sPK cipher.PubKey) bool {
for _, pk := range c.nodes {
if sPK == pk {
return true
}
}
return false
}

0 comments on commit e842d62

Please sign in to comment.