From 608cc38824c9f8d530278b0c1bde3a714124ed65 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Thu, 17 Oct 2019 23:11:09 +0300 Subject: [PATCH] Refactor a bit --- pkg/app2/app.go | 21 ++++++++++++++++++--- pkg/app2/app_manager.go | 13 ++++++++++++- pkg/app2/appnet/addr.go | 2 ++ pkg/app2/appnet/skywire_networker.go | 1 - pkg/app2/appserver/rpc_gateway.go | 3 +-- pkg/app2/appserver/server.go | 6 ++++-- pkg/app2/client.go | 4 +--- pkg/app2/config.go | 2 +- pkg/app2/errors.go | 6 ------ pkg/app2/idmanager/util.go | 4 ++-- pkg/app2/key.go | 3 +++ pkg/app2/listener.go | 4 ++-- pkg/app2/proc.go | 6 ++++++ pkg/app2/rpc_client.go | 3 +-- 14 files changed, 53 insertions(+), 25 deletions(-) diff --git a/pkg/app2/app.go b/pkg/app2/app.go index f7f8c8504c..405c90e8fa 100644 --- a/pkg/app2/app.go +++ b/pkg/app2/app.go @@ -3,32 +3,43 @@ package app2 import ( "fmt" - "github.com/skycoin/skywire/pkg/app2/appserver" - "github.com/skycoin/skycoin/src/util/logging" + + "github.com/skycoin/skywire/pkg/app2/appserver" ) +// App defines a skywire application. It encapsulates +// running process and the RPC server to communicate with +// the app. type App struct { + key Key config Config log *logging.Logger proc *Proc rpcS *appserver.Server } +// New constructs `App`. func New(log *logging.Logger, c Config, args []string) *App { - p := NewProc(c, args) + k := GenerateAppKey() + + p := NewProc(c, args, k) return &App{ + key: k, config: c, log: log, proc: p, } } +// PID returns app PID. func (a *App) PID() ProcID { return a.proc.ID() } +// Run sets the app running. It starts app process +// and sets up the RPC server. func (a *App) Run() error { appKey := GenerateAppKey() @@ -54,18 +65,22 @@ func (a *App) Run() error { return nil } +// Stop stops application (process and the RPC server). func (a *App) Stop() error { a.closeRPCServer() return a.proc.Stop() } +// Wait waits for the app to exit. Shuts down the +// RPC server. func (a *App) Wait() error { err := a.proc.Wait() a.closeRPCServer() return err } +// closeRPCServer closes RPC server and logs error if any. func (a *App) closeRPCServer() { if err := a.rpcS.Close(); err != nil { a.log.WithError(err).Error("error closing RPC server") diff --git a/pkg/app2/app_manager.go b/pkg/app2/app_manager.go index 9b19ea80b2..499246d7e1 100644 --- a/pkg/app2/app_manager.go +++ b/pkg/app2/app_manager.go @@ -7,18 +7,23 @@ import ( ) var ( - ErrAppExists = errors.New("app with such pid already exists") + // ErrAppExists is returned when trying to add + // app to the manager which already exists. + ErrAppExists = errors.New("app with such name already exists") ) +// AppManager allows to store and retrieve skywire apps. type AppManager struct { apps map[string]*App mx sync.RWMutex } +// NewAppManager constructs `AppManager`. func NewAppManager() *AppManager { return &AppManager{} } +// Add adds app `a` to the manager instance. func (m *AppManager) Add(a *App) error { m.mx.Lock() defer m.mx.Unlock() @@ -32,6 +37,8 @@ func (m *AppManager) Add(a *App) error { return nil } +// App gets the app from the manager if it exists. Returns bool +// flag to indicate operation success. func (m *AppManager) App(name string) (*App, bool) { m.mx.RLock() defer m.mx.RUnlock() @@ -41,6 +48,7 @@ func (m *AppManager) App(name string) (*App, bool) { return a, ok } +// Exists checks whether app exists in the manager instance. func (m *AppManager) Exists(name string) bool { m.mx.RLock() defer m.mx.RUnlock() @@ -50,6 +58,7 @@ func (m *AppManager) Exists(name string) bool { return ok } +// Remove removes app with the name `name` from the manager instance. func (m *AppManager) Remove(name string) { m.mx.Lock() defer m.mx.Unlock() @@ -57,6 +66,8 @@ func (m *AppManager) Remove(name string) { delete(m.apps, name) } +// Range allows to iterate over stored apps. Calls `next` on each iteration. +// Stops execution once `next` returns false. func (m *AppManager) Range(next func(name string, app *App) bool) { m.mx.RLock() defer m.mx.RUnlock() diff --git a/pkg/app2/appnet/addr.go b/pkg/app2/appnet/addr.go index 2f2cd77046..85b51faa31 100644 --- a/pkg/app2/appnet/addr.go +++ b/pkg/app2/appnet/addr.go @@ -12,6 +12,8 @@ import ( ) var ( + // ErrUnknownAddrType is returned when trying to convert the + // unknown addr type. ErrUnknownAddrType = errors.New("addr type is unknown") ) diff --git a/pkg/app2/appnet/skywire_networker.go b/pkg/app2/appnet/skywire_networker.go index 56bd28e9f1..3ec363b774 100644 --- a/pkg/app2/appnet/skywire_networker.go +++ b/pkg/app2/appnet/skywire_networker.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "github.com/pkg/errors" - "github.com/skycoin/dmsg/netutil" "github.com/skycoin/skycoin/src/util/logging" diff --git a/pkg/app2/appserver/rpc_gateway.go b/pkg/app2/appserver/rpc_gateway.go index a59d1ab3b0..e2e2d5710f 100644 --- a/pkg/app2/appserver/rpc_gateway.go +++ b/pkg/app2/appserver/rpc_gateway.go @@ -5,11 +5,10 @@ import ( "net" "github.com/pkg/errors" - "github.com/skycoin/skywire/pkg/app2/idmanager" - "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app2/appnet" + "github.com/skycoin/skywire/pkg/app2/idmanager" "github.com/skycoin/skywire/pkg/routing" ) diff --git a/pkg/app2/appserver/server.go b/pkg/app2/appserver/server.go index 7d9f9bc549..2f9838caca 100644 --- a/pkg/app2/appserver/server.go +++ b/pkg/app2/appserver/server.go @@ -7,10 +7,9 @@ import ( "sync" "github.com/pkg/errors" + "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app2" - - "github.com/skycoin/skycoin/src/util/logging" ) // Server is a server for app/visor communication. @@ -60,12 +59,15 @@ func (s *Server) ListenAndServe() error { } } +// Close closes the server. func (s *Server) Close() error { err := s.lis.Close() close(s.stopCh) + s.done.Wait() return err } +// serveConn serves RPC on a single connection. func (s *Server) serveConn(conn net.Conn) { go s.rpcS.ServeConn(conn) <-s.stopCh diff --git a/pkg/app2/client.go b/pkg/app2/client.go index 1b63156656..4b56f36e3a 100644 --- a/pkg/app2/client.go +++ b/pkg/app2/client.go @@ -5,13 +5,11 @@ import ( "net/rpc" "github.com/pkg/errors" - - "github.com/skycoin/skywire/pkg/app2/idmanager" - "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app2/appnet" + "github.com/skycoin/skywire/pkg/app2/idmanager" "github.com/skycoin/skywire/pkg/routing" ) diff --git a/pkg/app2/config.go b/pkg/app2/config.go index 33b942402a..4bda661289 100644 --- a/pkg/app2/config.go +++ b/pkg/app2/config.go @@ -1,6 +1,6 @@ package app2 -// Config defines configuration parameters for App +// Config defines configuration parameters for `App`. type Config struct { Name string `json:"name"` Version string `json:"version"` diff --git a/pkg/app2/errors.go b/pkg/app2/errors.go index 39b50ecd1a..3dd1920481 100644 --- a/pkg/app2/errors.go +++ b/pkg/app2/errors.go @@ -4,12 +4,6 @@ import ( "errors" ) -var ( - // ErrPortAlreadyBound is being returned when trying to bind to the port - // which is already bound to. - ErrPortAlreadyBound = errors.New("port is already bound") -) - var ( // errMethodNotImplemented serves as a return value for non-implemented funcs (stubs). errMethodNotImplemented = errors.New("method not implemented") diff --git a/pkg/app2/idmanager/util.go b/pkg/app2/idmanager/util.go index 48c856c7cb..d3a9e33420 100644 --- a/pkg/app2/idmanager/util.go +++ b/pkg/app2/idmanager/util.go @@ -5,7 +5,7 @@ import ( "net" ) -// assertListener asserts that `v` is of type `net.Listener`. +// AssertListener asserts that `v` is of type `net.Listener`. func AssertListener(v interface{}) (net.Listener, error) { lis, ok := v.(net.Listener) if !ok { @@ -15,7 +15,7 @@ func AssertListener(v interface{}) (net.Listener, error) { return lis, nil } -// assertConn asserts that `v` is of type `net.Conn`. +// AssertConn asserts that `v` is of type `net.Conn`. func AssertConn(v interface{}) (net.Conn, error) { conn, ok := v.(net.Conn) if !ok { diff --git a/pkg/app2/key.go b/pkg/app2/key.go index 2b6ba6e590..b34a0c9c87 100644 --- a/pkg/app2/key.go +++ b/pkg/app2/key.go @@ -2,8 +2,11 @@ package app2 import "github.com/skycoin/dmsg/cipher" +// Key is an app key to authenticate within the +// app server. type Key string +// GenerateAppKey generates new app key. func GenerateAppKey() Key { raw, _ := cipher.GenerateKeyPair() return Key(raw.Hex()) diff --git a/pkg/app2/listener.go b/pkg/app2/listener.go index 6cfb00eb66..b05ca11d0b 100644 --- a/pkg/app2/listener.go +++ b/pkg/app2/listener.go @@ -5,10 +5,10 @@ import ( "net" "sync" - "github.com/skycoin/skywire/pkg/app2/idmanager" - "github.com/skycoin/skycoin/src/util/logging" + "github.com/skycoin/skywire/pkg/app2/appnet" + "github.com/skycoin/skywire/pkg/app2/idmanager" ) // Listener is a listener for app server connections. diff --git a/pkg/app2/proc.go b/pkg/app2/proc.go index 5e6cd9dfa7..7c730955af 100644 --- a/pkg/app2/proc.go +++ b/pkg/app2/proc.go @@ -7,12 +7,14 @@ import ( "sync" ) +// Proc is a wrapper for skywire app process. type Proc struct { id ProcID cmd *exec.Cmd mx sync.RWMutex } +// NewProc constructs `Proc`. func NewProc(c Config, args []string, key Key) *Proc { binaryPath := getBinaryPath(c.BinaryDir, c.Name, c.Version) @@ -35,6 +37,7 @@ func NewProc(c Config, args []string, key Key) *Proc { } } +// ID returns pid of the app. func (p *Proc) ID() ProcID { p.mx.RLock() id := p.id @@ -42,6 +45,7 @@ func (p *Proc) ID() ProcID { return id } +// Run runs the app process. func (p *Proc) Run() error { if err := p.cmd.Run(); err != nil { return err @@ -54,10 +58,12 @@ func (p *Proc) Run() error { return nil } +// Stop stops the app process. func (p *Proc) Stop() error { return p.cmd.Process.Kill() } +// Wait waits for the app process to exit. func (p *Proc) Wait() error { return p.cmd.Wait() } diff --git a/pkg/app2/rpc_client.go b/pkg/app2/rpc_client.go index 4a80b228e9..18e8dc2892 100644 --- a/pkg/app2/rpc_client.go +++ b/pkg/app2/rpc_client.go @@ -4,9 +4,8 @@ import ( "fmt" "net/rpc" - "github.com/skycoin/skywire/pkg/app2/appserver" - "github.com/skycoin/skywire/pkg/app2/appnet" + "github.com/skycoin/skywire/pkg/app2/appserver" "github.com/skycoin/skywire/pkg/routing" )