Skip to content

Commit

Permalink
Add app server
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Oct 1, 2019
1 parent d060e6b commit c9eea7a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 13 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ require (
github.com/gorilla/securecookie v1.1.1
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/mitchellh/go-homedir v1.1.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.1.0
Expand Down
29 changes: 19 additions & 10 deletions pkg/app2/rpc_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app2

import (
"fmt"
"net/rpc"

"github.com/skycoin/skywire/pkg/app2/appnet"
Expand All @@ -22,20 +23,22 @@ type RPCClient interface {

// rpcClient implements `RPCClient`.
type rpcCLient struct {
rpc *rpc.Client
rpc *rpc.Client
appKey string
}

// NewRPCClient constructs new `rpcClient`.
func NewRPCClient(rpc *rpc.Client) RPCClient {
func NewRPCClient(rpc *rpc.Client, appKey string) RPCClient {
return &rpcCLient{
rpc: rpc,
rpc: rpc,
appKey: appKey,
}
}

// Dial sends `Dial` command to the server.
func (c *rpcCLient) Dial(remote appnet.Addr) (connID uint16, localPort routing.Port, err error) {
var resp DialResp
if err := c.rpc.Call("RPCGateway.Dial", &remote, &resp); err != nil {
if err := c.rpc.Call(c.formatMethod("Dial"), &remote, &resp); err != nil {
return 0, 0, err
}

Expand All @@ -45,7 +48,7 @@ func (c *rpcCLient) Dial(remote appnet.Addr) (connID uint16, localPort routing.P
// Listen sends `Listen` command to the server.
func (c *rpcCLient) Listen(local appnet.Addr) (uint16, error) {
var lisID uint16
if err := c.rpc.Call("RPCGateway.Listen", &local, &lisID); err != nil {
if err := c.rpc.Call(c.formatMethod("Listen"), &local, &lisID); err != nil {
return 0, err
}

Expand All @@ -55,7 +58,7 @@ func (c *rpcCLient) Listen(local appnet.Addr) (uint16, error) {
// Accept sends `Accept` command to the server.
func (c *rpcCLient) Accept(lisID uint16) (connID uint16, remote appnet.Addr, err error) {
var acceptResp AcceptResp
if err := c.rpc.Call("RPCGateway.Accept", &lisID, &acceptResp); err != nil {
if err := c.rpc.Call(c.formatMethod("Accept"), &lisID, &acceptResp); err != nil {
return 0, appnet.Addr{}, err
}

Expand All @@ -70,7 +73,7 @@ func (c *rpcCLient) Write(connID uint16, b []byte) (int, error) {
}

var n int
if err := c.rpc.Call("RPCGateway.Write", &req, &n); err != nil {
if err := c.rpc.Call(c.formatMethod("Write"), &req, &n); err != nil {
return n, err
}

Expand All @@ -85,7 +88,7 @@ func (c *rpcCLient) Read(connID uint16, b []byte) (int, error) {
}

var resp ReadResp
if err := c.rpc.Call("RPCGateway.Read", &req, &resp); err != nil {
if err := c.rpc.Call(c.formatMethod("Read"), &req, &resp); err != nil {
return 0, err
}

Expand All @@ -96,10 +99,16 @@ func (c *rpcCLient) Read(connID uint16, b []byte) (int, error) {

// CloseConn sends `CloseConn` command to the server.
func (c *rpcCLient) CloseConn(id uint16) error {
return c.rpc.Call("RPCGateway.CloseConn", &id, nil)
return c.rpc.Call(c.formatMethod("CloseConn"), &id, nil)
}

// CloseListener sends `CloseListener` command to the server.
func (c *rpcCLient) CloseListener(id uint16) error {
return c.rpc.Call("RPCGateway.CloseListener", &id, nil)
return c.rpc.Call(c.formatMethod("CloseListener"), &id, nil)
}

// formatMethod formats complete RPC method signature.
func (c *rpcCLient) formatMethod(method string) string {
const methodFmt = "%s.%s"
return fmt.Sprintf(methodFmt, c.appKey, method)
}
2 changes: 1 addition & 1 deletion pkg/app2/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,5 +490,5 @@ func prepClient(t *testing.T, network, addr string) RPCClient {
rpcCl, err := rpc.Dial(network, addr)
require.NoError(t, err)

return NewRPCClient(rpcCl)
return NewRPCClient(rpcCl, "RPCGateway")
}
72 changes: 72 additions & 0 deletions pkg/app2/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package app2

import (
"fmt"
"io"
"net"
"net/rpc"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
)

// Server is a server for app/visor communication.
type Server struct {
log *logging.Logger
sockFile string
rpcS *rpc.Server
}

// NewServer constructs server.
func NewServer(log *logging.Logger, sockFile string) *Server {
return &Server{
log: log,
sockFile: sockFile,
rpcS: rpc.NewServer(),
}
}

// ListenAndServe starts listening for incoming app connections via unix socket.
func (s *Server) ListenAndServe() error {
l, err := net.Listen("unix", s.sockFile)
if err != nil {
return err
}

for {
conn, err := l.Accept()
if err != nil {
return err
}

go s.serveConn(conn)
}
}

// serveConn instantiates RPC gateway for an application.
func (s *Server) serveConn(conn net.Conn) {
var appKey cipher.PubKey
if _, err := io.ReadFull(conn, appKey[:]); err != nil {
s.closeConn(conn)
s.log.WithError(err).Error("error reading app key")
return
}

appKeyHex := appKey.Hex()

gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_gateway_%s", appKeyHex)))
if err := s.rpcS.RegisterName(appKeyHex, gateway); err != nil {
s.closeConn(conn)
s.log.WithError(err).Errorf("error registering rpc gateway for app with key %s", appKeyHex)
return
}

go s.rpcS.ServeConn(conn)
}

// closeConn closes connection and logs error if any.
func (s *Server) closeConn(conn net.Conn) {
if err := conn.Close(); err != nil {
s.log.WithError(err).Error("error closing conn")
}
}

0 comments on commit c9eea7a

Please sign in to comment.