Skip to content

Commit

Permalink
Implement visor restart from hypervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Dec 17, 2019
1 parent f8c40e7 commit 95425b7
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 19 deletions.
11 changes: 10 additions & 1 deletion cmd/skywire-visor/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spf13/cobra"

"github.com/SkycoinProject/skywire-mainnet/internal/utclient"
"github.com/SkycoinProject/skywire-mainnet/pkg/restart"
"github.com/SkycoinProject/skywire-mainnet/pkg/util/pathutil"
"github.com/SkycoinProject/skywire-mainnet/pkg/visor"
)
Expand All @@ -46,6 +47,7 @@ type runCfg struct {
masterLogger *logging.MasterLogger
conf visor.Config
node *visor.Node
restartCtx *restart.Context
}

var cfg *runCfg
Expand Down Expand Up @@ -73,6 +75,13 @@ func init() {
rootCmd.Flags().BoolVarP(&cfg.cfgFromStdin, "stdin", "i", false, "read config from STDIN")
rootCmd.Flags().StringVarP(&cfg.profileMode, "profile", "p", "none", "enable profiling with pprof. Mode: none or one of: [cpu, mem, mutex, block, trace, http]")
rootCmd.Flags().StringVarP(&cfg.port, "port", "", "6060", "port for http-mode of pprof")

restartCtx, err := restart.CaptureContext()
if err != nil {
log.Printf("Failed to capture context: %v", err)
} else {
cfg.restartCtx = restartCtx
}
}

// Execute executes root CLI command.
Expand Down Expand Up @@ -148,7 +157,7 @@ func (cfg *runCfg) readConfig() *runCfg {
}

func (cfg *runCfg) runNode() *runCfg {
node, err := visor.NewNode(&cfg.conf, cfg.masterLogger)
node, err := visor.NewNode(&cfg.conf, cfg.masterLogger, cfg.restartCtx)
if err != nil {
cfg.logger.Fatal("Failed to initialize node: ", err)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/hypervisor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (m *Node) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.Put("/nodes/{pk}/routes/{rid}", m.putRoute())
r.Delete("/nodes/{pk}/routes/{rid}", m.deleteRoute())
r.Get("/nodes/{pk}/loops", m.getLoops())
r.Get("/nodes/{pk}/restart", m.restart())
})
})
r.ServeHTTP(w, req)
Expand Down Expand Up @@ -569,6 +570,18 @@ func (m *Node) getLoops() http.HandlerFunc {
})
}

// restart returns a handler that asks the visor selected by the request
// context to restart itself via its RPC client.
//
// NOTE: The reply is delayed, because the visor first checks whether the new
// executable has started successfully.
// NOTE(review): on success the visor-side restart terminates the visor
// process; confirm that the RPC reply still reaches the hypervisor.
func (m *Node) restart() http.HandlerFunc {
	return m.withCtx(m.nodeCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) {
		status := http.StatusOK

		var body interface{} = true

		// A failed restart is reported as an internal server error carrying the RPC error.
		if err := ctx.RPC.Restart(); err != nil {
			status = http.StatusInternalServerError
			body = err
		}

		httputil.WriteJSON(w, r, status, body)
	})
}

/*
<<< Helper functions >>>
*/
Expand Down
139 changes: 139 additions & 0 deletions pkg/restart/restart.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package restart

import (
"errors"
"log"
"os"
"os/exec"
"path/filepath"
"time"

"github.com/SkycoinProject/skycoin/src/util/logging"
)

// ErrMalformedArgs is returned when the captured executable args are malformed,
// i.e. they do not even contain the program name.
var ErrMalformedArgs = errors.New("malformed args")

// defaultCheckDelay is the check delay assigned to a freshly captured Context:
// how long Restart waits for the new instance to fail before exiting.
const defaultCheckDelay time.Duration = 5 * time.Second

// Context holds the data required to restart the visor: how the current
// process was launched, plus settings that control the restart itself.
type Context struct {
	// workingDirectory is the working directory captured by CaptureContext.
	workingDirectory string
	// args are the captured command-line arguments; args[0] is the program name.
	args []string

	// checkDelay is how long Restart waits for the new instance to report
	// an error before the current process exits.
	checkDelay time.Duration
	// log is an optional logger; when nil, a standard-library logger is used.
	log *logging.Logger
}

// CaptureContext captures the data required for restarting the visor:
// the current working directory and the command-line arguments.
//
// The working directory must not have been changed before the call,
// therefore calling CaptureContext immediately after the executable starts
// is recommended. The arguments are copied, so modifications made to os.Args
// after the call do not affect the captured Context.
//
// It returns an error if the working directory cannot be determined.
func CaptureContext() (*Context, error) {
	wd, err := os.Getwd()
	if err != nil {
		return nil, err
	}

	// Detach from os.Args' backing array: the restart must use the arguments
	// the process was actually launched with, not whatever they become later.
	args := append([]string(nil), os.Args...)

	captured := &Context{
		checkDelay:       defaultCheckDelay,
		workingDirectory: wd,
		args:             args,
	}

	return captured, nil
}

// RegisterLogger sets the logger used by the Context for its info and error
// messages. While no logger is registered, messages are written through a
// standard-library logger to os.Stdout.
func (c *Context) RegisterLogger(logger *logging.Logger) {
c.log = logger
}

// SetCheckDelay overrides the default check delay (defaultCheckDelay), i.e.
// how long Restart waits for the new instance to report an error before
// the current process exits.
func (c *Context) SetCheckDelay(delay time.Duration) {
c.checkDelay = delay
}

// Restart starts a new instance of the executable described by the Context
// and, if that instance reports no error within the check delay, terminates
// the current process with exit code 0.
//
// It returns ErrMalformedArgs when no arguments were captured, or the error
// reported while starting/running the new instance. On success it does not
// return, because the process exits.
// Should not be called from a goroutine.
func (c *Context) Restart() error {
	if len(c.args) == 0 {
		return ErrMalformedArgs
	}

	executablePath := resolveExecutablePath(c.workingDirectory, c.args[0])

	c.infoLogger()("Starting new instance of executable (path: %q)", executablePath)

	errCh := c.start(executablePath)

	// A one-shot timer is used instead of a ticker: only a single expiry is
	// needed, and unlike time.NewTicker it does not panic when the delay is
	// zero or negative (it simply fires immediately).
	timer := time.NewTimer(c.checkDelay)
	defer timer.Stop()

	select {
	case err := <-errCh:
		c.errorLogger()("Failed to start new instance: %v", err)
		return err
	case <-timer.C:
		c.infoLogger()("New instance started successfully, exiting")
		os.Exit(0)

		// unreachable
		return nil
	}
}

// resolveExecutablePath turns the program name from args[0] into the path
// that should be started again.
func resolveExecutablePath(workingDirectory, program string) string {
	switch {
	case filepath.IsAbs(program):
		// Joining an absolute path onto the working directory would corrupt it.
		return program
	case filepath.Base(program) == program:
		// A bare command name (no directory part) was resolved through PATH
		// when the process was launched; leave it for exec.LookPath.
		return program
	default:
		// A relative path is relative to the directory the process started in.
		return filepath.Join(workingDirectory, program)
	}
}

// start launches the executable at path with the captured arguments
// (excluding the program name) in a background goroutine.
//
// The returned channel is buffered and receives at most one error: a failure
// to resolve or start the executable, or the non-nil result of waiting for
// the started process. Nothing is sent while the new process keeps running,
// nor when it exits without an error.
func (c *Context) start(path string) chan error {
	errCh := make(chan error, 1)

	// Validate before spawning anything; c.args[1:] below requires args[0].
	if len(c.args) == 0 {
		errCh <- ErrMalformedArgs
		return errCh
	}

	go func() {
		normalizedPath, err := exec.LookPath(path)
		if err != nil {
			errCh <- err
			return
		}

		cmd := exec.Command(normalizedPath, c.args[1:]...)
		// Run the new instance where the original one was started, so that
		// relative paths in its arguments keep resolving the same way.
		cmd.Dir = c.workingDirectory
		// Hand over the standard streams; otherwise the new instance's
		// output would be discarded.
		cmd.Stdin = os.Stdin
		cmd.Stdout = os.Stdout
		cmd.Stderr = os.Stderr

		if err := cmd.Start(); err != nil {
			errCh <- err
			return
		}

		// Wait blocks for as long as the new instance runs; a non-nil result
		// means it terminated abnormally.
		if err := cmd.Wait(); err != nil {
			errCh <- err
		}
	}()

	return errCh
}

// infoLogger returns the printf-style function used for informational
// messages: the registered logger's Infof, or, when no logger is registered,
// the Printf of a standard logger writing to os.Stdout with an "[INFO] " prefix.
func (c *Context) infoLogger() func(string, ...interface{}) {
	if c.log == nil {
		fallback := log.New(os.Stdout, "[INFO] ", log.LstdFlags)
		return fallback.Printf
	}

	return c.log.Infof
}

// errorLogger returns the printf-style function used for error messages:
// the registered logger's Errorf, or, when no logger is registered, the
// Printf of a standard logger writing to os.Stdout with an "[ERROR] " prefix.
func (c *Context) errorLogger() func(string, ...interface{}) {
	if c.log == nil {
		fallback := log.New(os.Stdout, "[ERROR] ", log.LstdFlags)
		return fallback.Printf
	}

	return c.log.Errorf
}
2 changes: 1 addition & 1 deletion pkg/router/routerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const rpcName = "RPCGateway"

// Client is an RPC client for router.
type Client struct {
tr *dmsg.Transport
tr *dmsg.Stream
rpc *rpc.Client
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/visor/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ var (

// ErrNotFound is returned when a requested resource is not found.
ErrNotFound = errors.New("not found")

// ErrMalformedRestartContext is returned when restart context is malformed.
ErrMalformedRestartContext = errors.New("restart context is malformed")
)

// RPC defines RPC methods for Node.
Expand Down Expand Up @@ -390,3 +393,16 @@ func (r *RPC) Loops(_ *struct{}, out *[]LoopInfo) error {
*out = loops
return nil
}

/*
<<< VISOR MANAGEMENT >>>
*/

// Restart restarts the visor using the restart context held by the node.
// It returns ErrMalformedRestartContext when the node has no restart context;
// otherwise it returns whatever the context's Restart returns.
func (r *RPC) Restart(_ *struct{}, _ *struct{}) error {
	if restartCtx := r.node.restartCtx; restartCtx != nil {
		return restartCtx.Restart()
	}

	return ErrMalformedRestartContext
}
17 changes: 14 additions & 3 deletions pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"sync"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/app2"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/google/uuid"

"github.com/SkycoinProject/skywire-mainnet/pkg/app2"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/transport"
)
Expand Down Expand Up @@ -49,6 +48,8 @@ type RPCClient interface {
RemoveRoutingRule(key routing.RouteID) error

Loops() ([]LoopInfo, error)

Restart() error
}

// RPCClient provides methods to call an RPC Server.
Expand Down Expand Up @@ -221,6 +222,11 @@ func (rc *rpcClient) Loops() ([]LoopInfo, error) {
return loops, err
}

// Restart asks the remote visor to restart itself by invoking the "Restart"
// RPC method with empty request and reply values, and returns the call's error.
func (rc *rpcClient) Restart() error {
return rc.Call("Restart", &struct{}{}, &struct{}{})
}

// MockRPCClient mocks RPCClient.
type mockRPCClient struct {
startedAt time.Time
Expand Down Expand Up @@ -528,3 +534,8 @@ func (mc *mockRPCClient) Loops() ([]LoopInfo, error) {

return loops, nil
}

// Restart implements RPCClient.
// The mock does not restart anything and always returns nil.
func (mc *mockRPCClient) Restart() error {
return nil
}
22 changes: 8 additions & 14 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/rpc"
"os"
Expand All @@ -20,14 +19,14 @@ import (
"time"

"github.com/SkycoinProject/dmsg"
"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/dmsg/noise"
"github.com/SkycoinProject/skycoin/src/util/logging"

"github.com/SkycoinProject/skywire-mainnet/pkg/app2/appcommon"
"github.com/SkycoinProject/skywire-mainnet/pkg/app2/appnet"
"github.com/SkycoinProject/skywire-mainnet/pkg/app2/appserver"
"github.com/SkycoinProject/skywire-mainnet/pkg/dmsgpty"
"github.com/SkycoinProject/skywire-mainnet/pkg/restart"
"github.com/SkycoinProject/skywire-mainnet/pkg/routefinder/rfclient"
"github.com/SkycoinProject/skywire-mainnet/pkg/router"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
Expand Down Expand Up @@ -67,18 +66,11 @@ type AppState struct {
Status AppStatus `json:"status"`
}

// PacketRouter performs routing of the skywire packets.
type PacketRouter interface {
io.Closer
Serve(ctx context.Context) error
SetupIsTrusted(sPK cipher.PubKey) bool
}

// Node provides messaging runtime for Apps by setting up all
// necessary connections and performing messaging gateway functions.
type Node struct {
conf *Config
router PacketRouter
router router.Router
n *snet.Network
tm *transport.Manager
rt routing.Table
Expand All @@ -91,7 +83,8 @@ type Node struct {
localPath string
appsConf []AppConfig

startedAt time.Time
startedAt time.Time
restartCtx *restart.Context

pidMu sync.Mutex

Expand All @@ -102,12 +95,13 @@ type Node struct {
}

// NewNode constructs new Node.
func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) {
func NewNode(config *Config, masterLogger *logging.MasterLogger, restartCtx *restart.Context) (*Node, error) {
ctx := context.Background()

node := &Node{
conf: config,
procManager: appserver.NewProcManager(logging.MustGetLogger("proc_manager")),
restartCtx: restartCtx,
}

node.Logger = masterLogger
Expand Down Expand Up @@ -430,8 +424,8 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro
appCfg := appcommon.Config{
Name: config.App,
Version: config.Version,
SockFile: node.config.AppServerSockFile,
VisorPK: node.config.Node.StaticPubKey.Hex(),
SockFile: node.conf.AppServerSockFile,
VisorPK: node.conf.Node.StaticPubKey.Hex(),
BinaryDir: node.appsPath,
WorkDir: filepath.Join(node.localPath, config.App, fmt.Sprintf("v%s", config.Version)),
}
Expand Down

0 comments on commit 95425b7

Please sign in to comment.