Skip to content

Commit

Permalink
Add basic proc management infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Oct 13, 2019
1 parent 1fe21cb commit a395d28
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 130 deletions.
70 changes: 70 additions & 0 deletions pkg/app2/appserver/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package appserver

import (
"fmt"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/pkg/app2"
)

type App struct {
config app2.Config
log *logging.Logger
proc *app2.Proc
rpcS *Server
}

func NewApp(log *logging.Logger, c app2.Config, dir string, args []string) (*App, error) {
appKey := app2.GenerateAppKey()

rpcS, err := New(logging.MustGetLogger(fmt.Sprintf("app_rpc_server_%s", appKey)), c.SockFile, appKey)
if err != nil {
return nil, err
}

p := app2.NewProc(c, dir, args)

return &App{
config: c,
log: log,
proc: p,
rpcS: rpcS,
}, nil
}

func (a *App) PID() app2.ProcID {
return a.proc.ID()
}

func (a *App) Run() error {
go func() {
if err := a.rpcS.ListenAndServe(); err != nil {
a.log.WithError(err).Error("error serving RPC")
}
}()

err := a.proc.Run()
if err != nil {
a.closeRPCServer()
return err
}

return nil
}

func (a *App) Stop() error {
a.closeRPCServer()

return a.proc.Stop()
}

func (a *App) Wait() error {

return a.proc.Wait()
}

func (a *App) closeRPCServer() {
if err := a.rpcS.Close(); err != nil {
a.log.WithError(err).Error("error closing RPC server")
}
}
39 changes: 39 additions & 0 deletions pkg/app2/appserver/app_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package appserver

import (
"sync"

"github.com/pkg/errors"
)

var (
ErrAppExists = errors.New("app with such pid already exists")
)

type AppManager struct {
apps map[string]*App
mx sync.RWMutex
}

func NewAppManager() *AppManager {
return &AppManager{}
}

func (m *AppManager) Add(a *App) error {
m.mx.Lock()
if _, ok := m.apps[a.config.Name]; ok {
m.mx.Unlock()
return ErrAppExists
}
m.apps[a.config.Name] = a
m.mx.Unlock()

return nil
}

func (m *AppManager) App(name string) (*App, bool) {
m.mx.RLock()
a, ok := m.apps[name]
m.mx.RUnlock()
return a, ok
}
49 changes: 0 additions & 49 deletions pkg/app2/appserver/proc_manager.go

This file was deleted.

51 changes: 42 additions & 9 deletions pkg/app2/appserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,40 @@ import (
"fmt"
"net"
"net/rpc"
"sync"

"github.com/pkg/errors"

"github.com/skycoin/skywire/pkg/app2"

"github.com/skycoin/skycoin/src/util/logging"
)

// Server is a server for app/visor communication.
type Server struct {
log *logging.Logger
lis net.Listener
sockFile string
rpcS *rpc.Server
apps map[app2.Key]*App
done sync.WaitGroup
stopCh chan struct{}
}

// NewServer constructs server.
func New(log *logging.Logger, sockFile string) *Server {
func New(log *logging.Logger, sockFile string, appKey app2.Key) (*Server, error) {
rpcS := rpc.NewServer()
gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_server_%s", appKey)))
if err := rpcS.RegisterName(string(appKey), gateway); err != nil {
return nil, errors.Wrap(err, "error registering RPC server for app")
}

return &Server{
log: log,
sockFile: sockFile,
rpcS: rpc.NewServer(),
}
rpcS: rpcS,
stopCh: make(chan struct{}),
}, nil
}

// ListenAndServe starts listening for incoming app connections via unix socket.
Expand All @@ -31,13 +47,30 @@ func (s *Server) ListenAndServe() error {
return err
}

s.rpcS.Accept(l)
s.lis = l

return nil
for {
conn, err := l.Accept()
if err != nil {
return err
}

s.done.Add(1)
go s.serveConn(conn)
}
}

// AllowApp allows app with the key `appKey` to do RPC calls.
func (s *Server) AllowApp(appKey string) error {
gateway := newRPCGateway(logging.MustGetLogger(fmt.Sprintf("rpc_gateway_%s", appKey)))
return s.rpcS.RegisterName(appKey, gateway)
func (s *Server) Close() error {
err := s.lis.Close()
close(s.stopCh)
return err
}

func (s *Server) serveConn(conn net.Conn) {
go s.rpcS.ServeConn(conn)
<-s.stopCh
if err := conn.Close(); err != nil {
s.log.WithError(err).Error("error closing conn")
}
s.done.Done()
}
2 changes: 1 addition & 1 deletion pkg/app2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Client struct {
func NewClient(log *logging.Logger, localPK cipher.PubKey, pid ProcID, sockFile, appKey string) (*Client, error) {
rpcCl, err := rpc.Dial("unix", sockFile)
if err != nil {
return nil, errors.Wrap(err, "error connectin to the app server")
return nil, errors.Wrap(err, "error connecting to the app server")
}

return &Client{
Expand Down
8 changes: 8 additions & 0 deletions pkg/app2/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package app2

// Config defines configuration parameters for App
type Config struct {
Name string `json:"name"`
Version string `json:"version"`
SockFile string `json:"sock_file"`
}
10 changes: 10 additions & 0 deletions pkg/app2/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package app2

import "github.com/skycoin/dmsg/cipher"

type Key string

func GenerateAppKey() Key {
raw, _ := cipher.GenerateKeyPair()
return Key(raw.Hex())
}
72 changes: 72 additions & 0 deletions pkg/app2/proc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package app2

import (
"fmt"
"os/exec"
"path/filepath"
"sync"
)

type Proc struct {
id ProcID
cmd *exec.Cmd
mx sync.RWMutex
}

func NewProc(c Config, dir string, args []string) *Proc {
cmd := cmd(c, dir, args)

return &Proc{
cmd: cmd,
}
}

func (p *Proc) ID() ProcID {
p.mx.RLock()
id := p.id
p.mx.RUnlock()
return id
}

func (p *Proc) Run() error {
if err := p.cmd.Run(); err != nil {
return err
}

p.mx.Lock()
p.id = ProcID(p.cmd.Process.Pid)
p.mx.Unlock()

return nil
}

func (p *Proc) Stop() error {
return p.cmd.Process.Kill()
}

func (p *Proc) Wait() error {
return p.cmd.Wait()
}

func cmd(config Config, dir string, args []string) *exec.Cmd {
binaryPath := getBinaryPath(dir, config.Name, config.Version)
cmd := exec.Command(binaryPath, args...) // nolint:gosec

const (
appKeyEnvFormat = "APP_KEY=%s"
sockFileEnvFormat = "SW_UNIX=%s"
)

env := make([]string, 0, 2)
env = append(env, fmt.Sprintf(appKeyEnvFormat, config.Key))
env = append(env, fmt.Sprintf(sockFileEnvFormat, config.SockFile))

cmd.Env = env

return cmd
}

func getBinaryPath(dir, name, ver string) string {
const binaryNameFormat = "%s.v%s"
return filepath.Join(dir, fmt.Sprintf(binaryNameFormat, name, ver))
}
Loading

0 comments on commit a395d28

Please sign in to comment.