Skip to content

Commit

Permalink
Merge pull request #231 from Darkren/feature/replace-unix-pipes
Browse files Browse the repository at this point in the history
Replace unix pipes with TCP sockets
  • Loading branch information
jdknives authored Mar 23, 2020
2 parents 3ded5bf + b5629fa commit 3ba933a
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 102 deletions.
4 changes: 3 additions & 1 deletion cmd/skywire-cli/commands/visor/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/app/appcommon"

"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"

Expand Down Expand Up @@ -147,7 +149,7 @@ func defaultConfig() *visor.Config {

conf.Interfaces.RPCAddress = "localhost:3435"

conf.AppServerSockFile = "/tmp/visor_" + pk.Hex() + ".sock"
conf.AppServerAddr = appcommon.DefaultServerAddr

return conf
}
Expand Down
10 changes: 3 additions & 7 deletions cmd/skywire-visor/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,9 @@ func (cfg *runCfg) runVisor() *runCfg {
time.Sleep(startDelay)

if cfg.conf.DmsgPty != nil {
err = visor.UnlinkSocketFiles(cfg.conf.AppServerSockFile, cfg.conf.DmsgPty.CLIAddr)
} else {
err = visor.UnlinkSocketFiles(cfg.conf.AppServerSockFile)
}

if err != nil {
cfg.logger.Fatal("failed to unlink socket files: ", err)
if err := visor.UnlinkSocketFiles(cfg.conf.DmsgPty.CLIAddr); err != nil {
cfg.logger.Fatal("failed to unlink socket files: ", err)
}
}

vis, err := visor.NewVisor(&cfg.conf, cfg.masterLogger, cfg.restartCtx, cfg.configPath)
Expand Down
12 changes: 7 additions & 5 deletions pkg/app/appcommon/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package appcommon

const DefaultServerAddr = "localhost:5505"

// Config defines configuration parameters for `Proc`.
type Config struct {
Name string `json:"name"`
SockFilePath string `json:"sock_file_path"`
VisorPK string `json:"visor_pk"`
BinaryDir string `json:"binary_dir"`
WorkDir string `json:"work_dir"`
Name string `json:"name"`
ServerAddr string `json:"server_addr"`
VisorPK string `json:"visor_pk"`
BinaryDir string `json:"binary_dir"`
WorkDir string `json:"work_dir"`
}
4 changes: 2 additions & 2 deletions pkg/app/appcommon/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package appcommon
const (
// EnvAppKey is a name for env arg containing skywire application key.
EnvAppKey = "APP_KEY"
// EnvSockFile is a name for env arg containing unix socket file name.
EnvSockFile = "SW_UNIX"
// EnvServerAddr is a name for env arg containing app server address.
EnvServerAddr = "APP_SERVER_ADDR"
// EnvVisorPK is a name for env arg containing public key of visor.
EnvVisorPK = "VISOR_PK"
)
10 changes: 5 additions & 5 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func NewProc(log *logging.Logger, c appcommon.Config, args []string, stdout, std
binaryPath := getBinaryPath(c.BinaryDir, c.Name)

const (
appKeyEnvFormat = appcommon.EnvAppKey + "=%s"
sockFileEnvFormat = appcommon.EnvSockFile + "=%s"
visorPKEnvFormat = appcommon.EnvVisorPK + "=%s"
appKeyEnvFormat = appcommon.EnvAppKey + "=%s"
serverAddrEnvFormat = appcommon.EnvServerAddr + "=%s"
visorPKEnvFormat = appcommon.EnvVisorPK + "=%s"
)

env := make([]string, 0, 3)
env := make([]string, 0, 4)
env = append(env, fmt.Sprintf(appKeyEnvFormat, key))
env = append(env, fmt.Sprintf(sockFileEnvFormat, c.SockFilePath))
env = append(env, fmt.Sprintf(serverAddrEnvFormat, c.ServerAddr))
env = append(env, fmt.Sprintf(visorPKEnvFormat, c.VisorPK))

cmd := exec.Command(binaryPath, args...) // nolint:gosec
Expand Down
8 changes: 5 additions & 3 deletions pkg/app/appserver/proc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"sort"
"testing"

"github.com/SkycoinProject/skywire-mainnet/pkg/app/appcommon"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/stretchr/testify/require"
)

func TestProcManager_Exists(t *testing.T) {
srv := New(nil, "")
srv := New(nil, appcommon.DefaultServerAddr)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand All @@ -27,7 +29,7 @@ func TestProcManager_Exists(t *testing.T) {
}

func TestProcManager_Range(t *testing.T) {
srv := New(nil, "")
srv := New(nil, appcommon.DefaultServerAddr)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand Down Expand Up @@ -57,7 +59,7 @@ func TestProcManager_Range(t *testing.T) {
}

func TestProcManager_Pop(t *testing.T) {
srv := New(nil, "")
srv := New(nil, appcommon.DefaultServerAddr)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand Down
24 changes: 12 additions & 12 deletions pkg/app/appserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import (

// Server is a server for app/visor communication.
type Server struct {
log *logging.Logger
lis net.Listener
sockFile string
rpcS *rpc.Server
done sync.WaitGroup
stopCh chan struct{}
log *logging.Logger
lis net.Listener
addr string
rpcS *rpc.Server
done sync.WaitGroup
stopCh chan struct{}
}

// New constructs server.
func New(log *logging.Logger, sockFile string) *Server {
func New(log *logging.Logger, addr string) *Server {
return &Server{
log: log,
sockFile: sockFile,
rpcS: rpc.NewServer(),
stopCh: make(chan struct{}),
log: log,
addr: addr,
rpcS: rpc.NewServer(),
stopCh: make(chan struct{}),
}
}

Expand All @@ -42,7 +42,7 @@ func (s *Server) Register(appKey appcommon.Key) error {

// ListenAndServe starts listening for incoming app connections via unix socket.
func (s *Server) ListenAndServe() error {
l, err := net.Listen("unix", s.sockFile)
l, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/app/appserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@ import (
)

const (
sockFile = "/tmp/app.sock"
sleepDelay = 500 * time.Millisecond
)

func TestServer_ListenAndServe(t *testing.T) {
l := logging.MustGetLogger("app_server")

s := appserver.New(l, sockFile)
s := appserver.New(l, appcommon.DefaultServerAddr)

appKey := appcommon.GenerateAppKey()

require.NoError(t, s.Register(appKey))

visorPK, _ := cipher.GenerateKeyPair()
clientConfig := app.ClientConfig{
VisorPK: visorPK,
SockFile: sockFile,
AppKey: appKey,
VisorPK: visorPK,
ServerAddr: appcommon.DefaultServerAddr,
AppKey: appKey,
}

errCh := make(chan error, 1)
Expand Down
24 changes: 12 additions & 12 deletions pkg/app/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ var (
ErrVisorPKNotProvided = errors.New("visor PK is not provided")
// ErrVisorPKInvalid is returned when the visor PK is invalid.
ErrVisorPKInvalid = errors.New("visor PK is invalid")
// ErrSockFileNotProvided is returned when the sock file is not provided.
ErrSockFileNotProvided = errors.New("sock file is not provided")
// ErrServerAddrNotProvided is returned when app server address is not provided.
ErrServerAddrNotProvided = errors.New("server address is not provided")
// ErrAppKeyNotProvided is returned when the app key is not provided.
ErrAppKeyNotProvided = errors.New("app key is not provided")
)

// ClientConfig is a configuration for `Client`.
type ClientConfig struct {
VisorPK cipher.PubKey
SockFile string
AppKey appcommon.Key
VisorPK cipher.PubKey
ServerAddr string
AppKey appcommon.Key
}

// ClientConfigFromEnv creates client config from the ENV args.
Expand All @@ -42,9 +42,9 @@ func ClientConfigFromEnv() (ClientConfig, error) {
return ClientConfig{}, ErrAppKeyNotProvided
}

sockFile := os.Getenv(appcommon.EnvSockFile)
if sockFile == "" {
return ClientConfig{}, ErrSockFileNotProvided
serverAddr := os.Getenv(appcommon.EnvServerAddr)
if serverAddr == "" {
return ClientConfig{}, ErrServerAddrNotProvided
}

visorPKStr := os.Getenv(appcommon.EnvVisorPK)
Expand All @@ -58,9 +58,9 @@ func ClientConfigFromEnv() (ClientConfig, error) {
}

return ClientConfig{
VisorPK: visorPK,
SockFile: sockFile,
AppKey: appcommon.Key(appKey),
VisorPK: visorPK,
ServerAddr: serverAddr,
AppKey: appcommon.Key(appKey),
}, nil
}

Expand All @@ -77,7 +77,7 @@ type Client struct {
// - log: logger instance.
// - config: client configuration.
func NewClient(log *logging.Logger, config ClientConfig) (*Client, error) {
rpcCl, err := rpc.Dial("unix", config.SockFile)
rpcCl, err := rpc.Dial("tcp", config.ServerAddr)
if err != nil {
return nil, fmt.Errorf("error connecting to the app server: %v", err)
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/app/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "")
err = os.Setenv(appcommon.EnvServerAddr, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, "")
Expand All @@ -33,15 +33,15 @@ func TestClientConfigFromEnv(t *testing.T) {
visorPK, _ := cipher.GenerateKeyPair()

wantCfg := ClientConfig{
VisorPK: visorPK,
SockFile: "sock.unix",
AppKey: "key",
VisorPK: visorPK,
ServerAddr: appcommon.DefaultServerAddr,
AppKey: "key",
}

err := os.Setenv(appcommon.EnvAppKey, string(wantCfg.AppKey))
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, wantCfg.SockFile)
err = os.Setenv(appcommon.EnvServerAddr, wantCfg.ServerAddr)
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, wantCfg.VisorPK.Hex())
Expand All @@ -59,14 +59,14 @@ func TestClientConfigFromEnv(t *testing.T) {
require.Equal(t, err, ErrAppKeyNotProvided)
})

t.Run("no sock file", func(t *testing.T) {
t.Run("no app server address", func(t *testing.T) {
resetEnv(t)

err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

_, err = ClientConfigFromEnv()
require.Equal(t, err, ErrSockFileNotProvided)
require.Equal(t, err, ErrServerAddrNotProvided)
})

t.Run("no visor PK", func(t *testing.T) {
Expand All @@ -75,7 +75,7 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "val")
err = os.Setenv(appcommon.EnvServerAddr, appcommon.DefaultServerAddr)
require.NoError(t, err)

_, err = ClientConfigFromEnv()
Expand All @@ -88,7 +88,7 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "val")
err = os.Setenv(appcommon.EnvServerAddr, appcommon.DefaultServerAddr)
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, "val")
Expand Down
2 changes: 1 addition & 1 deletion pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Config struct {

Interfaces InterfaceConfig `json:"interfaces"`

AppServerSockFile string `json:"app_server_sock_file"`
AppServerAddr string `json:"app_server_addr"`

RestartCheckDelay string `json:"restart_check_delay,omitempty"`
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/visor/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func TestStartStopApp(t *testing.T) {
unknownApp := "bar"
app := apps["foo"].App

visorCfg := Config{}
visorCfg := Config{
AppServerAddr: appcommon.DefaultServerAddr,
}
visorCfg.Visor.StaticPubKey = pk

visor := &Visor{
Expand All @@ -164,10 +166,10 @@ func TestStartStopApp(t *testing.T) {
}()

appCfg1 := appcommon.Config{
Name: app,
SockFilePath: visorCfg.AppServerSockFile,
VisorPK: visorCfg.Visor.StaticPubKey.Hex(),
WorkDir: filepath.Join("", app),
Name: app,
ServerAddr: appcommon.DefaultServerAddr,
VisorPK: visorCfg.Visor.StaticPubKey.Hex(),
WorkDir: filepath.Join("", app),
}

appArgs1 := append([]string{filepath.Join(visor.dir(), app)}, apps["foo"].Args...)
Expand Down
20 changes: 6 additions & 14 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func NewVisor(cfg *Config, logger *logging.MasterLogger, restartCtx *restart.Con
visor.hvErrs[hv.PubKey] = make(chan error, 1)
}

visor.appRPCServer = appserver.New(logging.MustGetLogger("app_rpc_server"), visor.conf.AppServerSockFile)
visor.appRPCServer = appserver.New(logging.MustGetLogger("app_rpc_server"), visor.conf.AppServerAddr)

go func() {
if err := visor.appRPCServer.ListenAndServe(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
Expand Down Expand Up @@ -490,14 +490,6 @@ func (visor *Visor) Close() (err error) {
visor.logger.WithError(err).Error("RPC server closed with error.")
}

if err := UnlinkSocketFiles(visor.conf.AppServerSockFile); err != nil {
visor.logger.WithError(err).WithField("file_name", visor.conf.AppServerSockFile).
Error("Failed to unlink socket file.")
} else {
visor.logger.WithField("file_name", visor.conf.AppServerSockFile).
Debug("Socket file removed successfully.")
}

return err
}

Expand Down Expand Up @@ -567,11 +559,11 @@ func (visor *Visor) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err er
}

appCfg := appcommon.Config{
Name: config.App,
SockFilePath: visor.conf.AppServerSockFile,
VisorPK: visor.conf.Visor.StaticPubKey.Hex(),
BinaryDir: visor.appsPath,
WorkDir: filepath.Join(visor.localPath, config.App),
Name: config.App,
ServerAddr: visor.conf.AppServerAddr,
VisorPK: visor.conf.Visor.StaticPubKey.Hex(),
BinaryDir: visor.appsPath,
WorkDir: filepath.Join(visor.localPath, config.App),
}

if _, err := ensureDir(appCfg.WorkDir); err != nil {
Expand Down
Loading

0 comments on commit 3ba933a

Please sign in to comment.