Skip to content

Commit

Permalink
Adjust visor code to new ProcManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Oct 18, 2019
1 parent 7c6cd4c commit fc14d32
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 54 deletions.
70 changes: 61 additions & 9 deletions pkg/app2/appserver/proc_manager.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,72 @@
package appserver

import (
"fmt"
"os/exec"
"sync"

"github.com/skycoin/skywire/pkg/app2"

"github.com/pkg/errors"

"github.com/skycoin/skycoin/src/util/logging"
)

var (
errAppAlreadyExists = errors.New("app already exists")
)

// ProcManager allows to manage skywire applications.
type ProcManager struct {
log *logging.Logger
procs map[string]*Proc
mx sync.RWMutex
}

// NewProcManager constructs `ProcManager`.
func NewProcManager() *ProcManager {
func NewProcManager(log *logging.Logger) *ProcManager {
return &ProcManager{
log: log,
procs: make(map[string]*Proc),
}
}

// Run runs the application according to its config and additional args.
func (m *ProcManager) Run(log *logging.Logger, c Config, args []string) error {
// TODO: pass another logging instance?
func (m *ProcManager) Run(log *logging.Logger, c Config, args []string) (app2.ProcID, error) {
if m.Exists(c.Name) {
return 0, errAppAlreadyExists
}

p, err := NewProc(log, c, args)
if err != nil {
return err
return 0, err
}

if err := p.Run(); err != nil {
return err
return 0, err
}

m.mx.Lock()
if _, ok := m.procs[c.Name]; ok {
m.mx.Unlock()
if err := p.Stop(); err != nil {
m.log.WithError(err).Error("error stopping app")
}
return 0, errAppAlreadyExists
}
m.procs[c.Name] = p
m.mx.Unlock()

return nil
return app2.ProcID(p.cmd.Process.Pid), nil
}

// Exists check whether app exists in the manager instance.
func (m *ProcManager) Exists(name string) bool {
m.mx.RUnlock()
defer m.mx.RUnlock()

_, ok := m.procs[name]
return ok
}

// Stop stops the application.
Expand All @@ -57,18 +86,41 @@ func (m *ProcManager) Wait(name string) error {
return err
}

return p.Wait()
if err := p.Wait(); err != nil {
if _, ok := err.(*exec.ExitError); !ok {
err = fmt.Errorf("failed to run app executable: %s", err)
}

return err
}

return nil
}

// Range allows to iterate over running skywire apps. Calls `next` on
// each iteration. If `next` returns falls - stops iteration.
func (m *ProcManager) Range(next func(name string, proc *Proc) bool) {
m.mx.RLock()
defer m.mx.RUnlock()

for name, proc := range m.procs {
if !next(name, proc) {
break
}
}
}

// pop removes application from the manager instance and returns it.
func (m *ProcManager) pop(name string) (*Proc, error) {
m.mx.Lock()
defer m.mx.Unlock()

p, ok := m.procs[name]
if !ok {
m.mx.Unlock()
return nil, errors.New("no such app")
}

delete(m.procs, name)
m.mx.Unlock()

return p, nil
}
68 changes: 23 additions & 45 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package visor
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
Expand All @@ -20,9 +19,12 @@ import (
"time"

"github.com/skycoin/skywire/pkg/app2"
"github.com/skycoin/skywire/pkg/app2/appserver"

"github.com/skycoin/skywire/pkg/snet"

"github.com/pkg/errors"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/noise"
Expand Down Expand Up @@ -96,16 +98,16 @@ type Node struct {
rpcListener net.Listener
rpcDialers []*noise.RPCClientDialer

appManager *app2.AppManager
procManager *appserver.ProcManager
}

// NewNode constructs new Node.
func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) {
ctx := context.Background()

node := &Node{
config: config,
appManager: app2.NewAppManager(),
config: config,
procManager: appserver.NewProcManager(logging.MustGetLogger("proc_manager")),
}

node.Logger = masterLogger
Expand Down Expand Up @@ -329,20 +331,19 @@ func (node *Node) Close() (err error) {
}
}

apps := make(map[string]*app2.App)
node.appManager.Range(func(name string, app *app2.App) bool {
apps[name] = app
var procs []string
node.procManager.Range(func(name string, _ *appserver.Proc) bool {
procs = append(procs, name)
return true
})

for name, a := range apps {
node.appManager.Remove(name)
if err := a.Stop(); err != nil {
node.logger.WithError(err).Errorf("(%s) failed to stop app", a)
for _, name := range procs {
if err := node.procManager.Stop(name); err != nil {
node.logger.WithError(err).Errorf("(%s) failed to stop app", name)
break
}

node.logger.Infof("(%s) app stopped successfully", a)
node.logger.Infof("(%s) app stopped successfully", name)
}

if err = node.router.Close(); err != nil {
Expand All @@ -367,7 +368,7 @@ func (node *Node) Apps() []*AppState {
for _, app := range node.appsConf {
state := &AppState{app.App, app.AutoStart, app.Port, AppStatusStopped}

if node.appManager.Exists(app.App) {
if node.procManager.Exists(app.App) {
state.Status = AppStatusRunning
}

Expand Down Expand Up @@ -405,7 +406,7 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro
return fmt.Errorf("can't bind to reserved port %d", config.Port)
}

appCfg := app2.Config{
appCfg := appserver.Config{
Name: config.App,
Version: config.Version,
BinaryDir: node.appsPath,
Expand All @@ -429,46 +430,28 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro
cmd.Stderr = logger
*/

app := app2.New(
logging.MustGetLogger(fmt.Sprintf("app_%s", config.App)),
appCfg,
append([]string{filepath.Join(node.dir(), config.App)}, config.Args...),
)

if err := node.appManager.Add(app); err != nil {
return fmt.Errorf("app %s is already started", config.App)
}
defer node.appManager.Remove(appCfg.Name)

if err := app.Run(); err != nil {
return node.wrapAppErr(err)
pid, err := node.procManager.Run(logging.MustGetLogger(fmt.Sprintf("app_%s", config.App)),
appCfg, append([]string{filepath.Join(node.dir(), config.App)}, config.Args...))
if err != nil {
return fmt.Errorf("error running app %s: %v", config.App, err)
}

if startCh != nil {
startCh <- struct{}{}
}

pid := app.PID()
node.pidMu.Lock()
node.logger.Infof("storing app %s pid %d", config.App, pid)
node.persistPID(config.App, app.PID())
node.persistPID(config.App, pid)
node.pidMu.Unlock()

if err := app.Wait(); err != nil {
return node.wrapAppErr(err)
if err := node.procManager.Wait(config.App); err != nil {
return err
}

return nil
}

func (node *Node) wrapAppErr(err error) error {
if _, ok := err.(*exec.ExitError); !ok {
return fmt.Errorf("failed to run app executable: %s", err)
}

return err
}

func (node *Node) persistPID(name string, pid app2.ProcID) {
pidF := node.pidFile()
pidFName := pidF.Name()
Expand All @@ -483,12 +466,7 @@ func (node *Node) persistPID(name string, pid app2.ProcID) {
func (node *Node) StopApp(appName string) error {
node.logger.Infof("Stopping app %s and closing ports", appName)

app, ok := node.appManager.App(appName)
if !ok {
return ErrUnknownApp
}

if err := app.Stop(); err != nil {
if err := node.procManager.Stop(appName); err != nil {
node.logger.Warn("Failed to stop app: ", err)
return err
}
Expand Down

0 comments on commit fc14d32

Please sign in to comment.