From e842d624e29de105357278aeaefce805cbe69c4b Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Sat, 14 Sep 2019 16:26:43 +0300 Subject: [PATCH] Create setup node client --- pkg/router/router.go | 73 +++++++--------------------- pkg/setup/client/client.go | 97 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 57 deletions(-) create mode 100644 pkg/setup/client/client.go diff --git a/pkg/router/router.go b/pkg/router/router.go index d21b31fbb..6e38b982e 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -11,15 +11,15 @@ import ( "sync" "time" - "github.com/skycoin/skywire/pkg/snet" - "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app" - routeFinder "github.com/skycoin/skywire/pkg/route-finder/client" + rfclient "github.com/skycoin/skywire/pkg/route-finder/client" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/setup" + rsclient "github.com/skycoin/skywire/pkg/setup/client" + "github.com/skycoin/skywire/pkg/snet" "github.com/skycoin/skywire/pkg/transport" ) @@ -40,7 +40,7 @@ type Config struct { SecKey cipher.SecKey TransportManager *transport.Manager RoutingTable routing.Table - RouteFinder routeFinder.Client + RouteFinder rfclient.Client SetupNodes []cipher.PubKey } @@ -67,6 +67,8 @@ type Router struct { sl *snet.Listener rt routing.Table + rsc rsclient.Client + wg sync.WaitGroup mx sync.Mutex @@ -94,6 +96,7 @@ func New(n *snet.Network, config *Config) (*Router, error) { staticPorts: make(map[routing.Port]struct{}), } + r.rsc = rsclient.New(n, sl, config.SetupNodes, r.handleSetupConn) r.OnConfirmLoop = r.confirmLoop r.OnLoopClosed = r.loopClosed @@ -122,42 +125,14 @@ func (r *Router) Serve(ctx context.Context) error { r.wg.Add(1) go func() { - r.ServeConnLoop() - r.wg.Done() - }() - - r.tm.Serve(ctx) - return nil -} + defer r.wg.Done() -// ServeConnLoop initiates serving connections by route manager. -func (r *Router) ServeConnLoop() { - // Accept setup node request loop. - for { - if err := r.serveConn(); err != nil { - r.Logger.WithError(err).Warnf("stopped serving") - return + if err := r.rsc.Serve(); err != nil { + r.Logger.WithError(err).Warnf("setup client stopped serving") } - } -} - -func (r *Router) serveConn() error { - conn, err := r.sl.AcceptConn() - if err != nil { - r.Logger.WithError(err).Warnf("stopped serving") - return err - } - if !r.SetupIsTrusted(conn.RemotePK()) { - r.Logger.Warnf("closing conn from untrusted setup node: %v", conn.Close()) - return nil - } - go func() { - r.Logger.Infof("handling setup request: setupPK(%s)", conn.RemotePK()) - if err := r.handleSetupConn(conn); err != nil { - r.Logger.WithError(err).Warnf("setup request failed: setupPK(%s)", conn.RemotePK()) - } - r.Logger.Infof("successfully handled setup request: setupPK(%s)", conn.RemotePK()) }() + + r.tm.Serve(ctx) return nil } @@ -504,10 +479,11 @@ func (r *Router) requestLoop(ctx context.Context, appConn *app.Protocol, raddr r Reverse: reverseRoute, } - sConn, err := r.dialSetupConn(ctx) + sConn, err := r.rsc.Dial(ctx) if err != nil { return routing.Addr{}, err } + defer func() { if err := sConn.Close(); err != nil { r.Logger.Warnf("Failed to close transport: %s", err) @@ -556,7 +532,7 @@ func (r *Router) confirmLoop(l routing.Loop, rule routing.Rule) error { func (r *Router) closeLoop(ctx context.Context, appConn *app.Protocol, loop routing.Loop) error { r.destroyLoop(loop) - sConn, err := r.dialSetupConn(ctx) + sConn, err := r.rsc.Dial(ctx) if err != nil { return err } @@ -572,18 +548,6 @@ func (r *Router) closeLoop(ctx context.Context, appConn *app.Protocol, loop rout return nil } -func (r *Router) dialSetupConn(_ context.Context) (*snet.Conn, error) { - for _, sPK := range r.conf.SetupNodes { - conn, err := r.n.Dial(snet.DmsgType, sPK, snet.SetupPort) - if err != nil { - r.Logger.WithError(err).Warnf("failed to dial to setup node: setupPK(%s)", sPK) - continue - } - return conn, nil - } - return nil, errors.New("failed to dial to a setup node") -} - func (r *Router) loopClosed(loop routing.Loop) error { b, err := r.pm.Get(loop.Local.Port) if err != nil { @@ -658,10 +622,5 @@ fetchRoutesAgain: // SetupIsTrusted checks if setup node is trusted. func (r *Router) SetupIsTrusted(sPK cipher.PubKey) bool { - for _, pk := range r.conf.SetupNodes { - if sPK == pk { - return true - } - } - return false + return r.rsc.IsTrusted(sPK) } diff --git a/pkg/setup/client/client.go b/pkg/setup/client/client.go new file mode 100644 index 000000000..55bce5b44 --- /dev/null +++ b/pkg/setup/client/client.go @@ -0,0 +1,97 @@ +package client + +import ( + "context" + "errors" + "net" + + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skycoin/src/util/logging" + + "github.com/skycoin/skywire/pkg/snet" +) + +// Client interacts with setup nodes. +type Client interface { + Dial(context.Context) (*snet.Conn, error) + Serve() error + IsTrusted(sPK cipher.PubKey) bool +} + +type handlerFunc func(net.Conn) error + +type client struct { + Logger *logging.Logger + + network *snet.Network + sl *snet.Listener + nodes []cipher.PubKey + handler handlerFunc +} + +// New returns a new setup node client instance. +func New(network *snet.Network, sl *snet.Listener, nodes []cipher.PubKey, handler handlerFunc) Client { + c := &client{ + Logger: logging.MustGetLogger("setup-client"), + + network: network, + sl: sl, + nodes: nodes, + handler: handler, + } + + return c +} + +// TODO: use context +func (c *client) Dial(ctx context.Context) (*snet.Conn, error) { + for _, sPK := range c.nodes { + conn, err := c.network.Dial(snet.DmsgType, sPK, snet.SetupPort) + if err != nil { + c.Logger.WithError(err).Warnf("failed to dial to setup node: setupPK(%s)", sPK) + continue + } + return conn, nil + } + return nil, errors.New("failed to dial to a setup node") +} + +// ServeConnLoop initiates serving connections by route manager. +func (c *client) Serve() error { + // Accept setup node request loop. + for { + if err := c.serveConn(); err != nil { + return err + } + } +} + +func (c *client) serveConn() error { + conn, err := c.sl.AcceptConn() + if err != nil { + c.Logger.WithError(err).Warnf("stopped serving") + return err + } + if !c.IsTrusted(conn.RemotePK()) { + c.Logger.Warnf("closing conn from untrusted setup node: %v", conn.Close()) + return nil + } + go func() { + c.Logger.Infof("handling setup request: setupPK(%s)", conn.RemotePK()) + if err := c.handler(conn); err != nil { + c.Logger.WithError(err).Warnf("setup request failed: setupPK(%s)", conn.RemotePK()) + } + c.Logger.Infof("successfully handled setup request: setupPK(%s)", conn.RemotePK()) + }() + return nil +} + +// SetupIsTrusted checks if setup node is trusted. +func (c *client) IsTrusted(sPK cipher.PubKey) bool { + for _, pk := range c.nodes { + if sPK == pk { + return true + } + } + return false +}