Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace unix pipes with TCP sockets #231

Merged
merged 3 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/skywire-cli/commands/visor/gen-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"time"

"github.com/SkycoinProject/skywire-mainnet/pkg/app/appcommon"

"github.com/SkycoinProject/skywire-mainnet/internal/skyenv"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"

Expand Down Expand Up @@ -147,7 +149,7 @@ func defaultConfig() *visor.Config {

conf.Interfaces.RPCAddress = "localhost:3435"

conf.AppServerSockFile = "/tmp/visor_" + pk.Hex() + ".sock"
conf.AppServerAddr = appcommon.DefaultServerAddr

return conf
}
Expand Down
10 changes: 3 additions & 7 deletions cmd/skywire-visor/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,9 @@ func (cfg *runCfg) runVisor() *runCfg {
time.Sleep(startDelay)

if cfg.conf.DmsgPty != nil {
err = visor.UnlinkSocketFiles(cfg.conf.AppServerSockFile, cfg.conf.DmsgPty.CLIAddr)
} else {
err = visor.UnlinkSocketFiles(cfg.conf.AppServerSockFile)
}

if err != nil {
cfg.logger.Fatal("failed to unlink socket files: ", err)
if err := visor.UnlinkSocketFiles(cfg.conf.DmsgPty.CLIAddr); err != nil {
cfg.logger.Fatal("failed to unlink socket files: ", err)
}
}

vis, err := visor.NewVisor(&cfg.conf, cfg.masterLogger, cfg.restartCtx, cfg.configPath)
Expand Down
12 changes: 7 additions & 5 deletions pkg/app/appcommon/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package appcommon

const DefaultServerAddr = "localhost:5505"

// Config defines configuration parameters for `Proc`.
type Config struct {
Name string `json:"name"`
SockFilePath string `json:"sock_file_path"`
VisorPK string `json:"visor_pk"`
BinaryDir string `json:"binary_dir"`
WorkDir string `json:"work_dir"`
Name string `json:"name"`
ServerAddr string `json:"server_addr"`
VisorPK string `json:"visor_pk"`
BinaryDir string `json:"binary_dir"`
WorkDir string `json:"work_dir"`
}
4 changes: 2 additions & 2 deletions pkg/app/appcommon/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package appcommon
const (
// EnvAppKey is a name for env arg containing skywire application key.
EnvAppKey = "APP_KEY"
// EnvSockFile is a name for env arg containing unix socket file name.
EnvSockFile = "SW_UNIX"
// EnvServerAddr is a name for env arg containing app server address.
EnvServerAddr = "APP_SERVER_ADDR"
// EnvVisorPK is a name for env arg containing public key of visor.
EnvVisorPK = "VISOR_PK"
)
10 changes: 5 additions & 5 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func NewProc(log *logging.Logger, c appcommon.Config, args []string, stdout, std
binaryPath := getBinaryPath(c.BinaryDir, c.Name)

const (
appKeyEnvFormat = appcommon.EnvAppKey + "=%s"
sockFileEnvFormat = appcommon.EnvSockFile + "=%s"
visorPKEnvFormat = appcommon.EnvVisorPK + "=%s"
appKeyEnvFormat = appcommon.EnvAppKey + "=%s"
serverAddrEnvFormat = appcommon.EnvServerAddr + "=%s"
visorPKEnvFormat = appcommon.EnvVisorPK + "=%s"
)

env := make([]string, 0, 3)
env := make([]string, 0, 4)
env = append(env, fmt.Sprintf(appKeyEnvFormat, key))
env = append(env, fmt.Sprintf(sockFileEnvFormat, c.SockFilePath))
env = append(env, fmt.Sprintf(serverAddrEnvFormat, c.ServerAddr))
env = append(env, fmt.Sprintf(visorPKEnvFormat, c.VisorPK))

cmd := exec.Command(binaryPath, args...) // nolint:gosec
Expand Down
8 changes: 5 additions & 3 deletions pkg/app/appserver/proc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"sort"
"testing"

"github.com/SkycoinProject/skywire-mainnet/pkg/app/appcommon"

"github.com/SkycoinProject/skycoin/src/util/logging"
"github.com/stretchr/testify/require"
)

func TestProcManager_Exists(t *testing.T) {
srv := New(nil, "")
srv := New(nil, appcommon.DefaultServerAddr)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand All @@ -27,7 +29,7 @@ func TestProcManager_Exists(t *testing.T) {
}

func TestProcManager_Range(t *testing.T) {
srv := New(nil, "")
srv := New(nil, appcommon.DefaultServerAddr)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand Down Expand Up @@ -57,7 +59,7 @@ func TestProcManager_Range(t *testing.T) {
}

func TestProcManager_Pop(t *testing.T) {
srv := New(nil, "")
srv := New(nil, appcommon.DefaultServerAddr)

mIfc := NewProcManager(logging.MustGetLogger("proc_manager"), srv)
m, ok := mIfc.(*procManager)
Expand Down
24 changes: 12 additions & 12 deletions pkg/app/appserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import (

// Server is a server for app/visor communication.
type Server struct {
log *logging.Logger
lis net.Listener
sockFile string
rpcS *rpc.Server
done sync.WaitGroup
stopCh chan struct{}
log *logging.Logger
lis net.Listener
addr string
rpcS *rpc.Server
done sync.WaitGroup
stopCh chan struct{}
}

// New constructs server.
func New(log *logging.Logger, sockFile string) *Server {
func New(log *logging.Logger, addr string) *Server {
return &Server{
log: log,
sockFile: sockFile,
rpcS: rpc.NewServer(),
stopCh: make(chan struct{}),
log: log,
addr: addr,
rpcS: rpc.NewServer(),
stopCh: make(chan struct{}),
}
}

Expand All @@ -42,7 +42,7 @@ func (s *Server) Register(appKey appcommon.Key) error {

// ListenAndServe starts listening for incoming app connections via unix socket.
func (s *Server) ListenAndServe() error {
l, err := net.Listen("unix", s.sockFile)
l, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/app/appserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@ import (
)

const (
sockFile = "/tmp/app.sock"
sleepDelay = 500 * time.Millisecond
)

func TestServer_ListenAndServe(t *testing.T) {
l := logging.MustGetLogger("app_server")

s := appserver.New(l, sockFile)
s := appserver.New(l, appcommon.DefaultServerAddr)

appKey := appcommon.GenerateAppKey()

require.NoError(t, s.Register(appKey))

visorPK, _ := cipher.GenerateKeyPair()
clientConfig := app.ClientConfig{
VisorPK: visorPK,
SockFile: sockFile,
AppKey: appKey,
VisorPK: visorPK,
ServerAddr: appcommon.DefaultServerAddr,
AppKey: appKey,
}

errCh := make(chan error, 1)
Expand Down
24 changes: 12 additions & 12 deletions pkg/app/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ var (
ErrVisorPKNotProvided = errors.New("visor PK is not provided")
// ErrVisorPKInvalid is returned when the visor PK is invalid.
ErrVisorPKInvalid = errors.New("visor PK is invalid")
// ErrSockFileNotProvided is returned when the sock file is not provided.
ErrSockFileNotProvided = errors.New("sock file is not provided")
// ErrServerAddrNotProvided is returned when app server address is not provided.
ErrServerAddrNotProvided = errors.New("server address is not provided")
// ErrAppKeyNotProvided is returned when the app key is not provided.
ErrAppKeyNotProvided = errors.New("app key is not provided")
)

// ClientConfig is a configuration for `Client`.
type ClientConfig struct {
VisorPK cipher.PubKey
SockFile string
AppKey appcommon.Key
VisorPK cipher.PubKey
ServerAddr string
AppKey appcommon.Key
}

// ClientConfigFromEnv creates client config from the ENV args.
Expand All @@ -42,9 +42,9 @@ func ClientConfigFromEnv() (ClientConfig, error) {
return ClientConfig{}, ErrAppKeyNotProvided
}

sockFile := os.Getenv(appcommon.EnvSockFile)
if sockFile == "" {
return ClientConfig{}, ErrSockFileNotProvided
serverAddr := os.Getenv(appcommon.EnvServerAddr)
if serverAddr == "" {
return ClientConfig{}, ErrServerAddrNotProvided
}

visorPKStr := os.Getenv(appcommon.EnvVisorPK)
Expand All @@ -58,9 +58,9 @@ func ClientConfigFromEnv() (ClientConfig, error) {
}

return ClientConfig{
VisorPK: visorPK,
SockFile: sockFile,
AppKey: appcommon.Key(appKey),
VisorPK: visorPK,
ServerAddr: serverAddr,
AppKey: appcommon.Key(appKey),
}, nil
}

Expand All @@ -77,7 +77,7 @@ type Client struct {
// - log: logger instance.
// - config: client configuration.
func NewClient(log *logging.Logger, config ClientConfig) (*Client, error) {
rpcCl, err := rpc.Dial("unix", config.SockFile)
rpcCl, err := rpc.Dial("tcp", config.ServerAddr)
if err != nil {
return nil, fmt.Errorf("error connecting to the app server: %v", err)
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/app/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "")
err = os.Setenv(appcommon.EnvServerAddr, "")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, "")
Expand All @@ -33,15 +33,15 @@ func TestClientConfigFromEnv(t *testing.T) {
visorPK, _ := cipher.GenerateKeyPair()

wantCfg := ClientConfig{
VisorPK: visorPK,
SockFile: "sock.unix",
AppKey: "key",
VisorPK: visorPK,
ServerAddr: appcommon.DefaultServerAddr,
AppKey: "key",
}

err := os.Setenv(appcommon.EnvAppKey, string(wantCfg.AppKey))
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, wantCfg.SockFile)
err = os.Setenv(appcommon.EnvServerAddr, wantCfg.ServerAddr)
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, wantCfg.VisorPK.Hex())
Expand All @@ -59,14 +59,14 @@ func TestClientConfigFromEnv(t *testing.T) {
require.Equal(t, err, ErrAppKeyNotProvided)
})

t.Run("no sock file", func(t *testing.T) {
t.Run("no app server address", func(t *testing.T) {
resetEnv(t)

err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

_, err = ClientConfigFromEnv()
require.Equal(t, err, ErrSockFileNotProvided)
require.Equal(t, err, ErrServerAddrNotProvided)
})

t.Run("no visor PK", func(t *testing.T) {
Expand All @@ -75,7 +75,7 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "val")
err = os.Setenv(appcommon.EnvServerAddr, appcommon.DefaultServerAddr)
require.NoError(t, err)

_, err = ClientConfigFromEnv()
Expand All @@ -88,7 +88,7 @@ func TestClientConfigFromEnv(t *testing.T) {
err := os.Setenv(appcommon.EnvAppKey, "val")
require.NoError(t, err)

err = os.Setenv(appcommon.EnvSockFile, "val")
err = os.Setenv(appcommon.EnvServerAddr, appcommon.DefaultServerAddr)
require.NoError(t, err)

err = os.Setenv(appcommon.EnvVisorPK, "val")
Expand Down
2 changes: 1 addition & 1 deletion pkg/visor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Config struct {

Interfaces InterfaceConfig `json:"interfaces"`

AppServerSockFile string `json:"app_server_sock_file"`
AppServerAddr string `json:"app_server_addr"`

RestartCheckDelay string `json:"restart_check_delay,omitempty"`
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/visor/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func TestStartStopApp(t *testing.T) {
unknownApp := "bar"
app := apps["foo"].App

visorCfg := Config{}
visorCfg := Config{
AppServerAddr: appcommon.DefaultServerAddr,
}
visorCfg.Visor.StaticPubKey = pk

visor := &Visor{
Expand All @@ -164,10 +166,10 @@ func TestStartStopApp(t *testing.T) {
}()

appCfg1 := appcommon.Config{
Name: app,
SockFilePath: visorCfg.AppServerSockFile,
VisorPK: visorCfg.Visor.StaticPubKey.Hex(),
WorkDir: filepath.Join("", app),
Name: app,
ServerAddr: appcommon.DefaultServerAddr,
VisorPK: visorCfg.Visor.StaticPubKey.Hex(),
WorkDir: filepath.Join("", app),
}

appArgs1 := append([]string{filepath.Join(visor.dir(), app)}, apps["foo"].Args...)
Expand Down
20 changes: 6 additions & 14 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func NewVisor(cfg *Config, logger *logging.MasterLogger, restartCtx *restart.Con
visor.hvErrs[hv.PubKey] = make(chan error, 1)
}

visor.appRPCServer = appserver.New(logging.MustGetLogger("app_rpc_server"), visor.conf.AppServerSockFile)
visor.appRPCServer = appserver.New(logging.MustGetLogger("app_rpc_server"), visor.conf.AppServerAddr)

go func() {
if err := visor.appRPCServer.ListenAndServe(); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
Expand Down Expand Up @@ -490,14 +490,6 @@ func (visor *Visor) Close() (err error) {
visor.logger.WithError(err).Error("RPC server closed with error.")
}

if err := UnlinkSocketFiles(visor.conf.AppServerSockFile); err != nil {
visor.logger.WithError(err).WithField("file_name", visor.conf.AppServerSockFile).
Error("Failed to unlink socket file.")
} else {
visor.logger.WithField("file_name", visor.conf.AppServerSockFile).
Debug("Socket file removed successfully.")
}

return err
}

Expand Down Expand Up @@ -567,11 +559,11 @@ func (visor *Visor) SpawnApp(config *AppConfig, startCh chan<- struct{}) (err er
}

appCfg := appcommon.Config{
Name: config.App,
SockFilePath: visor.conf.AppServerSockFile,
VisorPK: visor.conf.Visor.StaticPubKey.Hex(),
BinaryDir: visor.appsPath,
WorkDir: filepath.Join(visor.localPath, config.App),
Name: config.App,
ServerAddr: visor.conf.AppServerAddr,
VisorPK: visor.conf.Visor.StaticPubKey.Hex(),
BinaryDir: visor.appsPath,
WorkDir: filepath.Join(visor.localPath, config.App),
}

if _, err := ensureDir(appCfg.WorkDir); err != nil {
Expand Down
Loading