Skip to content

Commit

Permalink
Finish RPC communication
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Sep 15, 2019
1 parent e2afc9e commit f374c39
Show file tree
Hide file tree
Showing 11 changed files with 385 additions and 1,015 deletions.
168 changes: 30 additions & 138 deletions pkg/app2/client.go
Original file line number Diff line number Diff line change
@@ -1,178 +1,70 @@
package app2

import (
"encoding/binary"
"net"
"net/rpc"

"github.com/hashicorp/yamux"

"github.com/pkg/errors"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/pkg/routing"

"github.com/skycoin/dmsg/cipher"
)

var (
ErrWrongHSFrameTypeReceived = errors.New("received wrong HS frame type")
"github.com/skycoin/skycoin/src/util/logging"
)

// Client is used by skywire apps.
type Client struct {
PK cipher.PubKey
pid ProcID
sockAddr string
conn net.Conn
session *yamux.Session
logger *logging.Logger
lm *listenersManager
isListening int32
PK cipher.PubKey
pid ProcID
rpc ServerRPCClient
logger *logging.Logger
}

// NewClient creates a new Client. The Client needs to be provided with:
// - localPK: The local public key of the parent skywire visor.
// - pid: The procID assigned for the process that Client is being used by.
// - sockAddr: The socket address to connect to Server.
func NewClient(localPK cipher.PubKey, pid ProcID, sockAddr string, l *logging.Logger) (*Client, error) {
conn, err := net.Dial("unix", sockAddr)
if err != nil {
return nil, errors.Wrap(err, "error connecting app server")
}

session, err := yamux.Client(conn, nil)
if err != nil {
return nil, errors.Wrap(err, "error opening yamux session")
}

lm := newListenersManager(l, pid, localPK)

func NewClient(localPK cipher.PubKey, pid ProcID, rpc *rpc.Client, l *logging.Logger) *Client {
return &Client{
PK: localPK,
pid: pid,
sockAddr: sockAddr,
conn: conn,
session: session,
lm: lm,
}, nil
PK: localPK,
pid: pid,
rpc: newServerRPCClient(rpc),
logger: l,
}
}

func (c *Client) Dial(addr routing.Addr) (net.Conn, error) {
stream, err := c.session.Open()
func (c *Client) Dial(remote routing.Addr) (*Conn, error) {
connID, err := c.rpc.Dial(remote)
if err != nil {
return nil, errors.Wrap(err, "error opening stream")
return nil, err
}

err = dialHS(stream, c.pid, routing.Loop{
Local: routing.Addr{
conn := &Conn{
id: connID,
rpc: c.rpc,
// TODO: port?
local: routing.Addr{
PubKey: c.PK,
},
Remote: addr,
})
if err != nil {
return nil, errors.Wrap(err, "error performing Dial HS")
remote: remote,
}

return stream, nil
return conn, nil
}

func (c *Client) Listen(port routing.Port) (net.Listener, error) {
if err := c.lm.reserveListener(port); err != nil {
return nil, errors.Wrap(err, "error reserving listener")
}

stream, err := c.session.Open()
if err != nil {
return nil, errors.Wrap(err, "error opening stream")
}

func (c *Client) Listen(port routing.Port) (*Listener, error) {
local := routing.Addr{
PubKey: c.PK,
Port: port,
}

err = listenHS(stream, c.pid, local)
if err != nil {
return nil, errors.Wrap(err, "error performing Listen HS")
}

c.lm.listen(c.session)

l := newListener(local, c.lm, c.pid, c.stopListening, c.logger)
if err := c.lm.set(port, l); err != nil {
return nil, errors.Wrap(err, "error setting listener")
}

return l, nil
}

func (c *Client) listen() error {
for {
stream, err := c.session.Accept()
if err != nil {
return errors.Wrap(err, "error accepting stream")
}

hsFrame, err := readHSFrame(stream)
if err != nil {
c.logger.WithError(err).Error("error reading HS frame")
continue
}

if hsFrame.FrameType() != HSFrameTypeDMSGDial {
c.logger.WithError(ErrWrongHSFrameTypeReceived).Error("on listening for Dial")
continue
}

// TODO: handle field get gracefully
remotePort := routing.Port(binary.BigEndian.Uint16(hsFrame[HSFrameHeaderLen+HSFramePKLen*2+HSFramePortLen:]))
if err := c.lm.addConn(remotePort, stream); err != nil {
c.logger.WithError(err).Error("failed to accept")
continue
}

localPort := routing.Port(binary.BigEndian.Uint16(hsFrame[HSFrameHeaderLen+HSFramePKLen:]))

var localPK cipher.PubKey
copy(localPK[:], hsFrame[HSFrameHeaderLen:HSFrameHeaderLen+HSFramePKLen])

respHSFrame := NewHSFrameDMSGAccept(c.pid, routing.Loop{
Local: routing.Addr{
PubKey: c.PK,
Port: remotePort,
},
Remote: routing.Addr{
PubKey: localPK,
Port: localPort,
},
})

if _, err := stream.Write(respHSFrame); err != nil {
c.logger.WithError(err).Error("error responding with DmsgAccept")
continue
}
}
}

func (c *Client) stopListening(port routing.Port) error {
stream, err := c.session.Open()
lisID, err := c.rpc.Listen(local)
if err != nil {
return errors.Wrap(err, "error opening stream")
}

addr := routing.Addr{
PubKey: c.PK,
Port: port,
}

hsFrame := NewHSFrameDMSGStopListening(c.pid, addr)
if _, err := stream.Write(hsFrame); err != nil {
return errors.Wrap(err, "error writing HS frame")
return nil, err
}

if err := stream.Close(); err != nil {
return errors.Wrap(err, "error closing stream")
listener := &Listener{
id: lisID,
rpc: c.rpc,
addr: local,
}

return nil
return listener, nil
}
23 changes: 0 additions & 23 deletions pkg/app2/client_conn.go

This file was deleted.

34 changes: 34 additions & 0 deletions pkg/app2/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package app2

import (
"net"

"github.com/skycoin/skywire/pkg/routing"
)

type Conn struct {
id uint16
rpc ConnRPCClient
local routing.Addr
remote routing.Addr
}

func (c *Conn) Read(b []byte) (int, error) {
return c.rpc.Read(c.id, b)
}

func (c *Conn) Write(b []byte) (int, error) {
return c.rpc.Write(c.id, b)
}

func (c *Conn) Close() error {
return c.rpc.CloseConn(c.id)
}

func (c *Conn) LocalAddr() net.Addr {
return c.local
}

func (c *Conn) RemoteAddr() net.Addr {
return c.remote
}
83 changes: 83 additions & 0 deletions pkg/app2/conns_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package app2

import (
"fmt"
"net"
"sync"

"github.com/pkg/errors"
)

type connsManager struct {
conns map[uint16]net.Conn
mx sync.RWMutex
lstID uint16
}

func newConnsManager() *connsManager {
return &connsManager{
conns: make(map[uint16]net.Conn),
}
}

func (m *connsManager) nextID() (*uint16, error) {
m.mx.Lock()

connID := m.lstID + 1
for ; connID < m.lstID; connID++ {
if _, ok := m.conns[connID]; !ok {
break
}
}

if connID == m.lstID {
m.mx.Unlock()
return nil, errors.New("no more available conns")
}

m.conns[connID] = nil
m.lstID = connID

m.mx.Unlock()
return &connID, nil
}

func (m *connsManager) getAndRemove(connID uint16) (net.Conn, error) {
m.mx.Lock()
conn, ok := m.conns[connID]
if !ok {
m.mx.Unlock()
return nil, fmt.Errorf("no conn with id %d", connID)
}

if conn == nil {
m.mx.Unlock()
return nil, fmt.Errorf("conn with id %d is not set", connID)
}

delete(m.conns, connID)

m.mx.Unlock()
return conn, nil
}

func (m *connsManager) set(connID uint16, conn net.Conn) error {
m.mx.Lock()

if c, ok := m.conns[connID]; ok && c != nil {
m.mx.Unlock()
return errors.New("conn already exists")
}

m.conns[connID] = conn

m.mx.Unlock()
return nil
}

func (m *connsManager) get(connID uint16) (net.Conn, bool) {
m.mx.RLock()
conn, ok := m.conns[connID]
m.mx.RUnlock()
return conn, ok
}
Loading

0 comments on commit f374c39

Please sign in to comment.