Skip to content

Commit

Permalink
Merge pull request #206 from skycoin/bug/fix-mchannel-close
Browse files Browse the repository at this point in the history
[WIP] re-connect logic for server connections in messaging-client
  • Loading branch information
ivcosla authored Mar 13, 2019
2 parents 16b1ef5 + e8296c9 commit 6c16069
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cmd/skywire-cli/commands/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func defaultConfig() *node.Config {

conf.LogLevel = "info"

conf.Interfaces.RPCAddress = "localhost:3436"
conf.Interfaces.RPCAddress = "localhost:3435"

return conf
}
6 changes: 0 additions & 6 deletions pkg/messaging/chan_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ func (c *chanList) add(channel *channel) byte {
panic("no free channels")
}

func (c *chanList) set(id byte, channel *channel) {
c.Lock()
c.chans[id] = channel
c.Unlock()
}

func (c *chanList) get(id byte) *channel {
c.Lock()
ch := c.chans[id]
Expand Down
3 changes: 2 additions & 1 deletion pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func (c *channel) close() {
case <-c.doneChan:
default:
close(c.doneChan)
close(c.readChan)
close(c.closeChan)
}
}
Expand All @@ -159,6 +158,8 @@ func (c *channel) readEncrypted(ctx context.Context, p []byte) (n int, err error
}

select {
case <-c.doneChan:
return 0, io.EOF
case in, more := <-c.readChan:
if !more {
return 0, io.EOF
Expand Down
96 changes: 64 additions & 32 deletions pkg/messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/skycoin/skycoin/src/util/logging"

Expand All @@ -34,9 +35,19 @@ var (

type clientLink struct {
link *Link
addr string
chans *chanList
}

// Config configures Client.
type Config struct {
PubKey cipher.PubKey
SecKey cipher.SecKey
Discovery client.APIClient
Retries int
RetryDelay time.Duration
}

// Client sends messages to remote client nodes via relay Server.
type Client struct {
Logger *logging.Logger
Expand All @@ -46,6 +57,9 @@ type Client struct {
dc client.APIClient
pool *Pool

retries int
retryDelay time.Duration

links map[cipher.PubKey]*clientLink
mu sync.RWMutex

Expand All @@ -54,15 +68,17 @@ type Client struct {
}

// NewClient constructs a new Client.
func NewClient(pubKey cipher.PubKey, secKey cipher.SecKey, discoveryClient client.APIClient) *Client {
func NewClient(conf *Config) *Client {
c := &Client{
Logger: logging.MustGetLogger("messenger"),
pubKey: pubKey,
secKey: secKey,
dc: discoveryClient,
links: make(map[cipher.PubKey]*clientLink),
newChan: make(chan *channel),
doneChan: make(chan struct{}),
Logger: logging.MustGetLogger("messenger"),
pubKey: conf.PubKey,
secKey: conf.SecKey,
dc: conf.Discovery,
retries: conf.Retries,
retryDelay: conf.RetryDelay,
links: make(map[cipher.PubKey]*clientLink),
newChan: make(chan *channel),
doneChan: make(chan struct{}),
}
config := &LinkConfig{
Public: c.pubKey,
Expand Down Expand Up @@ -222,7 +238,7 @@ func (c *Client) link(remotePK cipher.PubKey, addr string) (*clientLink, error)
}

c.Logger.Infof("Opened new link with the server %s", remotePK)
clientLink := &clientLink{l, newChanList()}
clientLink := &clientLink{l, addr, newChanList()}
c.mu.Lock()
c.links[remotePK] = clientLink
c.mu.Unlock()
Expand Down Expand Up @@ -274,12 +290,13 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {

c.Logger.Debugf("New frame %s from %s@%d", frameType, remotePK, channelID)
if frameType == FrameTypeOpenChannel {
if msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil {
if lID, msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil {
c.Logger.Warnf("Failed to open new channel for %s: %s", remotePK, err)
_, sendErr = l.SendChannelClosed(channelID)
} else {
c.Logger.Infof("Opened new channel ID %d with %s", channelID, hex.EncodeToString(body[1:34]))
_, sendErr = l.SendChannelOpened(channelID, msg)
c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", lID, channelID,
hex.EncodeToString(body[1:34]))
_, sendErr = l.SendChannelOpened(lID, msg)
}

return c.warnSendError(remotePK, sendErr)
Expand All @@ -296,7 +313,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
switch frameType {
case FrameTypeCloseChannel:
clientLink.chans.remove(channelID)
_, sendErr = l.SendChannelClosed(channelID)
_, sendErr = l.SendChannelClosed(channel.ID)
c.Logger.Debugf("Closed channel ID %d", channelID)
case FrameTypeChannelOpened:
if err := channel.noise.ProcessMessage(body[1:]); err != nil {
Expand All @@ -308,6 +325,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
default:
}
case FrameTypeChannelClosed:
channel.ID = body[0]
select {
case channel.waitChan <- false:
case channel.closeChan <- struct{}{}:
Expand All @@ -329,7 +347,6 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {

func (c *Client) onClose(l *Link, remote bool) {
remotePK := l.Remote()
c.Logger.Infof("Closing link with the server %s", remotePK)

c.mu.RLock()
chanLink := c.links[remotePK]
Expand All @@ -339,6 +356,21 @@ func (c *Client) onClose(l *Link, remote bool) {
channel.close()
}

select {
case <-c.doneChan:
default:
c.Logger.Infof("Disconnected from the server %s. Trying to re-connect...", remotePK)
for attemp := 0; attemp < c.retries; attemp++ {
if _, err := c.link(remotePK, chanLink.addr); err == nil {
c.Logger.Infof("Re-connected to the server %s", remotePK)
return
}
time.Sleep(c.retryDelay)
}
}

c.Logger.Infof("Closing link with the server %s", remotePK)

c.mu.Lock()
delete(c.links, remotePK)
c.mu.Unlock()
Expand All @@ -348,41 +380,41 @@ func (c *Client) onClose(l *Link, remote bool) {
}
}

func (c *Client) openChannel(channelID byte, remotePK []byte, msg []byte, chanLink *clientLink) ([]byte, error) {
channel := chanLink.chans.get(channelID)
if channel != nil {
return nil, errors.New("channel is already opened")
func (c *Client) openChannel(rID byte, remotePK []byte, noiseMsg []byte, chanLink *clientLink) (lID byte, noiseRes []byte, err error) {
var pubKey cipher.PubKey
pubKey, err = cipher.NewPubKey(remotePK)
if err != nil {
return
}

pubKey, err := cipher.NewPubKey(remotePK)
channel, err := newChannel(false, c.secKey, pubKey, chanLink.link)
channel.ID = rID
if err != nil {
return nil, err
err = fmt.Errorf("noise setup: %s", err)
return
}
channel, err = newChannel(false, c.secKey, pubKey, chanLink.link)
if err != nil {
return nil, fmt.Errorf("noise setup: %s", err)
}

channel.ID = channelID
chanLink.chans.set(channelID, channel)

if err := channel.noise.ProcessMessage(msg); err != nil {
return nil, fmt.Errorf("noise handshake: %s", err)
if err = channel.noise.ProcessMessage(noiseMsg); err != nil {
err = fmt.Errorf("noise handshake: %s", err)
return
}

lID = chanLink.chans.add(channel)

go func() {
select {
case <-c.doneChan:
case c.newChan <- channel:
}
}()

res, err := channel.noise.HandshakeMessage()
noiseRes, err = channel.noise.HandshakeMessage()
if err != nil {
return nil, fmt.Errorf("noise handshake: %s", err)
err = fmt.Errorf("noise handshake: %s", err)
return
}

return res, nil
return lID, noiseRes, err
}

func (c *Client) warnSendError(remote cipher.PubKey, err error) error {
Expand Down
24 changes: 19 additions & 5 deletions pkg/messaging/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestMain(m *testing.M) {
func TestClientConnectInitialServers(t *testing.T) {
pk, sk := cipher.GenerateKeyPair()
discovery := client.NewMock()
c := NewClient(pk, sk, discovery)
c := NewClient(&Config{pk, sk, discovery, 1, 100 * time.Millisecond})

srv, err := newMockServer(discovery)
require.NoError(t, err)
Expand All @@ -44,7 +44,20 @@ func TestClientConnectInitialServers(t *testing.T) {
assert.Len(t, entry.Client.DelegatedServers, 1)
assert.Equal(t, srv.config.Public, entry.Client.DelegatedServers[0])

require.NoError(t, srv.Close())
c.mu.RLock()
l := c.links[srv.config.Public]
c.mu.RUnlock()
require.NotNil(t, l)
require.NoError(t, l.link.Close())

time.Sleep(200 * time.Millisecond)

c.mu.RLock()
require.Len(t, c.links, 1)
c.mu.RUnlock()

require.NoError(t, c.Close())

time.Sleep(100 * time.Millisecond)

c.mu.RLock()
Expand All @@ -59,7 +72,8 @@ func TestClientConnectInitialServers(t *testing.T) {
func TestClientDial(t *testing.T) {
pk, sk := cipher.GenerateKeyPair()
discovery := client.NewMock()
c := NewClient(pk, sk, discovery)
c := NewClient(&Config{pk, sk, discovery, 0, 0})
c.retries = 0

srv, err := newMockServer(discovery)
require.NoError(t, err)
Expand All @@ -68,7 +82,7 @@ func TestClientDial(t *testing.T) {
time.Sleep(100 * time.Millisecond)

anotherPK, anotherSK := cipher.GenerateKeyPair()
anotherClient := NewClient(anotherPK, anotherSK, discovery)
anotherClient := NewClient(&Config{anotherPK, anotherSK, discovery, 0, 0})
require.NoError(t, anotherClient.ConnectToInitialServers(context.TODO(), 1))

var anotherTr transport.Transport
Expand Down Expand Up @@ -115,8 +129,8 @@ func TestClientDial(t *testing.T) {
assert.Equal(t, 3, n)
assert.Equal(t, []byte("bar"), buf)

require.NoError(t, anotherTr.Close())
require.NoError(t, tr.Close())
require.NoError(t, anotherTr.Close())

time.Sleep(100 * time.Millisecond)

Expand Down
16 changes: 13 additions & 3 deletions pkg/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/skycoin/skywire/pkg/messaging"

"github.com/skycoin/skywire/pkg/cipher"
mClient "github.com/skycoin/skywire/pkg/messaging-discovery/client"
Expand Down Expand Up @@ -57,15 +60,22 @@ type Config struct {
Interfaces InterfaceConfig `json:"interfaces"`
}

// MessagingDiscovery returns messaging discovery client.
func (c *Config) MessagingDiscovery() (mClient.APIClient, error) {
// MessagingConfig returns config for messaging client.
func (c *Config) MessagingConfig() (*messaging.Config, error) {

msgConfig := c.Messaging

if msgConfig.Discovery == "" {
return nil, errors.New("empty discovery")
}

return mClient.NewHTTP(msgConfig.Discovery), nil
return &messaging.Config{
PubKey: c.Node.StaticPubKey,
SecKey: c.Node.StaticSecKey,
Discovery: mClient.NewHTTP(msgConfig.Discovery),
Retries: 5,
RetryDelay: time.Second,
}, nil
}

// TransportDiscovery returns transport discovery client.
Expand Down
12 changes: 10 additions & 2 deletions pkg/node/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,14 +18,21 @@ import (
)

func TestMessagingDiscovery(t *testing.T) {
pk, sk := cipher.GenerateKeyPair()
conf := Config{}
conf.Node.StaticPubKey = pk
conf.Node.StaticSecKey = sk
conf.Messaging.Discovery = "skywire.skycoin.net:8001"
conf.Messaging.ServerCount = 10

discovery, err := conf.MessagingDiscovery()
c, err := conf.MessagingConfig()
require.NoError(t, err)

assert.NotNil(t, discovery)
assert.NotNil(t, c.Discovery)
assert.False(t, c.PubKey.Null())
assert.False(t, c.SecKey.Null())
assert.Equal(t, 5, c.Retries)
assert.Equal(t, time.Second, c.RetryDelay)
}

func TestTransportDiscovery(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func NewNode(config *Config) (*Node, error) {

pk := config.Node.StaticPubKey
sk := config.Node.StaticSecKey
mDiscovery, err := config.MessagingDiscovery()
mConfig, err := config.MessagingConfig()
if err != nil {
return nil, fmt.Errorf("invalid MessagingConfig: %s", err)
return nil, fmt.Errorf("invalid Messaging config: %s", err)
}

node.messenger = messaging.NewClient(pk, sk, mDiscovery)
node.messenger = messaging.NewClient(mConfig)
node.messenger.Logger = node.Logger.PackageLogger("messenger")

trDiscovery, err := config.TransportDiscovery()
Expand Down
3 changes: 2 additions & 1 deletion pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func TestNodeStartClose(t *testing.T) {
defer os.RemoveAll("chat")
node := &Node{config: &Config{}, router: r, executer: executer, appsConf: conf,
startedApps: map[string]*appBind{}, logger: logging.MustGetLogger("test")}
node.messenger = messaging.NewClient(cipher.PubKey{}, cipher.SecKey{}, client.NewMock())
mConf := &messaging.Config{PubKey: cipher.PubKey{}, SecKey: cipher.SecKey{}, Discovery: client.NewMock()}
node.messenger = messaging.NewClient(mConf)
var err error

tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()}
Expand Down
Loading

0 comments on commit 6c16069

Please sign in to comment.