Skip to content

Commit

Permalink
Integrate dmsg with ports into skywire.
Browse files Browse the repository at this point in the history
* Introduce 'network' module to handle skywire network connections and assigned ports.
* Refactored 'routeManager' to have less coupling with 'Router'.
  • Loading branch information
Evan Lin committed Aug 16, 2019
1 parent d61a857 commit c0a07a9
Show file tree
Hide file tree
Showing 33 changed files with 1,329 additions and 920 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ require (
)

// Uncomment for tests with alternate branches of 'dmsg'
//replace github.com/skycoin/dmsg => ../dmsg
replace github.com/skycoin/dmsg => ../dmsg
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5 h1:hyz3dwM5QLc1Rfoz4FuWJQG5BN7tc6K1MndAUnGpQr4=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (f Frame) String() string {
case FrameCreateLoop:
return "CreateLoop"
case FrameConfirmLoop:
return "ConfirmLoop"
return "OnConfirmLoop"
case FrameSend:
return "Send"
case FrameClose:
Expand All @@ -35,7 +35,7 @@ const (
FrameInit Frame = iota
// FrameCreateLoop represents CreateLoop request frame type.
FrameCreateLoop
// FrameConfirmLoop represents ConfirmLoop request frame type.
// FrameConfirmLoop represents OnConfirmLoop request frame type.
FrameConfirmLoop
// FrameSend represents Send frame type.
FrameSend
Expand Down
172 changes: 172 additions & 0 deletions pkg/network/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package network

import (
"context"
"errors"
"fmt"
"github.com/skycoin/skycoin/src/util/logging"
"net"
"strings"
"sync"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/disc"
)

// Default ports.
// TODO(evanlinjin): Define these properly. These are currently random.
const (
SetupPort = uint16(36) // Listening port of a setup node.
AwaitSetupPort = uint16(136) // Listening port of a visor node for setup operations.
TransportPort = uint16(45) // Listening port of a visor node for incoming transports.
)

// Networks.
const (
DmsgNet = "dmsg"
)

var (
ErrUnknownNetwork = errors.New("unknown network type")
)

type Config struct {
PubKey cipher.PubKey
SecKey cipher.SecKey
TpNetworks []string // networks to be used with transports

DmsgDiscAddr string
DmsgMinSrvs int
}

// Network represents
type Network struct {
conf Config
dmsgC *dmsg.Client
}

func New(conf Config) *Network {
dmsgC := dmsg.NewClient(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.DmsgDiscAddr), dmsg.SetLogger(logging.MustGetLogger("network.dmsgC")))
return &Network{
conf: conf,
dmsgC: dmsgC,
}
}

func (n *Network) Init(ctx context.Context) error {
fmt.Println("dmsg: min_servers:", n.conf.DmsgMinSrvs)
if err := n.dmsgC.InitiateServerConnections(ctx, n.conf.DmsgMinSrvs); err != nil {
return fmt.Errorf("failed to initiate 'dmsg': %v", err)
}
return nil
}

func (n *Network) Close() error {
wg := new(sync.WaitGroup)
wg.Add(1)

var dmsgErr error
go func() {
dmsgErr = n.dmsgC.Close()
wg.Done()
}()

wg.Wait()
if dmsgErr != nil {
return dmsgErr
}
return nil
}

func (n *Network) LocalPK() cipher.PubKey { return n.conf.PubKey }

func (n *Network) LocalSK() cipher.SecKey { return n.conf.SecKey }

func (n *Network) Dmsg() *dmsg.Client { return n.dmsgC }

func (n *Network) Dial(network string, pk cipher.PubKey, port uint16) (*Conn, error) {
ctx := context.Background()
switch network {
case DmsgNet:
conn, err := n.dmsgC.Dial(ctx, pk, port)
if err != nil {
return nil, err
}
return makeConn(conn, network), nil
default:
return nil, ErrUnknownNetwork
}
}

func (n *Network) Listen(network string, port uint16) (*Listener, error) {
switch network {
case DmsgNet:
lis, err := n.dmsgC.Listen(port)
if err != nil {
return nil, err
}
return makeListener(lis, network), nil
default:
return nil, ErrUnknownNetwork
}
}

type Listener struct {
net.Listener
lPK cipher.PubKey
lPort uint16
network string
}

func makeListener(l net.Listener, network string) *Listener {
lPK, lPort := disassembleAddr(l.Addr())
return &Listener{Listener: l, lPK: lPK, lPort: lPort, network: network}
}

func (l Listener) LocalPK() cipher.PubKey { return l.lPK }
func (l Listener) LocalPort() uint16 { return l.lPort }
func (l Listener) Network() string { return l.network }

func (l Listener) AcceptConn() (*Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return makeConn(conn, l.network), nil
}

type Conn struct {
net.Conn
lPK cipher.PubKey
rPK cipher.PubKey
lPort uint16
rPort uint16
network string
}

func makeConn(conn net.Conn, network string) *Conn {
lPK, lPort := disassembleAddr(conn.LocalAddr())
rPK, rPort := disassembleAddr(conn.RemoteAddr())
return &Conn{Conn: conn, lPK: lPK, rPK: rPK, lPort: lPort, rPort: rPort, network: network}
}

func (c Conn) LocalPK() cipher.PubKey { return c.lPK }
func (c Conn) RemotePK() cipher.PubKey { return c.rPK }
func (c Conn) LocalPort() uint16 { return c.lPort }
func (c Conn) RemotePort() uint16 { return c.rPort }
func (c Conn) Network() string { return c.network }

func disassembleAddr(addr net.Addr) (pk cipher.PubKey, port uint16) {
strs := strings.Split(addr.String(), ":")
if len(strs) != 2 {
panic(fmt.Errorf("network.disassembleAddr: %v %s", "invalid addr", addr.String()))
}
if err := pk.Set(strs[0]); err != nil {
panic(fmt.Errorf("network.disassembleAddr: %v %s", err, addr.String()))
}
if _, err := fmt.Sscanf(strs[1], "%d", &port); err != nil {
panic(fmt.Errorf("network.disassembleAddr: %v", err))
}
return
}
21 changes: 21 additions & 0 deletions pkg/network/network_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package network

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
)

func TestDisassembleAddr(t *testing.T) {
pk, _ := cipher.GenerateKeyPair()
port := uint16(2)
addr := dmsg.Addr{
PK: pk, Port: port,
}
gotPK, gotPort := disassembleAddr(addr)
require.Equal(t, pk, gotPK)
require.Equal(t, port, gotPort)
}
87 changes: 87 additions & 0 deletions pkg/network/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package network

//import (
// "github.com/skycoin/dmsg/disc"
// "github.com/skycoin/skycoin/src/util/logging"
// "github.com/stretchr/testify/require"
// "golang.org/x/net/nettest"
// "testing"
//)
//
//// KeyPair holds a public/private key pair.
//type KeyPair struct {
// PK cipher.PubKey
// SK cipher.SecKey
//}
//
//// GenKeyPairs generates 'n' number of key pairs.
//func GenKeyPairs(n int) []KeyPair {
// pairs := make([]KeyPair, n)
// for i := range pairs {
// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)})
// if err != nil {
// panic(err)
// }
// pairs[i] = KeyPair{PK: pk, SK: sk}
// }
// return pairs
//}
//
//// TestEnv contains a dmsg environment.
//type TestEnv struct {
// Disc disc.APIClient
// Srv *Server
// Clients []*Client
// teardown func()
//}
//
//// SetupTestEnv creates a dmsg TestEnv.
//func SetupTestEnv(t *testing.T, keyPairs []KeyPair) *TestEnv {
// discovery := disc.NewMock()
//
// srv, srvErr := createServer(t, discovery)
//
// clients := make([]*Client, len(keyPairs))
// for i, pair := range keyPairs {
// t.Logf("dmsg_client[%d] PK: %s\n", i, pair.PK)
// c := NewClient(pair.PK, pair.SK, discovery,
// SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s", i, pair.PK.String()[:6]))))
// require.NoError(t, c.InitiateServerConnections(context.TODO(), 1))
// clients[i] = c
// }
//
// teardown := func() {
// for _, c := range clients {
// require.NoError(t, c.Close())
// }
// require.NoError(t, srv.Close())
// for err := range srvErr {
// require.NoError(t, err)
// }
// }
//
// return &TestEnv{
// Disc: discovery,
// Srv: srv,
// Clients: clients,
// teardown: teardown,
// }
//}
//
//// TearDown shutdowns the TestEnv.
//func (e *TestEnv) TearDown() { e.teardown() }
//
//func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) {
// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s"))
// require.NoError(t, err)
// l, err := nettest.NewLocalListener("tcp")
// require.NoError(t, err)
// srv, err = dmsg.NewServer(pk, sk, "", l, dc)
// require.NoError(t, err)
// errCh := make(chan error, 1)
// go func() {
// errCh <- srv.Serve()
// close(errCh)
// }()
// return srv, errCh
//}
Loading

0 comments on commit c0a07a9

Please sign in to comment.