Skip to content

Commit

Permalink
Integrate app2 with skywire visor. Probably
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Oct 2, 2019
1 parent 5c83168 commit 292d65b
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 92 deletions.
2 changes: 2 additions & 0 deletions cmd/skywire-cli/commands/node/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,5 +118,7 @@ func defaultConfig() *visor.Config {

conf.Interfaces.RPCAddress = "localhost:3435"

conf.AppServerSockFile = "app_server.sock"

return conf
}
3 changes: 2 additions & 1 deletion pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type App struct {

// Command setups pipe connection and returns *exec.Cmd for an App
// with initialized connection.
func Command(config *Config, appsPath string, args []string) (net.Conn, *exec.Cmd, error) {
func Command(config *Config, appsPath string, args []string, env []string) (net.Conn, *exec.Cmd, error) {
srvConn, clientConn, err := OpenPipeConn()
if err != nil {
return nil, nil, fmt.Errorf("failed to open piped connection: %s", err)
Expand All @@ -62,6 +62,7 @@ func Command(config *Config, appsPath string, args []string) (net.Conn, *exec.Cm
binaryPath := filepath.Join(appsPath, fmt.Sprintf("%s.v%s", config.AppName, config.AppVersion))
cmd := exec.Command(binaryPath, args...) // nolint:gosec
cmd.ExtraFiles = []*os.File{clientConn.inFile, clientConn.outFile}
cmd.Env = env

return srvConn, cmd, nil
}
Expand Down
75 changes: 0 additions & 75 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,46 +353,6 @@ func (r *Router) GetRule(routeID routing.RouteID) (routing.Rule, error) {
return rule, nil
}

// ServeApp handles App packets from the App connection on provided port.
func (r *Router) ServeApp(conn net.Conn, port routing.Port, appConf *app.Config) error {
r.wg.Add(1)
defer r.wg.Done()

appProto := app.NewProtocol(conn)
if err := r.pm.Open(port, appProto); err != nil {
return err
}

r.mx.Lock()
r.staticPorts[port] = struct{}{}
r.mx.Unlock()

callbacks := &appCallbacks{
CreateLoop: r.requestLoop,
CloseLoop: r.closeLoop,
Forward: r.forwardAppPacket,
}
am := &appManager{r.Logger, appProto, appConf, callbacks}
err := am.Serve()

for _, port := range r.pm.AppPorts(appProto) {
for _, addr := range r.pm.Close(port) {
if err := r.closeLoop(context.TODO(), appProto, routing.Loop{Local: routing.Addr{Port: port}, Remote: addr}); err != nil {
log.WithError(err).Warn("Failed to close loop")
}
}
}

r.mx.Lock()
delete(r.staticPorts, port)
r.mx.Unlock()

if err == io.EOF {
return nil
}
return err
}

// Close safely stops Router.
func (r *Router) Close() error {
if r == nil {
Expand Down Expand Up @@ -446,41 +406,6 @@ func (r *Router) consumePacket(payload []byte, rule routing.Rule) error {
return nil
}

func (r *Router) forwardAppPacket(ctx context.Context, appConn *app.Protocol, packet *app.Packet) error {
if packet.Loop.Remote.PubKey == r.conf.PubKey {
return r.forwardLocalAppPacket(packet)
}

l, err := r.pm.GetLoop(packet.Loop.Local.Port, packet.Loop.Remote)
if err != nil {
return err
}

tr := r.tm.Transport(l.trID)
if tr == nil {
return errors.New("unknown transport")
}

r.Logger.Infof("Forwarded App packet from LocalPort %d using route ID %d", packet.Loop.Local.Port, l.routeID)
return tr.WritePacket(ctx, l.routeID, packet.Payload)
}

func (r *Router) forwardLocalAppPacket(packet *app.Packet) error {
b, err := r.pm.Get(packet.Loop.Remote.Port)
if err != nil {
return nil
}

p := &app.Packet{
Loop: routing.Loop{
Local: routing.Addr{Port: packet.Loop.Remote.Port},
Remote: routing.Addr{PubKey: packet.Loop.Remote.PubKey, Port: packet.Loop.Local.Port},
},
Payload: packet.Payload,
}
return b.conn.Send(app.FrameSend, p, nil)
}

func (r *Router) requestLoop(ctx context.Context, appConn *app.Protocol, raddr routing.Addr) (routing.Addr, error) {
lport := r.pm.Alloc(appConn)
if err := r.pm.SetLoop(lport, raddr, &loop{}); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Config struct {
ShutdownTimeout Duration `json:"shutdown_timeout"` // time value, examples: 10s, 1m, etc

Interfaces InterfaceConfig `json:"interfaces"`

AppServerSockFile string `json:"app_server_sock_file"`
}

// MessagingConfig returns config for dmsg client.
Expand Down
46 changes: 30 additions & 16 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"syscall"
"time"

"github.com/skycoin/skywire/pkg/app2"

"github.com/skycoin/skywire/pkg/snet"

"github.com/skycoin/dmsg"
Expand Down Expand Up @@ -80,7 +82,6 @@ type appBind struct {
type PacketRouter interface {
io.Closer
Serve(ctx context.Context) error
ServeApp(conn net.Conn, port routing.Port, appConf *app.Config) error
SetupIsTrusted(sPK cipher.PubKey) bool
}

Expand Down Expand Up @@ -110,6 +111,8 @@ type Node struct {

rpcListener net.Listener
rpcDialers []*noise.RPCClientDialer

appServer *app2.Server
}

// NewNode constructs new Node.
Expand Down Expand Up @@ -217,6 +220,8 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)
})
}

node.appServer = app2.NewServer(logging.MustGetLogger("app_server"), node.config.AppServerSockFile)

return node, err
}

Expand All @@ -227,6 +232,12 @@ func (node *Node) Start() error {

pathutil.EnsureDir(node.dir())
node.closePreviousApps()

node.logger.Info("Starting app server")
if err := node.appServer.ListenAndServe(); err != nil {
return fmt.Errorf("failed to start app server: %s", err)
}

for _, ac := range node.appsConf {
if !ac.AutoStart {
continue
Expand Down Expand Up @@ -405,10 +416,20 @@ func (node *Node) StartApp(appName string) error {
func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err error) {
node.logger.Infof("Starting %s.v%s", config.App, config.Version)
node.logger.Warnf("here: config.Args: %+v, with len %d", config.Args, len(config.Args))

appKey := node.generateAppKey()
if err := node.appServer.AllowApp(appKey); err != nil {
return fmt.Errorf("failed to start communication server: %s", err)
}

conn, cmd, err := app.Command(
&app.Config{ProtocolVersion: supportedProtocolVersion, AppName: config.App, AppVersion: config.Version},
node.appsPath,
append([]string{filepath.Join(node.dir(), config.App)}, config.Args...),
[]string{
fmt.Sprintf("APP_KEY=%s", appKey),
fmt.Sprintf("SW_UNIX=%s", node.config.AppServerSockFile),
},
)
if err != nil {
return fmt.Errorf("failed to initialize App server: %s", err)
Expand Down Expand Up @@ -462,26 +483,14 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro
appCh <- node.executer.Wait(cmd)
}()

srvCh := make(chan error)
go func() {
srvCh <- node.router.ServeApp(conn, config.Port, &app.Config{AppName: config.App, AppVersion: config.Version})
}()

if startCh != nil {
startCh <- struct{}{}
}

var appErr error
select {
case err := <-appCh:
if err != nil {
if _, ok := err.(*exec.ExitError); !ok {
appErr = fmt.Errorf("failed to run app executable: %s", err)
}
}
case err := <-srvCh:
if err != nil {
appErr = fmt.Errorf("failed to start communication server: %s", err)
if err := <-appCh; err != nil {
if _, ok := err.(*exec.ExitError); !ok {
appErr = fmt.Errorf("failed to run app executable: %s", err)
}
}

Expand Down Expand Up @@ -541,3 +550,8 @@ func (node *Node) stopApp(app string, bind *appBind) (err error) {

return err
}

func (node *Node) generateAppKey() string {
raw, _ := cipher.GenerateKeyPair()
return raw.Hex()
}

0 comments on commit 292d65b

Please sign in to comment.