diff --git a/cmd/skywire-cli/commands/config.go b/cmd/skywire-cli/commands/config.go index 4b32da96d..614d2810f 100644 --- a/cmd/skywire-cli/commands/config.go +++ b/cmd/skywire-cli/commands/config.go @@ -113,7 +113,7 @@ func defaultConfig() *node.Config { conf.LogLevel = "info" - conf.Interfaces.RPCAddress = "localhost:3436" + conf.Interfaces.RPCAddress = "localhost:3435" return conf } diff --git a/pkg/messaging/chan_list.go b/pkg/messaging/chan_list.go index 22594353b..4c5fe9d0a 100644 --- a/pkg/messaging/chan_list.go +++ b/pkg/messaging/chan_list.go @@ -26,12 +26,6 @@ func (c *chanList) add(channel *channel) byte { panic("no free channels") } -func (c *chanList) set(id byte, channel *channel) { - c.Lock() - c.chans[id] = channel - c.Unlock() -} - func (c *chanList) get(id byte) *channel { c.Lock() ch := c.chans[id] diff --git a/pkg/messaging/channel.go b/pkg/messaging/channel.go index fc0018634..e8b97b1fe 100644 --- a/pkg/messaging/channel.go +++ b/pkg/messaging/channel.go @@ -145,7 +145,6 @@ func (c *channel) close() { case <-c.doneChan: default: close(c.doneChan) - close(c.readChan) close(c.closeChan) } } @@ -159,6 +158,8 @@ func (c *channel) readEncrypted(ctx context.Context, p []byte) (n int, err error } select { + case <-c.doneChan: + return 0, io.EOF case in, more := <-c.readChan: if !more { return 0, io.EOF diff --git a/pkg/messaging/client.go b/pkg/messaging/client.go index 795b384a9..8ae8940d0 100644 --- a/pkg/messaging/client.go +++ b/pkg/messaging/client.go @@ -10,6 +10,7 @@ import ( "fmt" "net" "sync" + "time" "github.com/skycoin/skycoin/src/util/logging" @@ -34,9 +35,19 @@ var ( type clientLink struct { link *Link + addr string chans *chanList } +// Config configures Client. +type Config struct { + PubKey cipher.PubKey + SecKey cipher.SecKey + Discovery client.APIClient + Retries int + RetryDelay time.Duration +} + // Client sends messages to remote client nodes via relay Server. type Client struct { Logger *logging.Logger @@ -46,6 +57,9 @@ type Client struct { dc client.APIClient pool *Pool + retries int + retryDelay time.Duration + links map[cipher.PubKey]*clientLink mu sync.RWMutex @@ -54,15 +68,17 @@ type Client struct { } // NewClient constructs a new Client. -func NewClient(pubKey cipher.PubKey, secKey cipher.SecKey, discoveryClient client.APIClient) *Client { +func NewClient(conf *Config) *Client { c := &Client{ - Logger: logging.MustGetLogger("messenger"), - pubKey: pubKey, - secKey: secKey, - dc: discoveryClient, - links: make(map[cipher.PubKey]*clientLink), - newChan: make(chan *channel), - doneChan: make(chan struct{}), + Logger: logging.MustGetLogger("messenger"), + pubKey: conf.PubKey, + secKey: conf.SecKey, + dc: conf.Discovery, + retries: conf.Retries, + retryDelay: conf.RetryDelay, + links: make(map[cipher.PubKey]*clientLink), + newChan: make(chan *channel), + doneChan: make(chan struct{}), } config := &LinkConfig{ Public: c.pubKey, @@ -142,14 +158,14 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (transport.Tran if err != nil { return nil, fmt.Errorf("noise setup: %s", err) } - channel.ID = clientLink.chans.add(channel) + localID := clientLink.chans.add(channel) msg, err := channel.noise.HandshakeMessage() if err != nil { return nil, fmt.Errorf("noise handshake: %s", err) } - if _, err := clientLink.link.SendOpenChannel(channel.ID, remote, msg); err != nil { + if _, err := clientLink.link.SendOpenChannel(localID, remote, msg); err != nil { return nil, fmt.Errorf("failed to open channel: %s", err) } @@ -162,7 +178,7 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (transport.Tran return nil, ctx.Err() } - c.Logger.Infof("Opened new channel ID %d with %s", channel.ID, remote) + c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", localID, channel.ID, remote) return newAckedChannel(channel), nil } @@ -222,7 +238,7 @@ func (c *Client) link(remotePK cipher.PubKey, addr string) (*clientLink, error) } c.Logger.Infof("Opened new link with the server %s", remotePK) - clientLink := &clientLink{l, newChanList()} + clientLink := &clientLink{l, addr, newChanList()} c.mu.Lock() c.links[remotePK] = clientLink c.mu.Unlock() @@ -274,12 +290,13 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { c.Logger.Debugf("New frame %s from %s@%d", frameType, remotePK, channelID) if frameType == FrameTypeOpenChannel { - if msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil { + if lID, msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil { c.Logger.Warnf("Failed to open new channel for %s: %s", remotePK, err) _, sendErr = l.SendChannelClosed(channelID) } else { - c.Logger.Infof("Opened new channel ID %d with %s", channelID, hex.EncodeToString(body[1:34])) - _, sendErr = l.SendChannelOpened(channelID, msg) + c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", lID, channelID, + hex.EncodeToString(body[1:34])) + _, sendErr = l.SendChannelOpened(channelID, lID, msg) } return c.warnSendError(remotePK, sendErr) @@ -296,10 +313,11 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { switch frameType { case FrameTypeCloseChannel: clientLink.chans.remove(channelID) - _, sendErr = l.SendChannelClosed(channelID) + _, sendErr = l.SendChannelClosed(channel.ID) c.Logger.Debugf("Closed channel ID %d", channelID) case FrameTypeChannelOpened: - if err := channel.noise.ProcessMessage(body[1:]); err != nil { + channel.ID = body[1] + if err := channel.noise.ProcessMessage(body[2:]); err != nil { sendErr = fmt.Errorf("noise handshake: %s", err) } @@ -308,6 +326,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { default: } case FrameTypeChannelClosed: + channel.ID = body[0] select { case channel.waitChan <- false: case channel.closeChan <- struct{}{}: @@ -329,7 +348,6 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error { func (c *Client) onClose(l *Link, remote bool) { remotePK := l.Remote() - c.Logger.Infof("Closing link with the server %s", remotePK) c.mu.RLock() chanLink := c.links[remotePK] @@ -339,6 +357,21 @@ func (c *Client) onClose(l *Link, remote bool) { channel.close() } + select { + case <-c.doneChan: + default: + c.Logger.Infof("Disconnected from the server %s. Trying to re-connect...", remotePK) + for attemp := 0; attemp < c.retries; attemp++ { + if _, err := c.link(remotePK, chanLink.addr); err == nil { + c.Logger.Infof("Re-connected to the server %s", remotePK) + return + } + time.Sleep(c.retryDelay) + } + } + + c.Logger.Infof("Closing link with the server %s", remotePK) + c.mu.Lock() delete(c.links, remotePK) c.mu.Unlock() @@ -348,28 +381,27 @@ func (c *Client) onClose(l *Link, remote bool) { } } -func (c *Client) openChannel(channelID byte, remotePK []byte, msg []byte, chanLink *clientLink) ([]byte, error) { - channel := chanLink.chans.get(channelID) - if channel != nil { - return nil, errors.New("channel is already opened") +func (c *Client) openChannel(rID byte, remotePK []byte, noiseMsg []byte, chanLink *clientLink) (lID byte, noiseRes []byte, err error) { + var pubKey cipher.PubKey + pubKey, err = cipher.NewPubKey(remotePK) + if err != nil { + return } - pubKey, err := cipher.NewPubKey(remotePK) + channel, err := newChannel(false, c.secKey, pubKey, chanLink.link) + channel.ID = rID if err != nil { - return nil, err + err = fmt.Errorf("noise setup: %s", err) + return } - channel, err = newChannel(false, c.secKey, pubKey, chanLink.link) - if err != nil { - return nil, fmt.Errorf("noise setup: %s", err) - } - - channel.ID = channelID - chanLink.chans.set(channelID, channel) - if err := channel.noise.ProcessMessage(msg); err != nil { - return nil, fmt.Errorf("noise handshake: %s", err) + if err = channel.noise.ProcessMessage(noiseMsg); err != nil { + err = fmt.Errorf("noise handshake: %s", err) + return } + lID = chanLink.chans.add(channel) + go func() { select { case <-c.doneChan: @@ -377,12 +409,13 @@ func (c *Client) openChannel(channelID byte, remotePK []byte, msg []byte, chanLi } }() - res, err := channel.noise.HandshakeMessage() + noiseRes, err = channel.noise.HandshakeMessage() if err != nil { - return nil, fmt.Errorf("noise handshake: %s", err) + err = fmt.Errorf("noise handshake: %s", err) + return } - return res, nil + return lID, noiseRes, err } func (c *Client) warnSendError(remote cipher.PubKey, err error) error { diff --git a/pkg/messaging/client_test.go b/pkg/messaging/client_test.go index 2a8925300..0c35570d2 100644 --- a/pkg/messaging/client_test.go +++ b/pkg/messaging/client_test.go @@ -27,7 +27,7 @@ func TestMain(m *testing.M) { func TestClientConnectInitialServers(t *testing.T) { pk, sk := cipher.GenerateKeyPair() discovery := client.NewMock() - c := NewClient(pk, sk, discovery) + c := NewClient(&Config{pk, sk, discovery, 1, 100 * time.Millisecond}) srv, err := newMockServer(discovery) require.NoError(t, err) @@ -44,7 +44,20 @@ func TestClientConnectInitialServers(t *testing.T) { assert.Len(t, entry.Client.DelegatedServers, 1) assert.Equal(t, srv.config.Public, entry.Client.DelegatedServers[0]) - require.NoError(t, srv.Close()) + c.mu.RLock() + l := c.links[srv.config.Public] + c.mu.RUnlock() + require.NotNil(t, l) + require.NoError(t, l.link.Close()) + + time.Sleep(200 * time.Millisecond) + + c.mu.RLock() + require.Len(t, c.links, 1) + c.mu.RUnlock() + + require.NoError(t, c.Close()) + time.Sleep(100 * time.Millisecond) c.mu.RLock() @@ -59,7 +72,8 @@ func TestClientConnectInitialServers(t *testing.T) { func TestClientDial(t *testing.T) { pk, sk := cipher.GenerateKeyPair() discovery := client.NewMock() - c := NewClient(pk, sk, discovery) + c := NewClient(&Config{pk, sk, discovery, 0, 0}) + c.retries = 0 srv, err := newMockServer(discovery) require.NoError(t, err) @@ -68,7 +82,7 @@ func TestClientDial(t *testing.T) { time.Sleep(100 * time.Millisecond) anotherPK, anotherSK := cipher.GenerateKeyPair() - anotherClient := NewClient(anotherPK, anotherSK, discovery) + anotherClient := NewClient(&Config{anotherPK, anotherSK, discovery, 0, 0}) require.NoError(t, anotherClient.ConnectToInitialServers(context.TODO(), 1)) var anotherTr transport.Transport @@ -115,8 +129,8 @@ func TestClientDial(t *testing.T) { assert.Equal(t, 3, n) assert.Equal(t, []byte("bar"), buf) - require.NoError(t, anotherTr.Close()) require.NoError(t, tr.Close()) + require.NoError(t, anotherTr.Close()) time.Sleep(100 * time.Millisecond) @@ -176,7 +190,7 @@ func (s *mockServer) onData(l *Link, frameType FrameType, body []byte) error { case FrameTypeOpenChannel: _, err = ol.SendOpenChannel(channelID, l.Remote(), body[34:]) case FrameTypeChannelOpened: - _, err = ol.SendChannelOpened(channelID, body[1:]) + _, err = ol.SendChannelOpened(channelID, channelID, body[2:]) case FrameTypeCloseChannel: l.SendChannelClosed(channelID) // nolint _, err = ol.SendCloseChannel(channelID) diff --git a/pkg/messaging/link.go b/pkg/messaging/link.go index 6cd9942be..9583986e0 100644 --- a/pkg/messaging/link.go +++ b/pkg/messaging/link.go @@ -106,8 +106,8 @@ func (c *Link) SendOpenChannel(channelID byte, remotePK cipher.PubKey, noiseMsg } // SendChannelOpened sends ChannelOpened frame. -func (c *Link) SendChannelOpened(channelID byte, noiseMsg []byte) (int, error) { - return c.writeFrame(FrameTypeChannelOpened, append([]byte{channelID}, noiseMsg...)) +func (c *Link) SendChannelOpened(channelID byte, remoteID byte, noiseMsg []byte) (int, error) { + return c.writeFrame(FrameTypeChannelOpened, append([]byte{channelID, remoteID}, noiseMsg...)) } // SendCloseChannel sends CloseChannel request. diff --git a/pkg/node/config.go b/pkg/node/config.go index f2e62abc2..2df9e8979 100644 --- a/pkg/node/config.go +++ b/pkg/node/config.go @@ -5,6 +5,9 @@ import ( "fmt" "os" "path/filepath" + "time" + + "github.com/skycoin/skywire/pkg/messaging" "github.com/skycoin/skywire/pkg/cipher" mClient "github.com/skycoin/skywire/pkg/messaging-discovery/client" @@ -57,15 +60,22 @@ type Config struct { Interfaces InterfaceConfig `json:"interfaces"` } -// MessagingDiscovery returns messaging discovery client. -func (c *Config) MessagingDiscovery() (mClient.APIClient, error) { +// MessagingConfig returns config for messaging client. +func (c *Config) MessagingConfig() (*messaging.Config, error) { + msgConfig := c.Messaging if msgConfig.Discovery == "" { return nil, errors.New("empty discovery") } - return mClient.NewHTTP(msgConfig.Discovery), nil + return &messaging.Config{ + PubKey: c.Node.StaticPubKey, + SecKey: c.Node.StaticSecKey, + Discovery: mClient.NewHTTP(msgConfig.Discovery), + Retries: 5, + RetryDelay: time.Second, + }, nil } // TransportDiscovery returns transport discovery client. diff --git a/pkg/node/config_test.go b/pkg/node/config_test.go index c5fa933f0..5ffae77d1 100644 --- a/pkg/node/config_test.go +++ b/pkg/node/config_test.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,14 +18,21 @@ import ( ) func TestMessagingDiscovery(t *testing.T) { + pk, sk := cipher.GenerateKeyPair() conf := Config{} + conf.Node.StaticPubKey = pk + conf.Node.StaticSecKey = sk conf.Messaging.Discovery = "skywire.skycoin.net:8001" conf.Messaging.ServerCount = 10 - discovery, err := conf.MessagingDiscovery() + c, err := conf.MessagingConfig() require.NoError(t, err) - assert.NotNil(t, discovery) + assert.NotNil(t, c.Discovery) + assert.False(t, c.PubKey.Null()) + assert.False(t, c.SecKey.Null()) + assert.Equal(t, 5, c.Retries) + assert.Equal(t, time.Second, c.RetryDelay) } func TestTransportDiscovery(t *testing.T) { diff --git a/pkg/node/node.go b/pkg/node/node.go index 3ad3657c4..6c1fb168e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -107,12 +107,12 @@ func NewNode(config *Config) (*Node, error) { pk := config.Node.StaticPubKey sk := config.Node.StaticSecKey - mDiscovery, err := config.MessagingDiscovery() + mConfig, err := config.MessagingConfig() if err != nil { - return nil, fmt.Errorf("invalid MessagingConfig: %s", err) + return nil, fmt.Errorf("invalid Messaging config: %s", err) } - node.messenger = messaging.NewClient(pk, sk, mDiscovery) + node.messenger = messaging.NewClient(mConfig) node.messenger.Logger = node.Logger.PackageLogger("messenger") trDiscovery, err := config.TransportDiscovery() diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 3a7c42ad0..5edef3b8e 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -73,7 +73,8 @@ func TestNodeStartClose(t *testing.T) { defer os.RemoveAll("chat") node := &Node{config: &Config{}, router: r, executer: executer, appsConf: conf, startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")} - node.messenger = messaging.NewClient(cipher.PubKey{}, cipher.SecKey{}, client.NewMock()) + mConf := &messaging.Config{PubKey: cipher.PubKey{}, SecKey: cipher.SecKey{}, Discovery: client.NewMock()} + node.messenger = messaging.NewClient(mConf) var err error tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()} diff --git a/pkg/router/route_manager.go b/pkg/router/route_manager.go index 417155f7a..6942b872c 100644 --- a/pkg/router/route_manager.go +++ b/pkg/router/route_manager.go @@ -53,7 +53,9 @@ func (rm *routeManager) RemoveLoopRule(addr *app.LoopAddr) error { } appRouteID = routeID - appRule = rule + appRule = make(routing.Rule, len(rule)) + copy(appRule, rule) + return false }) if err != nil { @@ -154,7 +156,8 @@ func (rm *routeManager) confirmLoop(data []byte) (noiseRes []byte, err error) { } appRouteID = routeID - appRule = rule + appRule = make(routing.Rule, len(rule)) + copy(appRule, rule) return false }) if err != nil {