From c9eea7adf09c8658c522d2bbad99e2bdff1259eb Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 1 Oct 2019 14:05:27 +0300 Subject: [PATCH] Add app server --- go.mod | 2 -- pkg/app2/rpc_client.go | 29 +++++++++------ pkg/app2/rpc_client_test.go | 2 +- pkg/app2/server.go | 72 +++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 13 deletions(-) create mode 100644 pkg/app2/server.go diff --git a/go.mod b/go.mod index 40d333f709..c941ff5438 100644 --- a/go.mod +++ b/go.mod @@ -11,8 +11,6 @@ require ( github.com/gorilla/securecookie v1.1.1 github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d github.com/mitchellh/go-homedir v1.1.0 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pkg/errors v0.8.1 github.com/pkg/profile v1.3.0 github.com/prometheus/client_golang v1.1.0 diff --git a/pkg/app2/rpc_client.go b/pkg/app2/rpc_client.go index 843137801a..10a70ffabb 100644 --- a/pkg/app2/rpc_client.go +++ b/pkg/app2/rpc_client.go @@ -1,6 +1,7 @@ package app2 import ( + "fmt" "net/rpc" "github.com/skycoin/skywire/pkg/app2/appnet" @@ -22,20 +23,22 @@ type RPCClient interface { // rpcClient implements `RPCClient`. type rpcCLient struct { - rpc *rpc.Client + rpc *rpc.Client + appKey string } // NewRPCClient constructs new `rpcClient`. -func NewRPCClient(rpc *rpc.Client) RPCClient { +func NewRPCClient(rpc *rpc.Client, appKey string) RPCClient { return &rpcCLient{ - rpc: rpc, + rpc: rpc, + appKey: appKey, } } // Dial sends `Dial` command to the server. func (c *rpcCLient) Dial(remote appnet.Addr) (connID uint16, localPort routing.Port, err error) { var resp DialResp - if err := c.rpc.Call("RPCGateway.Dial", &remote, &resp); err != nil { + if err := c.rpc.Call(c.formatMethod("Dial"), &remote, &resp); err != nil { return 0, 0, err } @@ -45,7 +48,7 @@ func (c *rpcCLient) Dial(remote appnet.Addr) (connID uint16, localPort routing.P // Listen sends `Listen` command to the server. func (c *rpcCLient) Listen(local appnet.Addr) (uint16, error) { var lisID uint16 - if err := c.rpc.Call("RPCGateway.Listen", &local, &lisID); err != nil { + if err := c.rpc.Call(c.formatMethod("Listen"), &local, &lisID); err != nil { return 0, err } @@ -55,7 +58,7 @@ func (c *rpcCLient) Listen(local appnet.Addr) (uint16, error) { // Accept sends `Accept` command to the server. func (c *rpcCLient) Accept(lisID uint16) (connID uint16, remote appnet.Addr, err error) { var acceptResp AcceptResp - if err := c.rpc.Call("RPCGateway.Accept", &lisID, &acceptResp); err != nil { + if err := c.rpc.Call(c.formatMethod("Accept"), &lisID, &acceptResp); err != nil { return 0, appnet.Addr{}, err } @@ -70,7 +73,7 @@ func (c *rpcCLient) Write(connID uint16, b []byte) (int, error) { } var n int - if err := c.rpc.Call("RPCGateway.Write", &req, &n); err != nil { + if err := c.rpc.Call(c.formatMethod("Write"), &req, &n); err != nil { return n, err } @@ -85,7 +88,7 @@ func (c *rpcCLient) Read(connID uint16, b []byte) (int, error) { } var resp ReadResp - if err := c.rpc.Call("RPCGateway.Read", &req, &resp); err != nil { + if err := c.rpc.Call(c.formatMethod("Read"), &req, &resp); err != nil { return 0, err } @@ -96,10 +99,16 @@ func (c *rpcCLient) Read(connID uint16, b []byte) (int, error) { // CloseConn sends `CloseConn` command to the server. func (c *rpcCLient) CloseConn(id uint16) error { - return c.rpc.Call("RPCGateway.CloseConn", &id, nil) + return c.rpc.Call(c.formatMethod("CloseConn"), &id, nil) } // CloseListener sends `CloseListener` command to the server. func (c *rpcCLient) CloseListener(id uint16) error { - return c.rpc.Call("RPCGateway.CloseListener", &id, nil) + return c.rpc.Call(c.formatMethod("CloseListener"), &id, nil) +} + +// formatMethod formats complete RPC method signature. +func (c *rpcCLient) formatMethod(method string) string { + const methodFmt = "%s.%s" + return fmt.Sprintf(methodFmt, c.appKey, method) } diff --git a/pkg/app2/rpc_client_test.go b/pkg/app2/rpc_client_test.go index 9c3ea6dfaf..c8475a03d3 100644 --- a/pkg/app2/rpc_client_test.go +++ b/pkg/app2/rpc_client_test.go @@ -490,5 +490,5 @@ func prepClient(t *testing.T, network, addr string) RPCClient { rpcCl, err := rpc.Dial(network, addr) require.NoError(t, err) - return NewRPCClient(rpcCl) + return NewRPCClient(rpcCl, "RPCGateway") } diff --git a/pkg/app2/server.go b/pkg/app2/server.go new file mode 100644 index 0000000000..2ec24ab197 --- /dev/null +++ b/pkg/app2/server.go @@ -0,0 +1,72 @@ +package app2 + +import ( + "fmt" + "io" + "net" + "net/rpc" + + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skycoin/src/util/logging" +) + +// Server is a server for app/visor communication. +type Server struct { + log *logging.Logger + sockFile string + rpcS *rpc.Server +} + +// NewServer constructs server. +func NewServer(log *logging.Logger, sockFile string) *Server { + return &Server{ + log: log, + sockFile: sockFile, + rpcS: rpc.NewServer(), + } +} + +// ListenAndServe starts listening for incoming app connections via unix socket. +func (s *Server) ListenAndServe() error { + l, err := net.Listen("unix", s.sockFile) + if err != nil { + return err + } + + for { + conn, err := l.Accept() + if err != nil { + return err + } + + go s.serveConn(conn) + } +} + +// serveConn instantiates RPC gateway for an application. +func (s *Server) serveConn(conn net.Conn) { + var appKey cipher.PubKey + if _, err := io.ReadFull(conn, appKey[:]); err != nil { + s.closeConn(conn) + s.log.WithError(err).Error("error reading app key") + return + } + + appKeyHex := appKey.Hex() + + gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_gateway_%s", appKeyHex))) + if err := s.rpcS.RegisterName(appKeyHex, gateway); err != nil { + s.closeConn(conn) + s.log.WithError(err).Errorf("error registering rpc gateway for app with key %s", appKeyHex) + return + } + + go s.rpcS.ServeConn(conn) +} + +// closeConn closes connection and logs error if any. +func (s *Server) closeConn(conn net.Conn) { + if err := conn.Close(); err != nil { + s.log.WithError(err).Error("error closing conn") + } +}