Skip to content

Commit

Permalink
Add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Sep 16, 2019
1 parent f374c39 commit 7bcafc9
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 17 deletions.
33 changes: 18 additions & 15 deletions pkg/app2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,35 @@ package app2
import (
"net/rpc"

"github.com/skycoin/skywire/pkg/routing"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/routing"
)

// Client is used by skywire apps.
type Client struct {
PK cipher.PubKey
pid ProcID
rpc ServerRPCClient
logger *logging.Logger
pk cipher.PubKey
pid ProcID
rpc ServerRPCClient
log *logging.Logger
}

// NewClient creates a new Client. The Client needs to be provided with:
// NewClient creates a new `Client`. The `Client` needs to be provided with:
// - log: Logger instance.
// - localPK: The local public key of the parent skywire visor.
// - pid: The procID assigned for the process that Client is being used by.
// - sockAddr: The socket address to connect to Server.
func NewClient(localPK cipher.PubKey, pid ProcID, rpc *rpc.Client, l *logging.Logger) *Client {
// - rpc: RPC client to communicate with the server.
func NewClient(log *logging.Logger, localPK cipher.PubKey, pid ProcID, rpc *rpc.Client) *Client {
return &Client{
PK: localPK,
pid: pid,
rpc: newServerRPCClient(rpc),
logger: l,
pk: localPK,
pid: pid,
rpc: newServerRPCClient(rpc),
log: log,
}
}

// Dial dials the remote node using `remote`.
func (c *Client) Dial(remote routing.Addr) (*Conn, error) {
connID, err := c.rpc.Dial(remote)
if err != nil {
Expand All @@ -41,17 +43,18 @@ func (c *Client) Dial(remote routing.Addr) (*Conn, error) {
rpc: c.rpc,
// TODO: port?
local: routing.Addr{
PubKey: c.PK,
PubKey: c.pk,
},
remote: remote,
}

return conn, nil
}

// Listen listens on the specified `port` for the incoming connections.
func (c *Client) Listen(port routing.Port) (*Listener, error) {
local := routing.Addr{
PubKey: c.PK,
PubKey: c.pk,
Port: port,
}

Expand Down
1 change: 1 addition & 0 deletions pkg/app2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/skycoin/skywire/pkg/routing"
)

// Conn is a connection from app client to the server.
type Conn struct {
id uint16
rpc ConnRPCClient
Expand Down
7 changes: 7 additions & 0 deletions pkg/app2/conns_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import (
"github.com/pkg/errors"
)

// connsManager manages connections within the app server.
type connsManager struct {
conns map[uint16]net.Conn
mx sync.RWMutex
lstID uint16
}

// newConnsManager constructs new `connsManager`.
func newConnsManager() *connsManager {
return &connsManager{
conns: make(map[uint16]net.Conn),
}
}

// `nextID` reserves slot for the next connection and returns its id.
func (m *connsManager) nextID() (*uint16, error) {
m.mx.Lock()

Expand All @@ -42,6 +45,8 @@ func (m *connsManager) nextID() (*uint16, error) {
return &connID, nil
}

// getAndRemove removes connection specified by `connID` from the manager instance and
// returns it.
func (m *connsManager) getAndRemove(connID uint16) (net.Conn, error) {
m.mx.Lock()
conn, ok := m.conns[connID]
Expand All @@ -61,6 +66,7 @@ func (m *connsManager) getAndRemove(connID uint16) (net.Conn, error) {
return conn, nil
}

// set sets `conn` associated with `connID`.
func (m *connsManager) set(connID uint16, conn net.Conn) error {
m.mx.Lock()

Expand All @@ -75,6 +81,7 @@ func (m *connsManager) set(connID uint16, conn net.Conn) error {
return nil
}

// get gets the connection associated with the `connID`.
func (m *connsManager) get(connID uint16) (net.Conn, bool) {
m.mx.RLock()
conn, ok := m.conns[connID]
Expand Down
2 changes: 1 addition & 1 deletion pkg/app2/doc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package app2 provides facilities to establish communication
// between a visor node and a skywire application. Intended to
// replace the original `app` module
// replace the original `app` module.
package app2
2 changes: 2 additions & 0 deletions pkg/app2/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/skycoin/skywire/pkg/routing"
)

// Listener is a listener for app server connections.
type Listener struct {
id uint16
rpc ListenerRPCClient
Expand All @@ -29,6 +30,7 @@ func (l *Listener) Accept() (*Conn, error) {
return conn, nil
}

// TODO: should unblock all called `Accept`s with errors
func (l *Listener) Close() error {
return l.rpc.CloseListener(l.id)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/app2/listeners_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import (
"github.com/skycoin/dmsg"
)

// listenersManager contains and manages all the instantiated listeners
// connsManager manages listeners within the app server.
type listenersManager struct {
listeners map[uint16]*dmsg.Listener
mx sync.RWMutex
lstID uint16
}

// newListenersManager constructs new `listenersManager`.
func newListenersManager() *listenersManager {
return &listenersManager{
listeners: make(map[uint16]*dmsg.Listener),
}
}

// `nextID` reserves slot for the next listener and returns its id.
func (m *listenersManager) nextID() (*uint16, error) {
m.mx.Lock()

Expand All @@ -43,6 +45,8 @@ func (m *listenersManager) nextID() (*uint16, error) {
return &lisID, nil
}

// getAndRemove removes listener specified by `lisID` from the manager instance and
// returns it.
func (m *listenersManager) getAndRemove(lisID uint16) (*dmsg.Listener, error) {
m.mx.Lock()
lis, ok := m.listeners[lisID]
Expand All @@ -62,6 +66,7 @@ func (m *listenersManager) getAndRemove(lisID uint16) (*dmsg.Listener, error) {
return lis, nil
}

// set sets `lis` associated with `lisID`.
func (m *listenersManager) set(lisID uint16, lis *dmsg.Listener) error {
m.mx.Lock()

Expand All @@ -76,6 +81,7 @@ func (m *listenersManager) set(lisID uint16, lis *dmsg.Listener) error {
return nil
}

// get gets the listener associated with the `lisID`.
func (m *listenersManager) get(lisID uint16) (*dmsg.Listener, bool) {
m.mx.RLock()
lis, ok := m.listeners[lisID]
Expand Down
11 changes: 11 additions & 0 deletions pkg/app2/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/skycoin/skywire/pkg/routing"
)

// ServerRPC is a RPC interface for the app server.
type ServerRPC struct {
dmsgC *dmsg.Client
lm *listenersManager
cm *connsManager
}

// newServerRPC constructs new server RPC interface.
func newServerRPC(dmsgC *dmsg.Client) *ServerRPC {
return &ServerRPC{
dmsgC: dmsgC,
Expand All @@ -23,6 +25,7 @@ func newServerRPC(dmsgC *dmsg.Client) *ServerRPC {
}
}

// Dial dials to the remote.
func (r *ServerRPC) Dial(remote *routing.Addr, connID *uint16) error {
connID, err := r.cm.nextID()
if err != nil {
Expand All @@ -41,6 +44,7 @@ func (r *ServerRPC) Dial(remote *routing.Addr, connID *uint16) error {
return nil
}

// Listen starts listening.
func (r *ServerRPC) Listen(local *routing.Addr, lisID *uint16) error {
lisID, err := r.lm.nextID()
if err != nil {
Expand All @@ -60,6 +64,7 @@ func (r *ServerRPC) Listen(local *routing.Addr, lisID *uint16) error {
return nil
}

// Accept accepts connection from the listener specified by `lisID`.
func (r *ServerRPC) Accept(lisID *uint16, connID *uint16) error {
lis, ok := r.lm.get(*lisID)
if !ok {
Expand All @@ -84,11 +89,13 @@ func (r *ServerRPC) Accept(lisID *uint16, connID *uint16) error {
return nil
}

// WriteReq contains arguments for `Write`.
type WriteReq struct {
ConnID uint16
B []byte
}

// Write writes to the connection.
func (r *ServerRPC) Write(req *WriteReq, n *int) error {
conn, ok := r.cm.get(req.ConnID)
if !ok {
Expand All @@ -104,11 +111,13 @@ func (r *ServerRPC) Write(req *WriteReq, n *int) error {
return nil
}

// ReadResp contains response parameters for `Read`.
type ReadResp struct {
B []byte
N int
}

// Read reads data from connection specified by `connID`.
func (r *ServerRPC) Read(connID *uint16, resp *ReadResp) error {
conn, ok := r.cm.get(*connID)
if !ok {
Expand All @@ -124,6 +133,7 @@ func (r *ServerRPC) Read(connID *uint16, resp *ReadResp) error {
return nil
}

// CloseConn closes connection specified by `connID`.
func (r *ServerRPC) CloseConn(connID *uint16, _ *struct{}) error {
conn, err := r.cm.getAndRemove(*connID)
if err != nil {
Expand All @@ -133,6 +143,7 @@ func (r *ServerRPC) CloseConn(connID *uint16, _ *struct{}) error {
return conn.Close()
}

// CloseListener closes listener specified by `lisID`.
func (r *ServerRPC) CloseListener(lisID *uint16, _ *struct{}) error {
lis, err := r.lm.getAndRemove(*lisID)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions pkg/app2/server_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/skycoin/skywire/pkg/routing"
)

// ServerRPCClient describes RPC interface to communicate with the server.
type ServerRPCClient interface {
Dial(remote routing.Addr) (uint16, error)
Listen(local routing.Addr) (uint16, error)
Expand All @@ -16,6 +17,8 @@ type ServerRPCClient interface {
CloseListener(id uint16) error
}

// ListenerRPCClient describes RPC interface to communicate with the server.
// Contains funcs for `Listener` and `Conn`.
type ListenerRPCClient interface {
Accept(id uint16) (uint16, error)
CloseListener(id uint16) error
Expand All @@ -24,22 +27,27 @@ type ListenerRPCClient interface {
CloseConn(id uint16) error
}

// ConnRPCClient describes RPC interface to communicate with the server.
// Contains funcs for `Conn`.
type ConnRPCClient interface {
Write(id uint16, b []byte) (int, error)
Read(id uint16, b []byte) (int, error)
CloseConn(id uint16) error
}

// serverRPCClient implements `ServerRPCClient`.
type serverRPCCLient struct {
rpc *rpc.Client
}

// newServerRPCClient constructs new `serverRPCClient`.
func newServerRPCClient(rpc *rpc.Client) ServerRPCClient {
return &serverRPCCLient{
rpc: rpc,
}
}

// Dial sends `Dial` command to the server.
func (c *serverRPCCLient) Dial(remote routing.Addr) (uint16, error) {
var connID uint16
if err := c.rpc.Call("Dial", &remote, &connID); err != nil {
Expand All @@ -49,6 +57,7 @@ func (c *serverRPCCLient) Dial(remote routing.Addr) (uint16, error) {
return connID, nil
}

// Listen sends `Listen` command to the server.
func (c *serverRPCCLient) Listen(local routing.Addr) (uint16, error) {
var lisID uint16
if err := c.rpc.Call("Listen", &local, &lisID); err != nil {
Expand All @@ -58,6 +67,7 @@ func (c *serverRPCCLient) Listen(local routing.Addr) (uint16, error) {
return lisID, nil
}

// Accept sends `Accept` command to the server.
func (c *serverRPCCLient) Accept(lisID uint16) (uint16, error) {
var connID uint16
if err := c.rpc.Call("Accept", &lisID, &connID); err != nil {
Expand All @@ -67,6 +77,7 @@ func (c *serverRPCCLient) Accept(lisID uint16) (uint16, error) {
return connID, nil
}

// Write sends `Write` command to the server.
func (c *serverRPCCLient) Write(connID uint16, b []byte) (int, error) {
req := WriteReq{
ConnID: connID,
Expand All @@ -81,6 +92,7 @@ func (c *serverRPCCLient) Write(connID uint16, b []byte) (int, error) {
return n, nil
}

// Read sends `Read` command to the server.
func (c *serverRPCCLient) Read(connID uint16, b []byte) (int, error) {
var resp ReadResp
if err := c.rpc.Call("Read", &connID, &resp); err != nil {
Expand All @@ -92,10 +104,12 @@ func (c *serverRPCCLient) Read(connID uint16, b []byte) (int, error) {
return resp.N, nil
}

// CloseConn sends `CloseConn` command to the server.
func (c *serverRPCCLient) CloseConn(id uint16) error {
return c.rpc.Call("CloseConn", &id, nil)
}

// CloseListener sends `CloseListener` command to the server.
func (c *serverRPCCLient) CloseListener(id uint16) error {
return c.rpc.Call("CloseListener", &id, nil)
}

0 comments on commit 7bcafc9

Please sign in to comment.