From 8e096b0c2479abe35532a2f0e9ed8c79a2744f64 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Fri, 18 Oct 2019 00:52:36 +0300 Subject: [PATCH] Refactor --- pkg/app2/app.go | 88 -------------------------- pkg/app2/app_manager.go | 80 ------------------------ pkg/app2/{ => appserver}/config.go | 4 +- pkg/app2/{ => appserver}/key.go | 2 +- pkg/app2/appserver/proc.go | 99 ++++++++++++++++++++++++++++++ pkg/app2/appserver/proc_manager.go | 74 ++++++++++++++++++++++ pkg/app2/appserver/server.go | 5 +- pkg/app2/proc.go | 74 ---------------------- 8 files changed, 177 insertions(+), 249 deletions(-) delete mode 100644 pkg/app2/app.go delete mode 100644 pkg/app2/app_manager.go rename pkg/app2/{ => appserver}/config.go (73%) rename pkg/app2/{ => appserver}/key.go (93%) create mode 100644 pkg/app2/appserver/proc.go create mode 100644 pkg/app2/appserver/proc_manager.go delete mode 100644 pkg/app2/proc.go diff --git a/pkg/app2/app.go b/pkg/app2/app.go deleted file mode 100644 index 405c90e8fa..0000000000 --- a/pkg/app2/app.go +++ /dev/null @@ -1,88 +0,0 @@ -package app2 - -import ( - "fmt" - - "github.com/skycoin/skycoin/src/util/logging" - - "github.com/skycoin/skywire/pkg/app2/appserver" -) - -// App defines a skywire application. It encapsulates -// running process and the RPC server to communicate with -// the app. -type App struct { - key Key - config Config - log *logging.Logger - proc *Proc - rpcS *appserver.Server -} - -// New constructs `App`. -func New(log *logging.Logger, c Config, args []string) *App { - k := GenerateAppKey() - - p := NewProc(c, args, k) - - return &App{ - key: k, - config: c, - log: log, - proc: p, - } -} - -// PID returns app PID. -func (a *App) PID() ProcID { - return a.proc.ID() -} - -// Run sets the app running. It starts app process -// and sets up the RPC server. -func (a *App) Run() error { - appKey := GenerateAppKey() - - rpcS, err := appserver.New(logging.MustGetLogger(fmt.Sprintf("app_rpc_server_%s", appKey)), - a.config.SockFile, appKey) - if err != nil { - return err - } - - a.rpcS = rpcS - - go func() { - if err := a.rpcS.ListenAndServe(); err != nil { - a.log.WithError(err).Error("error serving RPC") - } - }() - - if err := a.proc.Run(); err != nil { - a.closeRPCServer() - return err - } - - return nil -} - -// Stop stops application (process and the RPC server). -func (a *App) Stop() error { - a.closeRPCServer() - - return a.proc.Stop() -} - -// Wait waits for the app to exit. Shuts down the -// RPC server. -func (a *App) Wait() error { - err := a.proc.Wait() - a.closeRPCServer() - return err -} - -// closeRPCServer closes RPC server and logs error if any. -func (a *App) closeRPCServer() { - if err := a.rpcS.Close(); err != nil { - a.log.WithError(err).Error("error closing RPC server") - } -} diff --git a/pkg/app2/app_manager.go b/pkg/app2/app_manager.go deleted file mode 100644 index 499246d7e1..0000000000 --- a/pkg/app2/app_manager.go +++ /dev/null @@ -1,80 +0,0 @@ -package app2 - -import ( - "sync" - - "github.com/pkg/errors" -) - -var ( - // ErrAppExists is returned when trying to add - // app to the manager which already exists. - ErrAppExists = errors.New("app with such name already exists") -) - -// AppManager allows to store and retrieve skywire apps. -type AppManager struct { - apps map[string]*App - mx sync.RWMutex -} - -// NewAppManager constructs `AppManager`. -func NewAppManager() *AppManager { - return &AppManager{} -} - -// Add adds app `a` to the manager instance. -func (m *AppManager) Add(a *App) error { - m.mx.Lock() - defer m.mx.Unlock() - - if _, ok := m.apps[a.config.Name]; ok { - return ErrAppExists - } - - m.apps[a.config.Name] = a - - return nil -} - -// App gets the app from the manager if it exists. Returns bool -// flag to indicate operation success. -func (m *AppManager) App(name string) (*App, bool) { - m.mx.RLock() - defer m.mx.RUnlock() - - a, ok := m.apps[name] - - return a, ok -} - -// Exists checks whether app exists in the manager instance. -func (m *AppManager) Exists(name string) bool { - m.mx.RLock() - defer m.mx.RUnlock() - - _, ok := m.apps[name] - - return ok -} - -// Remove removes app with the name `name` from the manager instance. -func (m *AppManager) Remove(name string) { - m.mx.Lock() - defer m.mx.Unlock() - - delete(m.apps, name) -} - -// Range allows to iterate over stored apps. Calls `next` on each iteration. -// Stops execution once `next` returns false. -func (m *AppManager) Range(next func(name string, app *App) bool) { - m.mx.RLock() - defer m.mx.RUnlock() - - for name, app := range m.apps { - if !next(name, app) { - return - } - } -} diff --git a/pkg/app2/config.go b/pkg/app2/appserver/config.go similarity index 73% rename from pkg/app2/config.go rename to pkg/app2/appserver/config.go index 4bda661289..0fa82db446 100644 --- a/pkg/app2/config.go +++ b/pkg/app2/appserver/config.go @@ -1,6 +1,6 @@ -package app2 +package appserver -// Config defines configuration parameters for `App`. +// Config defines configuration parameters for `Proc`. type Config struct { Name string `json:"name"` Version string `json:"version"` diff --git a/pkg/app2/key.go b/pkg/app2/appserver/key.go similarity index 93% rename from pkg/app2/key.go rename to pkg/app2/appserver/key.go index b34a0c9c87..dbd51fb5fe 100644 --- a/pkg/app2/key.go +++ b/pkg/app2/appserver/key.go @@ -1,4 +1,4 @@ -package app2 +package appserver import "github.com/skycoin/dmsg/cipher" diff --git a/pkg/app2/appserver/proc.go b/pkg/app2/appserver/proc.go new file mode 100644 index 0000000000..758312b0f7 --- /dev/null +++ b/pkg/app2/appserver/proc.go @@ -0,0 +1,99 @@ +package appserver + +import ( + "fmt" + "os/exec" + "path/filepath" + + "github.com/skycoin/skycoin/src/util/logging" +) + +// Proc is a wrapper for a skywire app. Encapsulates +// the running proccess itself and the RPC server for +// app/visor communication. +type Proc struct { + key Key + config Config + log *logging.Logger + rpcS *Server + cmd *exec.Cmd +} + +// NewProc constructs `Proc`. +func NewProc(log *logging.Logger, c Config, args []string) (*Proc, error) { + key := GenerateAppKey() + + binaryPath := getBinaryPath(c.BinaryDir, c.Name, c.Version) + + const ( + appKeyEnvFormat = "APP_KEY=%s" + sockFileEnvFormat = "SW_UNIX=%s" + ) + + env := make([]string, 0, 2) + env = append(env, fmt.Sprintf(appKeyEnvFormat, key)) + env = append(env, fmt.Sprintf(sockFileEnvFormat, c.SockFile)) + + cmd := exec.Command(binaryPath, args...) // nolint:gosec + + cmd.Env = env + cmd.Dir = c.WorkDir + + rpcS, err := New(logging.MustGetLogger(fmt.Sprintf("app_rpc_server_%s", key)), + c.SockFile, key) + if err != nil { + return nil, err + } + + return &Proc{ + key: key, + config: c, + log: log, + cmd: cmd, + rpcS: rpcS, + }, nil +} + +// Run runs the application. It starts the process and runs the +// RPC communication server. +func (p *Proc) Run() error { + go func() { + if err := p.rpcS.ListenAndServe(); err != nil { + p.log.WithError(err).Error("error serving RPC") + } + }() + + if err := p.cmd.Run(); err != nil { + p.closeRPCServer() + return err + } + + return nil +} + +// Stop stops the applicacation. It stops the process and +// shuts down the RPC server. +func (p *Proc) Stop() error { + p.closeRPCServer() + return p.cmd.Process.Kill() +} + +// Wait shuts down the RPC server and waits for the +// application cmd to exit. +func (p *Proc) Wait() error { + p.closeRPCServer() + return p.cmd.Wait() +} + +// closeRPCServer closes RPC server and logs error if any. +func (p *Proc) closeRPCServer() { + if err := p.rpcS.Close(); err != nil { + p.log.WithError(err).Error("error closing RPC server") + } +} + +// getBinaryPath formats binary path using app dir, name and version. +func getBinaryPath(dir, name, ver string) string { + const binaryNameFormat = "%s.v%s" + return filepath.Join(dir, fmt.Sprintf(binaryNameFormat, name, ver)) +} diff --git a/pkg/app2/appserver/proc_manager.go b/pkg/app2/appserver/proc_manager.go new file mode 100644 index 0000000000..d69f06a6a4 --- /dev/null +++ b/pkg/app2/appserver/proc_manager.go @@ -0,0 +1,74 @@ +package appserver + +import ( + "sync" + + "github.com/pkg/errors" + + "github.com/skycoin/skycoin/src/util/logging" +) + +// ProcManager allows to manage skywire applications. +type ProcManager struct { + procs map[string]*Proc + mx sync.RWMutex +} + +// NewProcManager constructs `ProcManager`. +func NewProcManager() *ProcManager { + return &ProcManager{ + procs: make(map[string]*Proc), + } +} + +// Run runs the application according to its config and additional args. +func (m *ProcManager) Run(log *logging.Logger, c Config, args []string) error { + // TODO: pass another logging instance? + p, err := NewProc(log, c, args) + if err != nil { + return err + } + + if err := p.Run(); err != nil { + return err + } + + m.mx.Lock() + m.procs[c.Name] = p + m.mx.Unlock() + + return nil +} + +// Stop stops the application. +func (m *ProcManager) Stop(name string) error { + p, err := m.pop(name) + if err != nil { + return err + } + + return p.Stop() +} + +// Wait waits for the application to exit. +func (m *ProcManager) Wait(name string) error { + p, err := m.pop(name) + if err != nil { + return err + } + + return p.Wait() +} + +func (m *ProcManager) pop(name string) (*Proc, error) { + m.mx.Lock() + p, ok := m.procs[name] + if !ok { + m.mx.Unlock() + return nil, errors.New("no such app") + } + delete(m.procs, name) + m.mx.Unlock() + + return p, nil +} diff --git a/pkg/app2/appserver/server.go b/pkg/app2/appserver/server.go index 2f9838caca..f442e20d47 100644 --- a/pkg/app2/appserver/server.go +++ b/pkg/app2/appserver/server.go @@ -8,8 +8,6 @@ import ( "github.com/pkg/errors" "github.com/skycoin/skycoin/src/util/logging" - - "github.com/skycoin/skywire/pkg/app2" ) // Server is a server for app/visor communication. @@ -18,13 +16,12 @@ type Server struct { lis net.Listener sockFile string rpcS *rpc.Server - apps map[app2.Key]*app2.App done sync.WaitGroup stopCh chan struct{} } // NewServer constructs server. -func New(log *logging.Logger, sockFile string, appKey app2.Key) (*Server, error) { +func New(log *logging.Logger, sockFile string, appKey Key) (*Server, error) { rpcS := rpc.NewServer() gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_server_%s", appKey))) if err := rpcS.RegisterName(string(appKey), gateway); err != nil { diff --git a/pkg/app2/proc.go b/pkg/app2/proc.go deleted file mode 100644 index 7c730955af..0000000000 --- a/pkg/app2/proc.go +++ /dev/null @@ -1,74 +0,0 @@ -package app2 - -import ( - "fmt" - "os/exec" - "path/filepath" - "sync" -) - -// Proc is a wrapper for skywire app process. -type Proc struct { - id ProcID - cmd *exec.Cmd - mx sync.RWMutex -} - -// NewProc constructs `Proc`. -func NewProc(c Config, args []string, key Key) *Proc { - binaryPath := getBinaryPath(c.BinaryDir, c.Name, c.Version) - - const ( - appKeyEnvFormat = "APP_KEY=%s" - sockFileEnvFormat = "SW_UNIX=%s" - ) - - env := make([]string, 0, 2) - env = append(env, fmt.Sprintf(appKeyEnvFormat, key)) - env = append(env, fmt.Sprintf(sockFileEnvFormat, c.SockFile)) - - cmd := exec.Command(binaryPath, args...) // nolint:gosec - - cmd.Env = env - cmd.Dir = c.WorkDir - - return &Proc{ - cmd: cmd, - } -} - -// ID returns pid of the app. -func (p *Proc) ID() ProcID { - p.mx.RLock() - id := p.id - p.mx.RUnlock() - return id -} - -// Run runs the app process. -func (p *Proc) Run() error { - if err := p.cmd.Run(); err != nil { - return err - } - - p.mx.Lock() - p.id = ProcID(p.cmd.Process.Pid) - p.mx.Unlock() - - return nil -} - -// Stop stops the app process. -func (p *Proc) Stop() error { - return p.cmd.Process.Kill() -} - -// Wait waits for the app process to exit. -func (p *Proc) Wait() error { - return p.cmd.Wait() -} - -func getBinaryPath(dir, name, ver string) string { - const binaryNameFormat = "%s.v%s" - return filepath.Join(dir, fmt.Sprintf(binaryNameFormat, name, ver)) -}