diff --git a/cmd/skywire-visor/commands/root.go b/cmd/skywire-visor/commands/root.go index ed47be729..aa4034fa1 100644 --- a/cmd/skywire-visor/commands/root.go +++ b/cmd/skywire-visor/commands/root.go @@ -42,6 +42,7 @@ type runCfg struct { port string startDelay string args []string + configPath *string profileStop func() logger *logging.Logger @@ -90,6 +91,7 @@ func Execute() { func (cfg *runCfg) startProfiler() *runCfg { var option func(*profile.Profile) + switch cfg.profileMode { case "none": cfg.profileStop = func() {} @@ -98,7 +100,9 @@ func (cfg *runCfg) startProfiler() *runCfg { go func() { log.Println(http.ListenAndServe(fmt.Sprintf("localhost:%v", cfg.port), nil)) }() + cfg.profileStop = func() {} + return cfg case "cpu": option = profile.CPUProfile @@ -111,7 +115,9 @@ func (cfg *runCfg) startProfiler() *runCfg { case "trace": option = profile.TraceProfile } + cfg.profileStop = profile.Start(profile.ProfilePath("./logs/"+cfg.tag), option).Stop + return cfg } @@ -128,18 +134,32 @@ func (cfg *runCfg) startLogger() *runCfg { cfg.masterLogger.Out = ioutil.Discard } } + return cfg } func (cfg *runCfg) readConfig() *runCfg { var rdr io.Reader - var err error + if !cfg.cfgFromStdin { configPath := pathutil.FindConfigPath(cfg.args, 0, configEnv, pathutil.NodeDefaults()) - rdr, err = os.Open(filepath.Clean(configPath)) + configPath = filepath.Clean(configPath) + + file, err := os.Open(configPath) if err != nil { cfg.logger.Fatalf("Failed to open config: %s", err) } + + defer func() { + if err := file.Close(); err != nil { + cfg.logger.Warnf("Failed to close config file: %v", err) + } + }() + + cfg.logger.Info("Reading config from %v", configPath) + + rdr = file + cfg.configPath = &configPath } else { cfg.logger.Info("Reading config from STDIN") rdr = bufio.NewReader(os.Stdin) @@ -149,7 +169,9 @@ func (cfg *runCfg) readConfig() *runCfg { if err := json.NewDecoder(rdr).Decode(&cfg.conf); err != nil { cfg.logger.Fatalf("Failed to decode %s: %s", rdr, err) } + fmt.Println("TCP Factory conf:", cfg.conf.STCP) + return cfg } @@ -157,6 +179,7 @@ func (cfg *runCfg) runNode() *runCfg { startDelay, err := time.ParseDuration(cfg.startDelay) if err != nil { cfg.logger.Warnf("Using no visor start delay due to parsing failure: %v", err) + startDelay = time.Duration(0) } @@ -166,7 +189,7 @@ func (cfg *runCfg) runNode() *runCfg { time.Sleep(startDelay) - node, err := visor.NewNode(&cfg.conf, cfg.masterLogger, cfg.restartCtx) + node, err := visor.NewNode(&cfg.conf, cfg.masterLogger, cfg.restartCtx, cfg.configPath) if err != nil { cfg.logger.Fatal("Failed to initialize node: ", err) } @@ -207,11 +230,13 @@ func (cfg *runCfg) runNode() *runCfg { func (cfg *runCfg) stopNode() *runCfg { defer cfg.profileStop() + if err := cfg.node.Close(); err != nil { if !strings.Contains(err.Error(), "closed") { cfg.logger.Fatal("Failed to close node: ", err) } } + return cfg } @@ -219,6 +244,7 @@ func (cfg *runCfg) waitOsSignals() *runCfg { ch := make(chan os.Signal, 2) signal.Notify(ch, []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}...) <-ch + go func() { select { case <-time.After(time.Duration(cfg.conf.ShutdownTimeout)): @@ -227,5 +253,6 @@ func (cfg *runCfg) waitOsSignals() *runCfg { cfg.logger.Fatalf("Received signal %s: terminating", s) } }() + return cfg } diff --git a/pkg/visor/config_test.go b/pkg/visor/config_test.go index e0363a8e8..caf00744c 100644 --- a/pkg/visor/config_test.go +++ b/pkg/visor/config_test.go @@ -41,6 +41,7 @@ func TestTransportDiscovery(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.NoError(t, json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: pk, NextNonce: 1})) })) + defer srv.Close() conf := Config{} @@ -54,6 +55,7 @@ func TestTransportDiscovery(t *testing.T) { func TestTransportLogStore(t *testing.T) { dir := filepath.Join(os.TempDir(), "foo") + defer func() { require.NoError(t, os.RemoveAll(dir)) }() diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 7ddd95834..da3f7655d 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -279,6 +279,7 @@ type AddTransportIn struct { // AddTransport creates a transport for the node. func (r *RPC) AddTransport(in *AddTransportIn, out *TransportSummary) error { ctx := context.Background() + if in.Timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Second*20) @@ -289,6 +290,7 @@ func (r *RPC) AddTransport(in *AddTransportIn, out *TransportSummary) error { if err != nil { return err } + *out = *newTransportSummary(r.node.tm, tp, false, r.node.router.SetupIsTrusted(tp.Remote())) return nil } @@ -309,10 +311,12 @@ func (r *RPC) DiscoverTransportsByPK(pk *cipher.PubKey, out *[]*transport.EntryW if err != nil { return err } + entries, err := tpD.GetTransportsByEdge(context.Background(), *pk) if err != nil { return err } + *out = entries return nil } @@ -323,10 +327,12 @@ func (r *RPC) DiscoverTransportByID(id *uuid.UUID, out *transport.EntryWithStatu if err != nil { return err } + entry, err := tpD.GetTransportByID(context.Background(), *id) if err != nil { return err } + *out = *entry return nil } @@ -385,6 +391,7 @@ func (r *RPC) Loops(_ *struct{}, out *[]LoopInfo) error { if err != nil { return err } + loops = append(loops, LoopInfo{ ConsumeRule: rule, FwdRule: rule, diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 92882bacb..c90c49e46 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -531,6 +531,7 @@ func (mc *mockRPCClient) RemoveRoutingRule(key routing.RouteID) error { // Loops implements RPCClient. func (mc *mockRPCClient) Loops() ([]LoopInfo, error) { var loops []LoopInfo + rules := mc.rt.AllRules() for _, rule := range rules { if rule.Type() != routing.RuleConsume { diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 114c26c92..3b0d0868d 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -4,8 +4,10 @@ package visor import ( "bufio" "context" + "encoding/json" "errors" "fmt" + "io/ioutil" "net" "net/rpc" "os" @@ -48,8 +50,12 @@ const ( AppStatusRunning ) -// ErrUnknownApp represents lookup error for App related calls. -var ErrUnknownApp = errors.New("unknown app") +var ( + // ErrUnknownApp represents lookup error for App related calls. + ErrUnknownApp = errors.New("unknown app") + // ErrNoConfigPath is returned on attempt to read/write config when node contains no config path. + ErrNoConfigPath = errors.New("no config path") +) // Version is the node version. const Version = "0.0.1" @@ -79,6 +85,7 @@ type Node struct { Logger *logging.MasterLogger logger *logging.Logger + confPath *string appsPath string localPath string appsConf map[string]AppConfig @@ -95,18 +102,19 @@ type Node struct { } // NewNode constructs new Node. -func NewNode(config *Config, masterLogger *logging.MasterLogger, restartCtx *restart.Context) (*Node, error) { +func NewNode(cfg *Config, logger *logging.MasterLogger, restartCtx *restart.Context, cfgPath *string) (*Node, error) { ctx := context.Background() node := &Node{ - conf: config, + conf: cfg, + confPath: cfgPath, procManager: appserver.NewProcManager(logging.MustGetLogger("proc_manager")), } - node.Logger = masterLogger + node.Logger = logger node.logger = node.Logger.PackageLogger("skywire") - restartCheckDelay, err := time.ParseDuration(config.RestartCheckDelay) + restartCheckDelay, err := time.ParseDuration(cfg.RestartCheckDelay) if err == nil { restartCtx.SetCheckDelay(restartCheckDelay) } @@ -115,44 +123,45 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger, restartCtx *res node.restartCtx = restartCtx - pk := config.Node.StaticPubKey - sk := config.Node.StaticSecKey + pk := cfg.Node.StaticPubKey + sk := cfg.Node.StaticSecKey - fmt.Println("min servers:", config.Messaging.ServerCount) + fmt.Println("min servers:", cfg.Messaging.ServerCount) node.n = snet.New(snet.Config{ PubKey: pk, SecKey: sk, TpNetworks: []string{dmsg.Type, snet.STcpType}, // TODO: Have some way to configure this. - DmsgDiscAddr: config.Messaging.Discovery, - DmsgMinSrvs: config.Messaging.ServerCount, - STCPLocalAddr: config.STCP.LocalAddr, - STCPTable: config.STCP.PubKeyTable, + DmsgDiscAddr: cfg.Messaging.Discovery, + DmsgMinSrvs: cfg.Messaging.ServerCount, + STCPLocalAddr: cfg.STCP.LocalAddr, + STCPTable: cfg.STCP.PubKeyTable, }) if err := node.n.Init(ctx); err != nil { return nil, fmt.Errorf("failed to init network: %v", err) } - if config.DmsgPty != nil { - pty, err := config.DmsgPtyHost(node.n.Dmsg()) + if cfg.DmsgPty != nil { + pty, err := cfg.DmsgPtyHost(node.n.Dmsg()) if err != nil { return nil, fmt.Errorf("failed to setup pty: %v", err) } node.pty = pty } - masterLogger.Info("'dmsgpty' is not configured, skipping...") - trDiscovery, err := config.TransportDiscovery() + logger.Info("'dmsgpty' is not configured, skipping...") + + trDiscovery, err := cfg.TransportDiscovery() if err != nil { return nil, fmt.Errorf("invalid MessagingConfig: %s", err) } - logStore, err := config.TransportLogStore() + logStore, err := cfg.TransportLogStore() if err != nil { return nil, fmt.Errorf("invalid TransportLogStore: %s", err) } tmConfig := &transport.ManagerConfig{ PubKey: pk, SecKey: sk, - DefaultNodes: config.TrustedNodes, + DefaultNodes: cfg.TrustedNodes, DiscoveryClient: trDiscovery, LogStore: logStore, } @@ -161,53 +170,57 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger, restartCtx *res return nil, fmt.Errorf("transport manager: %s", err) } - node.rt, err = config.RoutingTable() + node.rt, err = cfg.RoutingTable() if err != nil { return nil, fmt.Errorf("routing table: %s", err) } + rConfig := &router.Config{ Logger: node.Logger.PackageLogger("router"), PubKey: pk, SecKey: sk, TransportManager: node.tm, RoutingTable: node.rt, - RouteFinder: rfclient.NewHTTP(config.Routing.RouteFinder, time.Duration(config.Routing.RouteFinderTimeout)), - SetupNodes: config.Routing.SetupNodes, + RouteFinder: rfclient.NewHTTP(cfg.Routing.RouteFinder, time.Duration(cfg.Routing.RouteFinderTimeout)), + SetupNodes: cfg.Routing.SetupNodes, } + r, err := router.New(node.n, rConfig) if err != nil { return nil, fmt.Errorf("failed to setup router: %v", err) } node.router = r - node.appsConf, err = config.AppsConfig() + node.appsConf, err = cfg.AppsConfig() if err != nil { return nil, fmt.Errorf("invalid AppsConfig: %s", err) } - node.appsPath, err = config.AppsDir() + node.appsPath, err = cfg.AppsDir() if err != nil { return nil, fmt.Errorf("invalid AppsPath: %s", err) } - node.localPath, err = config.LocalDir() + node.localPath, err = cfg.LocalDir() if err != nil { return nil, fmt.Errorf("invalid LocalPath: %s", err) } - if lvl, err := logging.LevelFromString(config.LogLevel); err == nil { + if lvl, err := logging.LevelFromString(cfg.LogLevel); err == nil { node.Logger.SetLevel(lvl) } - if config.Interfaces.RPCAddress != "" { - l, err := net.Listen("tcp", config.Interfaces.RPCAddress) + if cfg.Interfaces.RPCAddress != "" { + l, err := net.Listen("tcp", cfg.Interfaces.RPCAddress) if err != nil { return nil, fmt.Errorf("failed to setup RPC listener: %s", err) } node.rpcListener = l } - node.rpcDialers = make([]*noise.RPCClientDialer, len(config.Hypervisors)) - for i, entry := range config.Hypervisors { + + node.rpcDialers = make([]*noise.RPCClientDialer, len(cfg.Hypervisors)) + + for i, entry := range cfg.Hypervisors { node.rpcDialers[i] = noise.NewRPCClientDialer(entry.Addr, noise.HandshakeXK, noise.Config{ LocalPK: pk, LocalSK: sk, @@ -244,6 +257,7 @@ func (node *Node) Start() error { if !ac.AutoStart { continue } + go func(a AppConfig) { if err := node.SpawnApp(&a, nil); err != nil { node.logger.Warnf("Failed to start %s: %s\n", a.App, err) @@ -255,10 +269,13 @@ func (node *Node) Start() error { if err := rpcSvr.RegisterName(RPCPrefix, &RPC{node: node}); err != nil { return fmt.Errorf("rpc server created failed: %s", err) } + if node.rpcListener != nil { node.logger.Info("Starting RPC interface on ", node.rpcListener.Addr()) + go rpcSvr.Accept(node.rpcListener) } + for _, dialer := range node.rpcDialers { go func(dialer *noise.RPCClientDialer) { if err := dialer.Run(rpcSvr, time.Second); err != nil { @@ -268,6 +285,7 @@ func (node *Node) Start() error { } node.logger.Info("Starting packet router") + if err := node.router.Serve(ctx); err != nil { return fmt.Errorf("failed to start Node: %s", err) } @@ -339,6 +357,7 @@ func (node *Node) Close() (err error) { if node == nil { return nil } + if node.rpcListener != nil { if err = node.rpcListener.Close(); err != nil { node.logger.WithError(err).Error("failed to stop RPC interface") @@ -346,6 +365,7 @@ func (node *Node) Close() (err error) { node.logger.Info("RPC interface stopped successfully") } } + for i, dialer := range node.rpcDialers { if err = dialer.Close(); err != nil { node.logger.WithError(err).Errorf("(%d) failed to stop RPC dialer", i) @@ -375,6 +395,7 @@ func (node *Node) Exec(command string) ([]byte, error) { func (node *Node) Apps() []*AppState { // TODO: move app states to the app module res := make([]*AppState, 0) + for _, app := range node.appsConf { state := &AppState{app.App, app.AutoStart, app.Port, AppStatusStopped} @@ -393,6 +414,7 @@ func (node *Node) StartApp(appName string) error { for _, app := range node.appsConf { if app.App == appName { startCh := make(chan struct{}) + go func(app AppConfig) { if err := node.SpawnApp(&app, startCh); err != nil { node.logger.Warnf("Failed to start app %s: %s", appName, err) @@ -431,6 +453,7 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err erro // TODO: make PackageLogger return *RuleEntry. FieldLogger doesn't expose Writer. logger := node.logger.WithField("_module", fmt.Sprintf("%s.v%s", config.App, config.Version)).Writer() + defer func() { if logErr := logger.Close(); err == nil && logErr != nil { err = logErr @@ -494,5 +517,68 @@ func (node *Node) SetAutoStart(appName string, autoStart bool) error { appConf.AutoStart = autoStart node.appsConf[appName] = appConf - return nil + + return node.updateAutoStartConfig(appName, autoStart) +} + +func (node *Node) updateAutoStartConfig(appName string, autoStart bool) error { + if node.confPath == nil { + return nil + } + + config, err := node.readConfig() + if err != nil { + return err + } + + changed := false + + for _, app := range config.Apps { + if app.App == appName { + app.AutoStart = autoStart + changed = true + } + } + + if !changed { + return nil + } + + return node.writeConfig(config) +} + +func (node *Node) readConfig() (*Config, error) { + if node.confPath == nil { + return nil, ErrNoConfigPath + } + + configPath := *node.confPath + + bytes, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + + var config Config + if err := json.Unmarshal(bytes, &config); err != nil { + return nil, err + } + + return &config, nil +} + +func (node *Node) writeConfig(config *Config) error { + if node.confPath == nil { + return ErrNoConfigPath + } + + configPath := *node.confPath + + bytes, err := json.Marshal(config) + if err != nil { + return err + } + + const filePerm = 0644 + return ioutil.WriteFile(configPath, bytes, filePerm) }