Skip to content

Commit

Permalink
Merge branch 'mainnet' into feature/dmsg
Browse files Browse the repository at this point in the history
  • Loading branch information
志宇 authored Jun 19, 2019
2 parents ab70929 + 46f5095 commit 25f677d
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 3 deletions.
80 changes: 79 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package node

import (
"bufio"
"context"
"errors"
"fmt"
Expand All @@ -11,11 +12,15 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/skycoin/skywire/pkg/dmsg"
"github.com/skycoin/skywire/pkg/util/pathutil"

"github.com/skycoin/skycoin/src/util/logging"

Expand Down Expand Up @@ -95,6 +100,8 @@ type Node struct {
startedMu sync.RWMutex
startedApps map[string]*appBind

pidMu sync.Mutex

rpcListener net.Listener
rpcDialers []*noise.RPCClientDialer
}
Expand Down Expand Up @@ -204,6 +211,8 @@ func (node *Node) Start() error {
}
node.logger.Info("Connected to messaging servers")

pathutil.EnsureDir(node.dir())
node.closePreviousApps()
for _, ac := range node.appsConf {
if !ac.AutoStart {
continue
Expand Down Expand Up @@ -240,6 +249,61 @@ func (node *Node) Start() error {
return nil
}

func (node *Node) dir() string {
return pathutil.NodeDir(node.config.Node.StaticPubKey)
}

func (node *Node) pidFile() *os.File {
f, err := os.OpenFile(filepath.Join(node.dir(), "apps-pid.txt"), os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
panic(err)
}

return f
}

func (node *Node) closePreviousApps() {
node.logger.Info("killing previously ran apps if any...")

pids := node.pidFile()
defer pids.Close() // nocheck: err

scanner := bufio.NewScanner(pids)
for scanner.Scan() {
appInfo := strings.Split(scanner.Text(), " ")
if len(appInfo) != 2 {
node.logger.Fatal("error parsing %s. Err: %s", pids.Name(), errors.New("line should be: [app name] [pid]"))
}

pid, err := strconv.Atoi(appInfo[1])
if err != nil {
node.logger.Fatal("error parsing %s. Err: %s", pids.Name(), err)
}

node.stopUnhandledApp(appInfo[0], pid)
}

// empty file
pathutil.AtomicWriteFile(pids.Name(), []byte{})
}

func (node *Node) stopUnhandledApp(name string, pid int) {
p, err := os.FindProcess(pid)
if err != nil {
if runtime.GOOS != "windows" {
node.logger.Infof("Previous app %s ran by this node with pid: %d not found", name, pid)
}
return
}

err = p.Signal(syscall.SIGKILL)
if err != nil {
return
}

node.logger.Infof("Found and killed hanged app %s with pid %d previously ran by this node", name, pid)
}

// Close safely stops spawned Apps and messaging Node.
func (node *Node) Close() (err error) {
if node.rpcListener != nil {
Expand Down Expand Up @@ -357,6 +421,11 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) error {
node.startedMu.Lock()
bind.pid = pid
node.startedMu.Unlock()

node.pidMu.Lock()
node.logger.Infof("storing app %s pid %d", config.App, pid)
node.persistPID(config.App, pid)
node.pidMu.Unlock()
appCh <- node.executer.Wait(cmd)
}()

Expand Down Expand Up @@ -390,6 +459,14 @@ func (node *Node) SpawnApp(config *AppConfig, startCh chan<- struct{}) error {
return appErr
}

func (node *Node) persistPID(name string, pid int) {
pidF := node.pidFile()
pidFName := pidF.Name()
pidF.Close()

pathutil.AtomicAppendToFile(pidFName, []byte(fmt.Sprintf("%s %d\n", name, pid)))
}

// StopApp stops running App.
func (node *Node) StopApp(appName string) error {
node.startedMu.Lock()
Expand Down Expand Up @@ -446,6 +523,7 @@ func (exc *osExecuter) Start(cmd *exec.Cmd) (int, error) {
exc.mu.Lock()
exc.processes = append(exc.processes, cmd.Process)
exc.mu.Unlock()

return cmd.Process.Pid, nil
}

Expand All @@ -458,7 +536,7 @@ func (exc *osExecuter) Stop(pid int) (err error) {
continue
}

if sigErr := process.Signal(syscall.SIGTERM); sigErr != nil && err == nil {
if sigErr := process.Signal(syscall.SIGKILL); sigErr != nil && err == nil {
err = sigErr
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/skycoin/skywire/pkg/dmsg"
"github.com/skycoin/skywire/pkg/util/pathutil"

"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -100,11 +101,16 @@ func TestNodeStartClose(t *testing.T) {
}

func TestNodeSpawnApp(t *testing.T) {
pk, _ := cipher.GenerateKeyPair()
r := new(mockRouter)
executer := &MockExecuter{}
defer os.RemoveAll("skychat")
apps := []AppConfig{{App: "skychat", Version: "1.0", AutoStart: false, Port: 10, Args: []string{"foo"}}}
node := &Node{router: r, executer: executer, appsConf: apps, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")}
node := &Node{router: r, executer: executer, appsConf: apps, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test"),
config: &Config{}}
node.config.Node.StaticPubKey = pk
pathutil.EnsureDir(node.dir())
defer os.RemoveAll(node.dir())

require.NoError(t, node.StartApp("skychat"))
time.Sleep(100 * time.Millisecond)
Expand Down
12 changes: 11 additions & 1 deletion pkg/node/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"testing"
"time"

"github.com/skycoin/skywire/pkg/cipher"
"github.com/skycoin/skywire/pkg/util/pathutil"

"github.com/google/uuid"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -47,12 +50,16 @@ func TestListApps(t *testing.T) {
}

func TestStartStopApp(t *testing.T) {
pk, _ := cipher.GenerateKeyPair()
router := new(mockRouter)
executer := new(MockExecuter)
defer os.RemoveAll("skychat")

apps := []AppConfig{{App: "foo", Version: "1.0", AutoStart: false, Port: 10}}
node := &Node{router: router, executer: executer, appsConf: apps, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")}
node := &Node{router: router, executer: executer, appsConf: apps, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test"), config: &Config{}}
node.config.Node.StaticPubKey = pk
pathutil.EnsureDir(node.dir())
defer os.RemoveAll(node.dir())

rpc := &RPC{node: node}
unknownApp := "bar"
Expand Down Expand Up @@ -119,10 +126,13 @@ func TestRPC(t *testing.T) {
startedApps: map[string]*appBind{},
logger: logging.MustGetLogger("test"),
}
pathutil.EnsureDir(node.dir())
defer os.RemoveAll(node.dir())

require.NoError(t, node.StartApp("foo"))
require.NoError(t, node.StartApp("bar"))

time.Sleep(time.Second)
gateway := &RPC{node: node}

sConn, cConn := net.Pipe()
Expand Down
59 changes: 59 additions & 0 deletions pkg/util/pathutil/homedir.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package pathutil

import (
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime"

"github.com/skycoin/skywire/pkg/cipher"
)

// HomeDir obtains the path to the user's home directory via ENVs.
Expand All @@ -17,3 +22,57 @@ func HomeDir() string {
}
return os.Getenv("HOME")
}

// NodeDir returns a path to a directory used to store specific node configuration. Such dir is ~/.skywire/{PK}
func NodeDir(pk cipher.PubKey) string {
return filepath.Join(HomeDir(), ".skycoin", "skywire", pk.String())
}

// EnsureDir attempts to create given directory, panics if it fails to do so
func EnsureDir(path string) {
if _, err := os.Stat(path); os.IsNotExist(err) {
err := os.MkdirAll(path, 0750)
if err != nil {
panic(err)
}
}
}

// AtomicWriteFile creates a temp file in which to write data, then calls syscall.Rename to swap it and write it on
// filename for an atomic write. On failure temp file is removed and panics.
func AtomicWriteFile(filename string, data []byte) {
dir, name := path.Split(filename)
f, err := ioutil.TempFile(dir, name)
if err != nil {
panic(err)
}

_, err = f.Write(data)
if err == nil {
err = f.Sync()
}
if closeErr := f.Close(); err == nil {
err = closeErr
}
if permErr := os.Chmod(f.Name(), 0600); err == nil {
err = permErr
}
if err == nil {
err = os.Rename(f.Name(), filename)
}

if err != nil {
os.Remove(f.Name()) // nolint: errcheck
panic(err)
}
}

// AtomicAppendToFile calls AtomicWriteFile but appends new data to destiny file
func AtomicAppendToFile(filename string, data []byte) {
oldFile, err := ioutil.ReadFile(filename)
if err != nil {
panic(err)
}

AtomicWriteFile(filename, append(oldFile, data...))
}

0 comments on commit 25f677d

Please sign in to comment.