diff --git a/pkg/app2/appserver/app.go b/pkg/app2/appserver/app.go new file mode 100644 index 000000000..76c2b0a3c --- /dev/null +++ b/pkg/app2/appserver/app.go @@ -0,0 +1,70 @@ +package appserver + +import ( + "fmt" + + "github.com/skycoin/skycoin/src/util/logging" + "github.com/skycoin/skywire/pkg/app2" +) + +type App struct { + config app2.Config + log *logging.Logger + proc *app2.Proc + rpcS *Server +} + +func NewApp(log *logging.Logger, c app2.Config, dir string, args []string) (*App, error) { + appKey := app2.GenerateAppKey() + + rpcS, err := New(logging.MustGetLogger(fmt.Sprintf("app_rpc_server_%s", appKey)), c.SockFile, appKey) + if err != nil { + return nil, err + } + + p := app2.NewProc(c, dir, args) + + return &App{ + config: c, + log: log, + proc: p, + rpcS: rpcS, + }, nil +} + +func (a *App) PID() app2.ProcID { + return a.proc.ID() +} + +func (a *App) Run() error { + go func() { + if err := a.rpcS.ListenAndServe(); err != nil { + a.log.WithError(err).Error("error serving RPC") + } + }() + + err := a.proc.Run() + if err != nil { + a.closeRPCServer() + return err + } + + return nil +} + +func (a *App) Stop() error { + a.closeRPCServer() + + return a.proc.Stop() +} + +func (a *App) Wait() error { + + return a.proc.Wait() +} + +func (a *App) closeRPCServer() { + if err := a.rpcS.Close(); err != nil { + a.log.WithError(err).Error("error closing RPC server") + } +} diff --git a/pkg/app2/appserver/app_manager.go b/pkg/app2/appserver/app_manager.go new file mode 100644 index 000000000..ac1e17067 --- /dev/null +++ b/pkg/app2/appserver/app_manager.go @@ -0,0 +1,39 @@ +package appserver + +import ( + "sync" + + "github.com/pkg/errors" +) + +var ( + ErrAppExists = errors.New("app with such pid already exists") +) + +type AppManager struct { + apps map[string]*App + mx sync.RWMutex +} + +func NewAppManager() *AppManager { + return &AppManager{} +} + +func (m *AppManager) Add(a *App) error { + m.mx.Lock() + if _, ok := m.apps[a.config.Name]; ok { + m.mx.Unlock() + return ErrAppExists + } + m.apps[a.config.Name] = a + m.mx.Unlock() + + return nil +} + +func (m *AppManager) App(name string) (*App, bool) { + m.mx.RLock() + a, ok := m.apps[name] + m.mx.RUnlock() + return a, ok +} diff --git a/pkg/app2/appserver/proc_manager.go b/pkg/app2/appserver/proc_manager.go deleted file mode 100644 index f907ce814..000000000 --- a/pkg/app2/appserver/proc_manager.go +++ /dev/null @@ -1,49 +0,0 @@ -package appserver - -import ( - "os" - "os/exec" - "sync" - "syscall" -) - -type ProcManager struct { - processes []*os.Process - mu sync.Mutex -} - -func NewProcManager() *ProcManager { - return &ProcManager{processes: make([]*os.Process, 0)} -} - -func (m *ProcManager) Start(cmd *exec.Cmd) (int, error) { - if err := cmd.Start(); err != nil { - return -1, err - } - m.mu.Lock() - m.processes = append(m.processes, cmd.Process) - m.mu.Unlock() - - return cmd.Process.Pid, nil -} - -func (m *ProcManager) Stop(pid int) (err error) { - m.mu.Lock() - defer m.mu.Unlock() - - for _, process := range m.processes { - if process.Pid != pid { - continue - } - - if sigErr := process.Signal(syscall.SIGKILL); sigErr != nil && err == nil { - err = sigErr - } - } - - return err -} - -func (m *ProcManager) Wait(cmd *exec.Cmd) error { - return cmd.Wait() -} diff --git a/pkg/app2/appserver/server.go b/pkg/app2/appserver/server.go index 92e3baeb8..e56e2a2bf 100644 --- a/pkg/app2/appserver/server.go +++ b/pkg/app2/appserver/server.go @@ -4,6 +4,11 @@ import ( "fmt" "net" "net/rpc" + "sync" + + "github.com/pkg/errors" + + "github.com/skycoin/skywire/pkg/app2" "github.com/skycoin/skycoin/src/util/logging" ) @@ -11,17 +16,28 @@ import ( // Server is a server for app/visor communication. type Server struct { log *logging.Logger + lis net.Listener sockFile string rpcS *rpc.Server + apps map[app2.Key]*App + done sync.WaitGroup + stopCh chan struct{} } // NewServer constructs server. -func New(log *logging.Logger, sockFile string) *Server { +func New(log *logging.Logger, sockFile string, appKey app2.Key) (*Server, error) { + rpcS := rpc.NewServer() + gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_server_%s", appKey))) + if err := rpcS.RegisterName(string(appKey), gateway); err != nil { + return nil, errors.Wrap(err, "error registering RPC server for app") + } + return &Server{ log: log, sockFile: sockFile, - rpcS: rpc.NewServer(), - } + rpcS: rpcS, + stopCh: make(chan struct{}), + }, nil } // ListenAndServe starts listening for incoming app connections via unix socket. @@ -31,13 +47,30 @@ func (s *Server) ListenAndServe() error { return err } - s.rpcS.Accept(l) + s.lis = l - return nil + for { + conn, err := l.Accept() + if err != nil { + return err + } + + s.done.Add(1) + go s.serveConn(conn) + } } -// AllowApp allows app with the key `appKey` to do RPC calls. -func (s *Server) AllowApp(appKey string) error { - gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_gateway_%s", appKey))) - return s.rpcS.RegisterName(appKey, gateway) +func (s *Server) Close() error { + err := s.lis.Close() + close(s.stopCh) + return err +} + +func (s *Server) serveConn(conn net.Conn) { + go s.rpcS.ServeConn(conn) + <-s.stopCh + if err := conn.Close(); err != nil { + s.log.WithError(err).Error("error closing conn") + } + s.done.Done() } diff --git a/pkg/app2/client.go b/pkg/app2/client.go index a6bec3c56..4f53d24ea 100644 --- a/pkg/app2/client.go +++ b/pkg/app2/client.go @@ -34,7 +34,7 @@ type Client struct { func NewClient(log *logging.Logger, localPK cipher.PubKey, pid ProcID, sockFile, appKey string) (*Client, error) { rpcCl, err := rpc.Dial("unix", sockFile) if err != nil { - return nil, errors.Wrap(err, "error connectin to the app server") + return nil, errors.Wrap(err, "error connecting to the app server") } return &Client{ diff --git a/pkg/app2/config.go b/pkg/app2/config.go new file mode 100644 index 000000000..d2826bc1a --- /dev/null +++ b/pkg/app2/config.go @@ -0,0 +1,8 @@ +package app2 + +// Config defines configuration parameters for App +type Config struct { + Name string `json:"name"` + Version string `json:"version"` + SockFile string `json:"sock_file"` +} diff --git a/pkg/app2/key.go b/pkg/app2/key.go new file mode 100644 index 000000000..2b6ba6e59 --- /dev/null +++ b/pkg/app2/key.go @@ -0,0 +1,10 @@ +package app2 + +import "github.com/skycoin/dmsg/cipher" + +type Key string + +func GenerateAppKey() Key { + raw, _ := cipher.GenerateKeyPair() + return Key(raw.Hex()) +} diff --git a/pkg/app2/proc.go b/pkg/app2/proc.go new file mode 100644 index 000000000..87f8267c2 --- /dev/null +++ b/pkg/app2/proc.go @@ -0,0 +1,72 @@ +package app2 + +import ( + "fmt" + "os/exec" + "path/filepath" + "sync" +) + +type Proc struct { + id ProcID + cmd *exec.Cmd + mx sync.RWMutex +} + +func NewProc(c Config, dir string, args []string) *Proc { + cmd := cmd(c, dir, args) + + return &Proc{ + cmd: cmd, + } +} + +func (p *Proc) ID() ProcID { + p.mx.RLock() + id := p.id + p.mx.RUnlock() + return id +} + +func (p *Proc) Run() error { + if err := p.cmd.Run(); err != nil { + return err + } + + p.mx.Lock() + p.id = ProcID(p.cmd.Process.Pid) + p.mx.Unlock() + + return nil +} + +func (p *Proc) Stop() error { + return p.cmd.Process.Kill() +} + +func (p *Proc) Wait() error { + return p.cmd.Wait() +} + +func cmd(config Config, dir string, args []string) *exec.Cmd { + binaryPath := getBinaryPath(dir, config.Name, config.Version) + cmd := exec.Command(binaryPath, args...) // nolint:gosec + + const ( + appKeyEnvFormat = "APP_KEY=%s" + sockFileEnvFormat = "SW_UNIX=%s" + ) + + env := make([]string, 0, 2) + env = append(env, fmt.Sprintf(appKeyEnvFormat, config.Key)) + env = append(env, fmt.Sprintf(sockFileEnvFormat, config.SockFile)) + + cmd.Env = env + + return cmd +} + +func getBinaryPath(dir, name, ver string) string { + const binaryNameFormat = "%s.v%s" + return filepath.Join(dir, fmt.Sprintf(binaryNameFormat, name, ver)) +} diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 208196d26..69c8df96e 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -28,7 +28,6 @@ import ( "github.com/skycoin/dmsg/noise" "github.com/skycoin/skycoin/src/util/logging" - "github.com/skycoin/skywire/pkg/app" "github.com/skycoin/skywire/pkg/routefinder/rfclient" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" @@ -67,17 +66,6 @@ type AppState struct { Status AppStatus `json:"status"` } -type appExecuter interface { - Start(cmd *exec.Cmd) (int, error) - Stop(pid int) error - Wait(cmd *exec.Cmd) error -} - -type appBind struct { - conn net.Conn - pid int -} - // PacketRouter performs routing of the skywire packets. type PacketRouter interface { io.Closer @@ -85,15 +73,16 @@ type PacketRouter interface { SetupIsTrusted(sPK cipher.PubKey) bool } +type appKey string + // Node provides messaging runtime for Apps by setting up all // necessary connections and performing messaging gateway functions. type Node struct { - config *Config - router router.Router - n *snet.Network - tm *transport.Manager - rt routing.Table - executer appExecuter + config *Config + router router.Router + n *snet.Network + tm *transport.Manager + rt routing.Table Logger *logging.MasterLogger logger *logging.Logger @@ -103,7 +92,7 @@ type Node struct { appsConf []AppConfig startedMu sync.RWMutex - startedApps map[string]*appBind + startedApps map[string]appKey startedAt time.Time @@ -112,7 +101,7 @@ type Node struct { rpcListener net.Listener rpcDialers []*noise.RPCClientDialer - appServer *appserver.Server + appManager *appserver.AppManager } // NewNode constructs new Node. @@ -121,8 +110,7 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) node := &Node{ config: config, - executer: appserver.NewProcManager(), - startedApps: make(map[string]*appBind), + startedApps: make(map[string]appKey), } node.Logger = masterLogger @@ -220,7 +208,7 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) }) } - node.appServer = appserver.New(logging.MustGetLogger("app_server"), node.config.AppServerSockFile) + node.appManager = appserver.NewAppManager() return node, err } @@ -233,11 +221,6 @@ func (node *Node) Start() error { pathutil.EnsureDir(node.dir()) node.closePreviousApps() - node.logger.Info("Starting app server") - if err := node.appServer.ListenAndServe(); err != nil { - return fmt.Errorf("failed to start app server: %s", err) - } - for _, ac := range node.appsConf { if !ac.AutoStart { continue @@ -353,8 +336,8 @@ func (node *Node) Close() (err error) { } } node.startedMu.Lock() - for a, bind := range node.startedApps { - if err = node.stopApp(a, bind); err != nil { + for a, key := range node.startedApps { + if err = node.stopApp(a, key); err != nil { node.logger.WithError(err).Errorf("(%s) failed to stop app", a) } else { node.logger.Infof("(%s) app stopped successfully", a) @@ -417,36 +400,18 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro node.logger.Infof("Starting %s.v%s", config.App, config.Version) node.logger.Warnf("here: config.Args: %+v, with len %d", config.Args, len(config.Args)) - appKey := node.generateAppKey() - if err := node.appServer.AllowApp(appKey); err != nil { - return fmt.Errorf("failed to start communication server: %s", err) - } - - conn, cmd, err := app.Command( - &app.Config{ProtocolVersion: supportedProtocolVersion, AppName: config.App, AppVersion: config.Version}, - node.appsPath, - append([]string{filepath.Join(node.dir(), config.App)}, config.Args...), - []string{ - fmt.Sprintf("APP_KEY=%s", appKey), - fmt.Sprintf("SW_UNIX=%s", node.config.AppServerSockFile), - }, - ) - if err != nil { - return fmt.Errorf("failed to initialize App server: %s", err) - } - - bind := &appBind{conn, -1} if app, ok := reservedPorts[config.Port]; ok && app != config.App { return fmt.Errorf("can't bind to reserved port %d", config.Port) } - node.startedMu.Lock() - if node.startedApps[config.App] != nil { - node.startedMu.Unlock() + app := appserver.NewApp(logging.MustGetLogger(fmt.Sprintf("app_%s", config.App))) + + if err := node.appManager.Add() + if node.appManager.Exists(config.App) { return fmt.Errorf("app %s is already started", config.App) } - node.startedApps[config.App] = bind + node.startedApps[config.App] = appKey node.startedMu.Unlock() // TODO: make PackageLogger return *RuleEntry. FieldLogger doesn't expose Writer. @@ -472,9 +437,10 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro return } - node.startedMu.Lock() + // TODO: control pid the other way + /*node.startedMu.Lock() bind.pid = pid - node.startedMu.Unlock() + node.startedMu.Unlock()*/ node.pidMu.Lock() node.logger.Infof("storing app %s pid %d", config.App, pid) @@ -514,14 +480,14 @@ func (node *Node) persistPID(name string, pid int) { // StopApp stops running App. func (node *Node) StopApp(appName string) error { node.startedMu.Lock() - bind := node.startedApps[appName] + appKey, ok := node.startedApps[appName] node.startedMu.Unlock() - if bind == nil { + if !ok { return ErrUnknownApp } - return node.stopApp(appName, bind) + return node.stopApp(appName, appKey) } // SetAutoStart sets an app to auto start or not. @@ -535,23 +501,14 @@ func (node *Node) SetAutoStart(appName string, autoStart bool) error { return ErrUnknownApp } -func (node *Node) stopApp(app string, bind *appBind) (err error) { +func (node *Node) stopApp(app string, key appKey) (err error) { node.logger.Infof("Stopping app %s and closing ports", app) - if excErr := node.executer.Stop(bind.pid); excErr != nil { + // TODO: stop app, move this func + /*if excErr := node.executer.Stop(bind.pid); excErr != nil { node.logger.Warn("Failed to stop app: ", excErr) err = excErr - } - - if srvErr := bind.conn.Close(); srvErr != nil && err == nil { - node.logger.Warnf("Failed to close App conn: %s", srvErr) - err = srvErr - } + }*/ return err } - -func (node *Node) generateAppKey() string { - raw, _ := cipher.GenerateKeyPair() - return raw.Hex() -}