Skip to content

Commit

Permalink
Refactor a lot
Browse files Browse the repository at this point in the history
  • Loading branch information
Sir Darkrengarius authored and Sir Darkrengarius committed Oct 10, 2019
1 parent 8104ad3 commit 1fe21cb
Show file tree
Hide file tree
Showing 23 changed files with 790 additions and 187 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/gorilla/securecookie v1.1.1
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/common v0.7.0
Expand Down
49 changes: 49 additions & 0 deletions pkg/app2/appserver/proc_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package appserver

import (
"os"
"os/exec"
"sync"
"syscall"
)

type ProcManager struct {
processes []*os.Process
mu sync.Mutex
}

func NewProcManager() *ProcManager {
return &ProcManager{processes: make([]*os.Process, 0)}
}

func (m *ProcManager) Start(cmd *exec.Cmd) (int, error) {
if err := cmd.Start(); err != nil {
return -1, err
}
m.mu.Lock()
m.processes = append(m.processes, cmd.Process)
m.mu.Unlock()

return cmd.Process.Pid, nil
}

func (m *ProcManager) Stop(pid int) (err error) {
m.mu.Lock()
defer m.mu.Unlock()

for _, process := range m.processes {
if process.Pid != pid {
continue
}

if sigErr := process.Signal(syscall.SIGKILL); sigErr != nil && err == nil {
err = sigErr
}
}

return err
}

func (m *ProcManager) Wait(cmd *exec.Cmd) error {
return cmd.Wait()
}
42 changes: 22 additions & 20 deletions pkg/app2/rpc_gateway.go → pkg/app2/appserver/rpc_gateway.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package app2
package appserver

import (
"errors"
"fmt"
"net"

"github.com/pkg/errors"
"github.com/skycoin/skywire/pkg/app2/idmanager"

"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app2/appnet"
Expand All @@ -13,16 +15,16 @@ import (

// RPCGateway is a RPC interface for the app server.
type RPCGateway struct {
lm *idManager // contains listeners associated with their IDs
cm *idManager // contains connections associated with their IDs
lm *idmanager.Manager // contains listeners associated with their IDs
cm *idmanager.Manager // contains connections associated with their IDs
log *logging.Logger
}

// newRPCGateway constructs new server RPC interface.
func newRPCGateway(log *logging.Logger) *RPCGateway {
return &RPCGateway{
lm: newIDManager(),
cm: newIDManager(),
lm: idmanager.New(),
cm: idmanager.New(),
log: log,
}
}
Expand All @@ -35,7 +37,7 @@ type DialResp struct {

// Dial dials to the remote.
func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) error {
reservedConnID, free, err := r.cm.reserveNextID()
reservedConnID, free, err := r.cm.ReserveNextID()
if err != nil {
return err
}
Expand All @@ -52,7 +54,7 @@ func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) error {
return err
}

if err := r.cm.set(*reservedConnID, wrappedConn); err != nil {
if err := r.cm.Set(*reservedConnID, wrappedConn); err != nil {
if err := wrappedConn.Close(); err != nil {
r.log.WithError(err).Error("error closing conn")
}
Expand All @@ -71,7 +73,7 @@ func (r *RPCGateway) Dial(remote *appnet.Addr, resp *DialResp) error {

// Listen starts listening.
func (r *RPCGateway) Listen(local *appnet.Addr, lisID *uint16) error {
nextLisID, free, err := r.lm.reserveNextID()
nextLisID, free, err := r.lm.ReserveNextID()
if err != nil {
return err
}
Expand All @@ -82,7 +84,7 @@ func (r *RPCGateway) Listen(local *appnet.Addr, lisID *uint16) error {
return err
}

if err := r.lm.set(*nextLisID, l); err != nil {
if err := r.lm.Set(*nextLisID, l); err != nil {
if err := l.Close(); err != nil {
r.log.WithError(err).Error("error closing listener")
}
Expand All @@ -109,7 +111,7 @@ func (r *RPCGateway) Accept(lisID *uint16, resp *AcceptResp) error {
return err
}

connID, free, err := r.cm.reserveNextID()
connID, free, err := r.cm.ReserveNextID()
if err != nil {
return err
}
Expand All @@ -126,7 +128,7 @@ func (r *RPCGateway) Accept(lisID *uint16, resp *AcceptResp) error {
return err
}

if err := r.cm.set(*connID, wrappedConn); err != nil {
if err := r.cm.Set(*connID, wrappedConn); err != nil {
if err := wrappedConn.Close(); err != nil {
r.log.WithError(err).Error("error closing DMSG transport")
}
Expand Down Expand Up @@ -218,41 +220,41 @@ func (r *RPCGateway) CloseListener(lisID *uint16, _ *struct{}) error {
// popListener gets listener from the manager by `lisID` and removes it.
// Handles type assertion.
func (r *RPCGateway) popListener(lisID uint16) (net.Listener, error) {
lisIfc, err := r.lm.pop(lisID)
lisIfc, err := r.lm.Pop(lisID)
if err != nil {
return nil, errors.Wrap(err, "no listener")
}

return assertListener(lisIfc)
return idmanager.AssertListener(lisIfc)
}

// popConn gets conn from the manager by `connID` and removes it.
// Handles type assertion.
func (r *RPCGateway) popConn(connID uint16) (net.Conn, error) {
connIfc, err := r.cm.pop(connID)
connIfc, err := r.cm.Pop(connID)
if err != nil {
return nil, errors.Wrap(err, "no conn")
}

return assertConn(connIfc)
return idmanager.AssertConn(connIfc)
}

// getListener gets listener from the manager by `lisID`. Handles type assertion.
func (r *RPCGateway) getListener(lisID uint16) (net.Listener, error) {
lisIfc, ok := r.lm.get(lisID)
lisIfc, ok := r.lm.Get(lisID)
if !ok {
return nil, fmt.Errorf("no listener with key %d", lisID)
}

return assertListener(lisIfc)
return idmanager.AssertListener(lisIfc)
}

// getConn gets conn from the manager by `connID`. Handles type assertion.
func (r *RPCGateway) getConn(connID uint16) (net.Conn, error) {
connIfc, ok := r.cm.get(connID)
connIfc, ok := r.cm.Get(connID)
if !ok {
return nil, fmt.Errorf("no conn with key %d", connID)
}

return assertConn(connIfc)
return idmanager.AssertConn(connIfc)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app2
package appserver

import (
"context"
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestRPCGateway_Dial(t *testing.T) {

var resp DialResp
err := rpc.Dial(&dialAddr, &resp)
require.Equal(t, err, errNoMoreAvailableValues)
require.Equal(t, err, idmanager.errNoMoreAvailableValues)
})

t.Run("dial error", func(t *testing.T) {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestRPCGateway_Listen(t *testing.T) {
var lisID uint16

err := rpc.Listen(&listenAddr, &lisID)
require.Equal(t, err, errNoMoreAvailableValues)
require.Equal(t, err, idmanager.errNoMoreAvailableValues)
})

t.Run("listen error", func(t *testing.T) {
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestRPCGateway_Accept(t *testing.T) {

var resp AcceptResp
err := rpc.Accept(&lisID, &resp)
require.Equal(t, err, errNoMoreAvailableValues)
require.Equal(t, err, idmanager.errNoMoreAvailableValues)
})

t.Run("error wrapping conn", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/app2/server.go → pkg/app2/appserver/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package app2
package appserver

import (
"fmt"
Expand All @@ -16,7 +16,7 @@ type Server struct {
}

// NewServer constructs server.
func NewServer(log *logging.Logger, sockFile string) *Server {
func New(log *logging.Logger, sockFile string) *Server {
return &Server{
log: log,
sockFile: sockFile,
Expand Down
41 changes: 26 additions & 15 deletions pkg/app2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package app2

import (
"net"
"net/rpc"

"github.com/pkg/errors"

"github.com/skycoin/skywire/pkg/app2/idmanager"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
Expand All @@ -16,24 +21,30 @@ type Client struct {
pk cipher.PubKey
pid ProcID
rpc RPCClient
lm *idManager // contains listeners associated with their IDs
cm *idManager // contains connections associated with their IDs
lm *idmanager.Manager // contains listeners associated with their IDs
cm *idmanager.Manager // contains connections associated with their IDs
}

// NewClient creates a new `Client`. The `Client` needs to be provided with:
// - log: logger instance
// - localPK: The local public key of the parent skywire visor.
// - pid: The procID assigned for the process that Client is being used by.
// - rpc: RPC client to communicate with the server.
func NewClient(log *logging.Logger, localPK cipher.PubKey, pid ProcID, rpc RPCClient) *Client {
// - sockFile: unix socket file to connect to the app server.
// - appKey: application key to authenticate within app server.
func NewClient(log *logging.Logger, localPK cipher.PubKey, pid ProcID, sockFile, appKey string) (*Client, error) {
rpcCl, err := rpc.Dial("unix", sockFile)
if err != nil {
return nil, errors.Wrap(err, "error connectin to the app server")
}

return &Client{
log: log,
pk: localPK,
pid: pid,
rpc: rpc,
lm: newIDManager(),
cm: newIDManager(),
}
rpc: NewRPCClient(rpcCl, appKey),
lm: idmanager.New(),
cm: idmanager.New(),
}, nil
}

// Dial dials the remote node using `remote`.
Expand All @@ -54,7 +65,7 @@ func (c *Client) Dial(remote appnet.Addr) (net.Conn, error) {
remote: remote,
}

free, err := c.cm.add(connID, conn)
free, err := c.cm.Add(connID, conn)
if err != nil {
if err := conn.Close(); err != nil {
c.log.WithError(err).Error("error closing conn")
Expand Down Expand Up @@ -88,10 +99,10 @@ func (c *Client) Listen(n appnet.Type, port routing.Port) (net.Listener, error)
id: lisID,
rpc: c.rpc,
addr: local,
cm: newIDManager(),
cm: idmanager.New(),
}

freeLis, err := c.lm.add(lisID, listener)
freeLis, err := c.lm.Add(lisID, listener)
if err != nil {
if err := listener.Close(); err != nil {
c.log.WithError(err).Error("error closing listener")
Expand All @@ -111,8 +122,8 @@ func (c *Client) Listen(n appnet.Type, port routing.Port) (net.Listener, error)
// listeners and connections.
func (c *Client) Close() {
var listeners []net.Listener
c.lm.doRange(func(_ uint16, v interface{}) bool {
lis, err := assertListener(v)
c.lm.DoRange(func(_ uint16, v interface{}) bool {
lis, err := idmanager.AssertListener(v)
if err != nil {
c.log.Error(err)
return true
Expand All @@ -123,8 +134,8 @@ func (c *Client) Close() {
})

var conns []net.Conn
c.cm.doRange(func(_ uint16, v interface{}) bool {
conn, err := assertConn(v)
c.cm.DoRange(func(_ uint16, v interface{}) bool {
conn, err := idmanager.AssertConn(v)
if err != nil {
c.log.Error(err)
return true
Expand Down
Loading

0 comments on commit 1fe21cb

Please sign in to comment.