From eea3cc827be610f9b57413c4d791f5351c1928c9 Mon Sep 17 00:00:00 2001 From: ivcosla Date: Wed, 6 Mar 2019 07:03:15 +0100 Subject: [PATCH 1/5] config default rpc port and rule creation --- cmd/skywire-cli/commands/config.go | 2 +- pkg/router/route_manager.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/skywire-cli/commands/config.go b/cmd/skywire-cli/commands/config.go index f46e5ce675..469e2afb6b 100644 --- a/cmd/skywire-cli/commands/config.go +++ b/cmd/skywire-cli/commands/config.go @@ -75,7 +75,7 @@ func defaultConfig() *node.Config { conf.LogLevel = "info" - conf.Interfaces.RPCAddress = "localhost:3436" + conf.Interfaces.RPCAddress = "localhost:3435" return conf } diff --git a/pkg/router/route_manager.go b/pkg/router/route_manager.go index 417155f7a3..d691e5e7ea 100644 --- a/pkg/router/route_manager.go +++ b/pkg/router/route_manager.go @@ -53,7 +53,9 @@ func (rm *routeManager) RemoveLoopRule(addr *app.LoopAddr) error { } appRouteID = routeID - appRule = rule + appRule = make(routing.Rule, len(rule)) + copy(appRule, rule) + return false }) if err != nil { From d7114c72d1a65ea3b46af218dfd56bfa351bf205 Mon Sep 17 00:00:00 2001 From: ivcosla Date: Wed, 6 Mar 2019 08:13:15 +0100 Subject: [PATCH 2/5] mclient retries --- pkg/messaging/client.go | 43 +++++++++++++++++++++++++++++++----- pkg/messaging/client_test.go | 22 ++++++++++++++---- pkg/node/config.go | 15 ++++++++++--- pkg/node/config_test.go | 12 ++++++++-- pkg/node/node.go | 6 ++--- pkg/node/node_test.go | 3 ++- 6 files changed, 82 insertions(+), 19 deletions(-) diff --git a/pkg/messaging/client.go b/pkg/messaging/client.go index 795b384a9e..ad79609f54 100644 --- a/pkg/messaging/client.go +++ b/pkg/messaging/client.go @@ -10,6 +10,7 @@ import ( "fmt" "net" "sync" + "time" "github.com/skycoin/skycoin/src/util/logging" @@ -34,9 +35,20 @@ var ( type clientLink struct { link *Link + addr string chans *chanList } +// Config configures Client. +type Config struct { + PubKey cipher.PubKey + SecKey cipher.SecKey + Discovery client.APIClient + Retries int + RetryDelay time.Duration +} + + // Client sends messages to remote client nodes via relay Server. type Client struct { Logger *logging.Logger @@ -46,6 +58,9 @@ type Client struct { dc client.APIClient pool *Pool + retries int + retryDelay time.Duration + links map[cipher.PubKey]*clientLink mu sync.RWMutex @@ -54,12 +69,14 @@ type Client struct { } // NewClient constructs a new Client. -func NewClient(pubKey cipher.PubKey, secKey cipher.SecKey, discoveryClient client.APIClient) *Client { +func NewClient(conf *Config) *Client { c := &Client{ Logger: logging.MustGetLogger("messenger"), - pubKey: pubKey, - secKey: secKey, - dc: discoveryClient, + pubKey: conf.PubKey, + secKey: conf.SecKey, + dc: conf.Discovery, + retries: conf.Retries, + retryDelay: conf.RetryDelay, links: make(map[cipher.PubKey]*clientLink), newChan: make(chan *channel), doneChan: make(chan struct{}), @@ -222,7 +239,7 @@ func (c *Client) link(remotePK cipher.PubKey, addr string) (*clientLink, error) } c.Logger.Infof("Opened new link with the server %s", remotePK) - clientLink := &clientLink{l, newChanList()} + clientLink := &clientLink{l, addr, newChanList()} c.mu.Lock() c.links[remotePK] = clientLink c.mu.Unlock() @@ -329,12 +346,26 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { func (c *Client) onClose(l *Link, remote bool) { remotePK := l.Remote() - c.Logger.Infof("Closing link with the server %s", remotePK) c.mu.RLock() chanLink := c.links[remotePK] c.mu.RUnlock() + select { + case <-c.doneChan: + default: + c.Logger.Infof("Disconnected from the server %s. Trying to re-connect...", remotePK) + for attemp := 0; attemp < c.retries; attemp++ { + if _, err := c.link(remotePK, chanLink.addr); err == nil { + c.Logger.Infof("Re-connected to the server %s", remotePK) + return + } + time.Sleep(c.retryDelay) + } + } + + c.Logger.Infof("Closing link with the server %s", remotePK) + for _, channel := range chanLink.chans.dropAll() { channel.close() } diff --git a/pkg/messaging/client_test.go b/pkg/messaging/client_test.go index 2a89253007..d731228328 100644 --- a/pkg/messaging/client_test.go +++ b/pkg/messaging/client_test.go @@ -27,7 +27,7 @@ func TestMain(m *testing.M) { func TestClientConnectInitialServers(t *testing.T) { pk, sk := cipher.GenerateKeyPair() discovery := client.NewMock() - c := NewClient(pk, sk, discovery) + c := NewClient(&Config{pk, sk, discovery, 1, 100 * time.Millisecond}) srv, err := newMockServer(discovery) require.NoError(t, err) @@ -44,7 +44,20 @@ func TestClientConnectInitialServers(t *testing.T) { assert.Len(t, entry.Client.DelegatedServers, 1) assert.Equal(t, srv.config.Public, entry.Client.DelegatedServers[0]) - require.NoError(t, srv.Close()) + c.mu.RLock() + l := c.links[srv.config.Public] + c.mu.RUnlock() + require.NotNil(t, l) + require.NoError(t, l.link.Close()) + + time.Sleep(200 * time.Millisecond) + + c.mu.RLock() + require.Len(t, c.links, 1) + c.mu.RUnlock() + + require.NoError(t, c.Close()) + time.Sleep(100 * time.Millisecond) c.mu.RLock() @@ -59,7 +72,8 @@ func TestClientConnectInitialServers(t *testing.T) { func TestClientDial(t *testing.T) { pk, sk := cipher.GenerateKeyPair() discovery := client.NewMock() - c := NewClient(pk, sk, discovery) + c := NewClient(&Config{pk, sk, discovery, 0, 0}) + c.retries = 0 srv, err := newMockServer(discovery) require.NoError(t, err) @@ -68,7 +82,7 @@ func TestClientDial(t *testing.T) { time.Sleep(100 * time.Millisecond) anotherPK, anotherSK := cipher.GenerateKeyPair() - anotherClient := NewClient(anotherPK, anotherSK, discovery) + anotherClient := NewClient(&Config{anotherPK, anotherSK, discovery, 0, 0}) require.NoError(t, anotherClient.ConnectToInitialServers(context.TODO(), 1)) var anotherTr transport.Transport diff --git a/pkg/node/config.go b/pkg/node/config.go index f884b9f28f..c28ee3d64c 100644 --- a/pkg/node/config.go +++ b/pkg/node/config.go @@ -3,8 +3,10 @@ package node import ( "errors" "fmt" + "github.com/skycoin/skywire/pkg/messaging" "os" "path/filepath" + "time" "github.com/skycoin/skywire/pkg/cipher" mClient "github.com/skycoin/skywire/pkg/messaging-discovery/client" @@ -57,15 +59,22 @@ type Config struct { Interfaces InterfaceConfig `json:"interfaces"` } -// MessagingDiscovery returns messaging discovery client. -func (c *Config) MessagingDiscovery() (mClient.APIClient, error) { +// MessagingConfig returns config for messaging client. +func (c *Config) MessagingConfig() (*messaging.Config, error) { + msgConfig := c.Messaging if msgConfig.Discovery == "" { return nil, errors.New("empty discovery") } - return mClient.NewHTTP(msgConfig.Discovery), nil + return &messaging.Config{ + PubKey: c.Node.StaticPubKey, + SecKey: c.Node.StaticSecKey, + Discovery: mClient.NewHTTP(msgConfig.Discovery), + Retries: 5, + RetryDelay: time.Second, + }, nil } // TransportDiscovery returns transport discovery client. diff --git a/pkg/node/config_test.go b/pkg/node/config_test.go index 034ab2f1aa..65279fa9db 100644 --- a/pkg/node/config_test.go +++ b/pkg/node/config_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,14 +17,21 @@ import ( ) func TestMessagingDiscovery(t *testing.T) { + pk, sk := cipher.GenerateKeyPair() conf := Config{} + conf.Node.StaticPubKey = pk + conf.Node.StaticSecKey = sk conf.Messaging.Discovery = "skywire.skycoin.net:8001" conf.Messaging.ServerCount = 10 - discovery, err := conf.MessagingDiscovery() + c, err := conf.MessagingConfig() require.NoError(t, err) - assert.NotNil(t, discovery) + assert.NotNil(t, c.Discovery) + assert.False(t, c.PubKey.Null()) + assert.False(t, c.SecKey.Null()) + assert.Equal(t, 5, c.Retries) + assert.Equal(t, time.Second, c.RetryDelay) } func TestTransportDiscovery(t *testing.T) { diff --git a/pkg/node/node.go b/pkg/node/node.go index 8bb91255c4..238a7edf1e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -107,12 +107,12 @@ func NewNode(config *Config) (*Node, error) { pk := config.Node.StaticPubKey sk := config.Node.StaticSecKey - mDiscovery, err := config.MessagingDiscovery() + mConfig, err := config.MessagingConfig() if err != nil { - return nil, fmt.Errorf("invalid MessagingConfig: %s", err) + return nil, fmt.Errorf("invalid Messaging config: %s", err) } - node.messenger = messaging.NewClient(pk, sk, mDiscovery) + node.messenger = messaging.NewClient(mConfig) node.messenger.Logger = node.Logger.PackageLogger("messenger") trDiscovery, err := config.TransportDiscovery() diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 3a7c42ad01..5edef3b8e5 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -73,7 +73,8 @@ func TestNodeStartClose(t *testing.T) { defer os.RemoveAll("chat") node := &Node{config: &Config{}, router: r, executer: executer, appsConf: conf, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")} - node.messenger = messaging.NewClient(cipher.PubKey{}, cipher.SecKey{}, client.NewMock()) + mConf := &messaging.Config{PubKey: cipher.PubKey{}, SecKey: cipher.SecKey{}, Discovery: client.NewMock()} + node.messenger = messaging.NewClient(mConf) var err error tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()} From 3704ab948b9e2a4244cef4c720595b9180788063 Mon Sep 17 00:00:00 2001 From: ivcosla Date: Wed, 6 Mar 2019 08:15:32 +0100 Subject: [PATCH 3/5] fix mchannel close --- pkg/messaging/channel.go | 3 ++- pkg/messaging/client_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/messaging/channel.go b/pkg/messaging/channel.go index fc00186340..e8b97b1fec 100644 --- a/pkg/messaging/channel.go +++ b/pkg/messaging/channel.go @@ -145,7 +145,6 @@ func (c *channel) close() { case <-c.doneChan: default: close(c.doneChan) - close(c.readChan) close(c.closeChan) } } @@ -159,6 +158,8 @@ func (c *channel) readEncrypted(ctx context.Context, p []byte) (n int, err error } select { + case <-c.doneChan: + return 0, io.EOF case in, more := <-c.readChan: if !more { return 0, io.EOF diff --git a/pkg/messaging/client_test.go b/pkg/messaging/client_test.go index d731228328..806d419f25 100644 --- a/pkg/messaging/client_test.go +++ b/pkg/messaging/client_test.go @@ -129,8 +129,8 @@ func TestClientDial(t *testing.T) { assert.Equal(t, 3, n) assert.Equal(t, []byte("bar"), buf) - require.NoError(t, anotherTr.Close()) require.NoError(t, tr.Close()) + require.NoError(t, anotherTr.Close()) time.Sleep(100 * time.Millisecond) From a99962f467302208ec418469ae4770a305f846c6 Mon Sep 17 00:00:00 2001 From: ivcosla Date: Wed, 6 Mar 2019 08:42:13 +0100 Subject: [PATCH 4/5] default rpc port and format --- cmd/skywire-cli/commands/root.go | 2 +- pkg/messaging/client.go | 19 +++++++++---------- pkg/node/config.go | 3 ++- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cmd/skywire-cli/commands/root.go b/cmd/skywire-cli/commands/root.go index 7e246c8172..4ed840a803 100644 --- a/cmd/skywire-cli/commands/root.go +++ b/cmd/skywire-cli/commands/root.go @@ -49,7 +49,7 @@ var rootCmd = &cobra.Command{ // Execute executes root CLI command. func Execute() { - rootCmd.PersistentFlags().StringVarP(&rpcAddr, "rpc", "", "localhost:3436", "RPC server address") + rootCmd.PersistentFlags().StringVarP(&rpcAddr, "rpc", "", "localhost:3435", "RPC server address") if err := rootCmd.Execute(); err != nil { fmt.Println(err) diff --git a/pkg/messaging/client.go b/pkg/messaging/client.go index ad79609f54..023c46d206 100644 --- a/pkg/messaging/client.go +++ b/pkg/messaging/client.go @@ -35,7 +35,7 @@ var ( type clientLink struct { link *Link - addr string + addr string chans *chanList } @@ -48,7 +48,6 @@ type Config struct { RetryDelay time.Duration } - // Client sends messages to remote client nodes via relay Server. type Client struct { Logger *logging.Logger @@ -71,15 +70,15 @@ type Client struct { // NewClient constructs a new Client. func NewClient(conf *Config) *Client { c := &Client{ - Logger: logging.MustGetLogger("messenger"), - pubKey: conf.PubKey, - secKey: conf.SecKey, - dc: conf.Discovery, - retries: conf.Retries, + Logger: logging.MustGetLogger("messenger"), + pubKey: conf.PubKey, + secKey: conf.SecKey, + dc: conf.Discovery, + retries: conf.Retries, retryDelay: conf.RetryDelay, - links: make(map[cipher.PubKey]*clientLink), - newChan: make(chan *channel), - doneChan: make(chan struct{}), + links: make(map[cipher.PubKey]*clientLink), + newChan: make(chan *channel), + doneChan: make(chan struct{}), } config := &LinkConfig{ Public: c.pubKey, diff --git a/pkg/node/config.go b/pkg/node/config.go index c28ee3d64c..47643d0358 100644 --- a/pkg/node/config.go +++ b/pkg/node/config.go @@ -3,11 +3,12 @@ package node import ( "errors" "fmt" - "github.com/skycoin/skywire/pkg/messaging" "os" "path/filepath" "time" + "github.com/skycoin/skywire/pkg/messaging" + "github.com/skycoin/skywire/pkg/cipher" mClient "github.com/skycoin/skywire/pkg/messaging-discovery/client" "github.com/skycoin/skywire/pkg/routing" From 631af8e979db8daabbd7c6354c7a1635b4611172 Mon Sep 17 00:00:00 2001 From: ivcosla Date: Thu, 7 Mar 2019 16:51:34 +0100 Subject: [PATCH 5/5] use different messaging channel IDs on initiator and responder side --- pkg/messaging/chan_list.go | 6 ----- pkg/messaging/client.go | 54 ++++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 32 deletions(-) diff --git a/pkg/messaging/chan_list.go b/pkg/messaging/chan_list.go index 22594353b4..4c5fe9d0ad 100644 --- a/pkg/messaging/chan_list.go +++ b/pkg/messaging/chan_list.go @@ -26,12 +26,6 @@ func (c *chanList) add(channel *channel) byte { panic("no free channels") } -func (c *chanList) set(id byte, channel *channel) { - c.Lock() - c.chans[id] = channel - c.Unlock() -} - func (c *chanList) get(id byte) *channel { c.Lock() ch := c.chans[id] diff --git a/pkg/messaging/client.go b/pkg/messaging/client.go index 023c46d206..184dd84b76 100644 --- a/pkg/messaging/client.go +++ b/pkg/messaging/client.go @@ -290,12 +290,13 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { c.Logger.Debugf("New frame %s from %s@%d", frameType, remotePK, channelID) if frameType == FrameTypeOpenChannel { - if msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil { + if lID, msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil { c.Logger.Warnf("Failed to open new channel for %s: %s", remotePK, err) _, sendErr = l.SendChannelClosed(channelID) } else { - c.Logger.Infof("Opened new channel ID %d with %s", channelID, hex.EncodeToString(body[1:34])) - _, sendErr = l.SendChannelOpened(channelID, msg) + c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", lID, channelID, + hex.EncodeToString(body[1:34])) + _, sendErr = l.SendChannelOpened(lID, msg) } return c.warnSendError(remotePK, sendErr) @@ -312,7 +313,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { switch frameType { case FrameTypeCloseChannel: clientLink.chans.remove(channelID) - _, sendErr = l.SendChannelClosed(channelID) + _, sendErr = l.SendChannelClosed(channel.ID) c.Logger.Debugf("Closed channel ID %d", channelID) case FrameTypeChannelOpened: if err := channel.noise.ProcessMessage(body[1:]); err != nil { @@ -324,6 +325,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { default: } case FrameTypeChannelClosed: + channel.ID = body[0] select { case channel.waitChan <- false: case channel.closeChan <- struct{}{}: @@ -350,6 +352,10 @@ func (c *Client) onClose(l *Link, remote bool) { chanLink := c.links[remotePK] c.mu.RUnlock() + for _, channel := range chanLink.chans.dropAll() { + channel.close() + } + select { case <-c.doneChan: default: @@ -365,10 +371,6 @@ func (c *Client) onClose(l *Link, remote bool) { c.Logger.Infof("Closing link with the server %s", remotePK) - for _, channel := range chanLink.chans.dropAll() { - channel.close() - } - c.mu.Lock() delete(c.links, remotePK) c.mu.Unlock() @@ -378,28 +380,27 @@ func (c *Client) onClose(l *Link, remote bool) { } } -func (c *Client) openChannel(channelID byte, remotePK []byte, msg []byte, chanLink *clientLink) ([]byte, error) { - channel := chanLink.chans.get(channelID) - if channel != nil { - return nil, errors.New("channel is already opened") - } - - pubKey, err := cipher.NewPubKey(remotePK) +func (c *Client) openChannel(rID byte, remotePK []byte, noiseMsg []byte, chanLink *clientLink) (lID byte, noiseRes []byte, err error) { + var pubKey cipher.PubKey + pubKey, err = cipher.NewPubKey(remotePK) if err != nil { - return nil, err + return } - channel, err = newChannel(false, c.secKey, pubKey, chanLink.link) + + channel, err := newChannel(false, c.secKey, pubKey, chanLink.link) + channel.ID = rID if err != nil { - return nil, fmt.Errorf("noise setup: %s", err) + err = fmt.Errorf("noise setup: %s", err) + return } - channel.ID = channelID - chanLink.chans.set(channelID, channel) - - if err := channel.noise.ProcessMessage(msg); err != nil { - return nil, fmt.Errorf("noise handshake: %s", err) + if err = channel.noise.ProcessMessage(noiseMsg); err != nil { + err = fmt.Errorf("noise handshake: %s", err) + return } + lID = chanLink.chans.add(channel) + go func() { select { case <-c.doneChan: @@ -407,12 +408,13 @@ func (c *Client) openChannel(channelID byte, remotePK []byte, msg []byte, chanLi } }() - res, err := channel.noise.HandshakeMessage() + noiseRes, err = channel.noise.HandshakeMessage() if err != nil { - return nil, fmt.Errorf("noise handshake: %s", err) + err = fmt.Errorf("noise handshake: %s", err) + return } - return res, nil + return lID, noiseRes, err } func (c *Client) warnSendError(remote cipher.PubKey, err error) error {