Skip to content

Commit

Permalink
Merge branch 'mainnet' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
ivcosla committed Mar 15, 2019
2 parents bd51899 + 018e3f9 commit 91fa461
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cmd/skywire-cli/commands/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func defaultConfig() *node.Config {

conf.LogLevel = "info"

conf.Interfaces.RPCAddress = "localhost:3436"
conf.Interfaces.RPCAddress = "localhost:3435"

return conf
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/messaging/chan_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ func (c *chanList) add(channel *channel) byte {
panic("no free channels")
}

func (c *chanList) set(id byte, channel *channel) {
c.Lock()
c.chans[id] = channel
c.Unlock()
}

func (c *chanList) get(id byte) *channel {
c.Lock()
ch := c.chans[id]
Expand Down
3 changes: 2 additions & 1 deletion pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func (c *channel) close() {
case <-c.doneChan:
default:
close(c.doneChan)
close(c.readChan)
close(c.closeChan)
}
}
Expand All @@ -159,6 +158,8 @@ func (c *channel) readEncrypted(ctx context.Context, p []byte) (n int, err error
}

select {
case <-c.doneChan:
return 0, io.EOF
case in, more := <-c.readChan:
if !more {
return 0, io.EOF
Expand Down
105 changes: 69 additions & 36 deletions pkg/messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/skycoin/skycoin/src/util/logging"

Expand All @@ -34,9 +35,19 @@ var (

type clientLink struct {
link *Link
addr string
chans *chanList
}

// Config configures Client.
type Config struct {
PubKey cipher.PubKey
SecKey cipher.SecKey
Discovery client.APIClient
Retries int
RetryDelay time.Duration
}

// Client sends messages to remote client nodes via relay Server.
type Client struct {
Logger *logging.Logger
Expand All @@ -46,6 +57,9 @@ type Client struct {
dc client.APIClient
pool *Pool

retries int
retryDelay time.Duration

links map[cipher.PubKey]*clientLink
mu sync.RWMutex

Expand All @@ -54,15 +68,17 @@ type Client struct {
}

// NewClient constructs a new Client.
func NewClient(pubKey cipher.PubKey, secKey cipher.SecKey, discoveryClient client.APIClient) *Client {
func NewClient(conf *Config) *Client {
c := &Client{
Logger: logging.MustGetLogger("messenger"),
pubKey: pubKey,
secKey: secKey,
dc: discoveryClient,
links: make(map[cipher.PubKey]*clientLink),
newChan: make(chan *channel),
doneChan: make(chan struct{}),
Logger: logging.MustGetLogger("messenger"),
pubKey: conf.PubKey,
secKey: conf.SecKey,
dc: conf.Discovery,
retries: conf.Retries,
retryDelay: conf.RetryDelay,
links: make(map[cipher.PubKey]*clientLink),
newChan: make(chan *channel),
doneChan: make(chan struct{}),
}
config := &LinkConfig{
Public: c.pubKey,
Expand Down Expand Up @@ -142,14 +158,14 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (transport.Tran
if err != nil {
return nil, fmt.Errorf("noise setup: %s", err)
}
channel.ID = clientLink.chans.add(channel)
localID := clientLink.chans.add(channel)

msg, err := channel.noise.HandshakeMessage()
if err != nil {
return nil, fmt.Errorf("noise handshake: %s", err)
}

if _, err := clientLink.link.SendOpenChannel(channel.ID, remote, msg); err != nil {
if _, err := clientLink.link.SendOpenChannel(localID, remote, msg); err != nil {
return nil, fmt.Errorf("failed to open channel: %s", err)
}

Expand All @@ -162,7 +178,7 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (transport.Tran
return nil, ctx.Err()
}

c.Logger.Infof("Opened new channel ID %d with %s", channel.ID, remote)
c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", localID, channel.ID, remote)
return newAckedChannel(channel), nil
}

Expand Down Expand Up @@ -222,7 +238,7 @@ func (c *Client) link(remotePK cipher.PubKey, addr string) (*clientLink, error)
}

c.Logger.Infof("Opened new link with the server %s", remotePK)
clientLink := &clientLink{l, newChanList()}
clientLink := &clientLink{l, addr, newChanList()}
c.mu.Lock()
c.links[remotePK] = clientLink
c.mu.Unlock()
Expand Down Expand Up @@ -274,12 +290,13 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {

c.Logger.Debugf("New frame %s from %s@%d", frameType, remotePK, channelID)
if frameType == FrameTypeOpenChannel {
if msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil {
if lID, msg, err := c.openChannel(channelID, body[1:34], body[34:], clientLink); err != nil {
c.Logger.Warnf("Failed to open new channel for %s: %s", remotePK, err)
_, sendErr = l.SendChannelClosed(channelID)
} else {
c.Logger.Infof("Opened new channel ID %d with %s", channelID, hex.EncodeToString(body[1:34]))
_, sendErr = l.SendChannelOpened(channelID, msg)
c.Logger.Infof("Opened new channel local ID %d, remote ID %d with %s", lID, channelID,
hex.EncodeToString(body[1:34]))
_, sendErr = l.SendChannelOpened(channelID, lID, msg)
}

return c.warnSendError(remotePK, sendErr)
Expand All @@ -296,10 +313,11 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
switch frameType {
case FrameTypeCloseChannel:
clientLink.chans.remove(channelID)
_, sendErr = l.SendChannelClosed(channelID)
_, sendErr = l.SendChannelClosed(channel.ID)
c.Logger.Debugf("Closed channel ID %d", channelID)
case FrameTypeChannelOpened:
if err := channel.noise.ProcessMessage(body[1:]); err != nil {
channel.ID = body[1]
if err := channel.noise.ProcessMessage(body[2:]); err != nil {
sendErr = fmt.Errorf("noise handshake: %s", err)
}

Expand All @@ -308,6 +326,7 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {
default:
}
case FrameTypeChannelClosed:
channel.ID = body[0]
select {
case channel.waitChan <- false:
case channel.closeChan <- struct{}{}:
Expand All @@ -329,7 +348,6 @@ func (c *Client) onData(l *Link, frameType FrameType, body []byte) error {

func (c *Client) onClose(l *Link, remote bool) {
remotePK := l.Remote()
c.Logger.Infof("Closing link with the server %s", remotePK)

c.mu.RLock()
chanLink := c.links[remotePK]
Expand All @@ -339,6 +357,21 @@ func (c *Client) onClose(l *Link, remote bool) {
channel.close()
}

select {
case <-c.doneChan:
default:
c.Logger.Infof("Disconnected from the server %s. Trying to re-connect...", remotePK)
for attemp := 0; attemp < c.retries; attemp++ {
if _, err := c.link(remotePK, chanLink.addr); err == nil {
c.Logger.Infof("Re-connected to the server %s", remotePK)
return
}
time.Sleep(c.retryDelay)
}
}

c.Logger.Infof("Closing link with the server %s", remotePK)

c.mu.Lock()
delete(c.links, remotePK)
c.mu.Unlock()
Expand All @@ -348,41 +381,41 @@ func (c *Client) onClose(l *Link, remote bool) {
}
}

func (c *Client) openChannel(channelID byte, remotePK []byte, msg []byte, chanLink *clientLink) ([]byte, error) {
channel := chanLink.chans.get(channelID)
if channel != nil {
return nil, errors.New("channel is already opened")
func (c *Client) openChannel(rID byte, remotePK []byte, noiseMsg []byte, chanLink *clientLink) (lID byte, noiseRes []byte, err error) {
var pubKey cipher.PubKey
pubKey, err = cipher.NewPubKey(remotePK)
if err != nil {
return
}

pubKey, err := cipher.NewPubKey(remotePK)
channel, err := newChannel(false, c.secKey, pubKey, chanLink.link)
channel.ID = rID
if err != nil {
return nil, err
err = fmt.Errorf("noise setup: %s", err)
return
}
channel, err = newChannel(false, c.secKey, pubKey, chanLink.link)
if err != nil {
return nil, fmt.Errorf("noise setup: %s", err)
}

channel.ID = channelID
chanLink.chans.set(channelID, channel)

if err := channel.noise.ProcessMessage(msg); err != nil {
return nil, fmt.Errorf("noise handshake: %s", err)
if err = channel.noise.ProcessMessage(noiseMsg); err != nil {
err = fmt.Errorf("noise handshake: %s", err)
return
}

lID = chanLink.chans.add(channel)

go func() {
select {
case <-c.doneChan:
case c.newChan <- channel:
}
}()

res, err := channel.noise.HandshakeMessage()
noiseRes, err = channel.noise.HandshakeMessage()
if err != nil {
return nil, fmt.Errorf("noise handshake: %s", err)
err = fmt.Errorf("noise handshake: %s", err)
return
}

return res, nil
return lID, noiseRes, err
}

func (c *Client) warnSendError(remote cipher.PubKey, err error) error {
Expand Down
26 changes: 20 additions & 6 deletions pkg/messaging/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestMain(m *testing.M) {
func TestClientConnectInitialServers(t *testing.T) {
pk, sk := cipher.GenerateKeyPair()
discovery := client.NewMock()
c := NewClient(pk, sk, discovery)
c := NewClient(&Config{pk, sk, discovery, 1, 100 * time.Millisecond})

srv, err := newMockServer(discovery)
require.NoError(t, err)
Expand All @@ -44,7 +44,20 @@ func TestClientConnectInitialServers(t *testing.T) {
assert.Len(t, entry.Client.DelegatedServers, 1)
assert.Equal(t, srv.config.Public, entry.Client.DelegatedServers[0])

require.NoError(t, srv.Close())
c.mu.RLock()
l := c.links[srv.config.Public]
c.mu.RUnlock()
require.NotNil(t, l)
require.NoError(t, l.link.Close())

time.Sleep(200 * time.Millisecond)

c.mu.RLock()
require.Len(t, c.links, 1)
c.mu.RUnlock()

require.NoError(t, c.Close())

time.Sleep(100 * time.Millisecond)

c.mu.RLock()
Expand All @@ -59,7 +72,8 @@ func TestClientConnectInitialServers(t *testing.T) {
func TestClientDial(t *testing.T) {
pk, sk := cipher.GenerateKeyPair()
discovery := client.NewMock()
c := NewClient(pk, sk, discovery)
c := NewClient(&Config{pk, sk, discovery, 0, 0})
c.retries = 0

srv, err := newMockServer(discovery)
require.NoError(t, err)
Expand All @@ -68,7 +82,7 @@ func TestClientDial(t *testing.T) {
time.Sleep(100 * time.Millisecond)

anotherPK, anotherSK := cipher.GenerateKeyPair()
anotherClient := NewClient(anotherPK, anotherSK, discovery)
anotherClient := NewClient(&Config{anotherPK, anotherSK, discovery, 0, 0})
require.NoError(t, anotherClient.ConnectToInitialServers(context.TODO(), 1))

var anotherTr transport.Transport
Expand Down Expand Up @@ -115,8 +129,8 @@ func TestClientDial(t *testing.T) {
assert.Equal(t, 3, n)
assert.Equal(t, []byte("bar"), buf)

require.NoError(t, anotherTr.Close())
require.NoError(t, tr.Close())
require.NoError(t, anotherTr.Close())

time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -176,7 +190,7 @@ func (s *mockServer) onData(l *Link, frameType FrameType, body []byte) error {
case FrameTypeOpenChannel:
_, err = ol.SendOpenChannel(channelID, l.Remote(), body[34:])
case FrameTypeChannelOpened:
_, err = ol.SendChannelOpened(channelID, body[1:])
_, err = ol.SendChannelOpened(channelID, channelID, body[2:])
case FrameTypeCloseChannel:
l.SendChannelClosed(channelID) // nolint
_, err = ol.SendCloseChannel(channelID)
Expand Down
4 changes: 2 additions & 2 deletions pkg/messaging/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (c *Link) SendOpenChannel(channelID byte, remotePK cipher.PubKey, noiseMsg
}

// SendChannelOpened sends ChannelOpened frame.
func (c *Link) SendChannelOpened(channelID byte, noiseMsg []byte) (int, error) {
return c.writeFrame(FrameTypeChannelOpened, append([]byte{channelID}, noiseMsg...))
func (c *Link) SendChannelOpened(channelID byte, remoteID byte, noiseMsg []byte) (int, error) {
return c.writeFrame(FrameTypeChannelOpened, append([]byte{channelID, remoteID}, noiseMsg...))
}

// SendCloseChannel sends CloseChannel request.
Expand Down
16 changes: 13 additions & 3 deletions pkg/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/skycoin/skywire/pkg/messaging"

"github.com/skycoin/skywire/pkg/cipher"
mClient "github.com/skycoin/skywire/pkg/messaging-discovery/client"
Expand Down Expand Up @@ -57,15 +60,22 @@ type Config struct {
Interfaces InterfaceConfig `json:"interfaces"`
}

// MessagingDiscovery returns messaging discovery client.
func (c *Config) MessagingDiscovery() (mClient.APIClient, error) {
// MessagingConfig returns config for messaging client.
func (c *Config) MessagingConfig() (*messaging.Config, error) {

msgConfig := c.Messaging

if msgConfig.Discovery == "" {
return nil, errors.New("empty discovery")
}

return mClient.NewHTTP(msgConfig.Discovery), nil
return &messaging.Config{
PubKey: c.Node.StaticPubKey,
SecKey: c.Node.StaticSecKey,
Discovery: mClient.NewHTTP(msgConfig.Discovery),
Retries: 5,
RetryDelay: time.Second,
}, nil
}

// TransportDiscovery returns transport discovery client.
Expand Down
Loading

0 comments on commit 91fa461

Please sign in to comment.