Skip to content

Commit

Permalink
Add SkywireNetworker, fix some data races
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Sep 30, 2019
1 parent 7d2d6f1 commit 3d50c32
Show file tree
Hide file tree
Showing 15 changed files with 636 additions and 204 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ require (
)

// Uncomment for tests with alternate branches of 'dmsg'
//replace github.com/skycoin/dmsg => ../dmsg
replace github.com/skycoin/dmsg => ../dmsg
6 changes: 6 additions & 0 deletions pkg/app2/appnet/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func ConvertAddr(addr net.Addr) (Addr, error) {
PubKey: a.PK,
Port: routing.Port(a.Port),
}, nil
case routing.Addr:
return Addr{
Net: TypeSkynet,
PubKey: a.PubKey,
Port: a.Port,
}, nil
default:
return Addr{}, ErrUnknownAddrType
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/app2/appnet/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package appnet provides methods to access different networks
// from skywire apps.
package appnet
8 changes: 8 additions & 0 deletions pkg/app2/appnet/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package appnet

import "errors"

var (
// ErrPortAlreadyBound is being returned when the desired port is already bound to.
ErrPortAlreadyBound = errors.New("port already bound")
)
193 changes: 193 additions & 0 deletions pkg/app2/appnet/skywire_networker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package appnet

import (
"context"
"net"
"sync"
"sync/atomic"

"github.com/skycoin/dmsg/netutil"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app2"
"github.com/skycoin/skywire/pkg/router"
"github.com/skycoin/skywire/pkg/routing"
)

// SkywireNetworker implements `Networker` for skynet.
type SkywireNetworker struct {
log *logging.Logger
r router.Interface
porter *netutil.Porter
isServing int32
}

// NewSkywireNetworker constructs skywire networker.
func NewSkywireNetworker(l *logging.Logger, r router.Interface) Networker {
return &SkywireNetworker{
log: l,
r: r,
porter: netutil.NewPorter(netutil.PorterMinEphemeral),
}
}

// Dial dials remote `addr` via `skynet`.
func (r *SkywireNetworker) Dial(addr Addr) (net.Conn, error) {
return r.DialContext(context.Background(), addr)
}

// Dial dials remote `addr` via `skynet` with context.
func (r *SkywireNetworker) DialContext(ctx context.Context, addr Addr) (net.Conn, error) {
localPort, freePort, err := r.porter.ReserveEphemeral(ctx, nil)
if err != nil {
return nil, err
}

_, err = r.r.DialRoutes(ctx, addr.PubKey, routing.Port(localPort), addr.Port, router.DefaultDialOptions)
if err != nil {
return nil, err
}
rg := &app2.MockConn{}

return &skywireConn{
Conn: rg,
freePort: freePort,
}, nil
}

// Listen starts listening on local `addr` in the skynet.
func (r *SkywireNetworker) Listen(addr Addr) (net.Listener, error) {
return r.ListenContext(context.Background(), addr)
}

// Listen starts listening on local `addr` in the skynet with context.
func (r *SkywireNetworker) ListenContext(ctx context.Context, addr Addr) (net.Listener, error) {
lis := &skywireListener{
addr: addr,
// TODO: pass buf size
connsCh: make(chan net.Conn, 1000000),
freePort: nil,
}

ok, freePort := r.porter.Reserve(uint16(addr.Port), lis)
if !ok {
return nil, ErrPortAlreadyBound
}

lis.freePortMx.Lock()
lis.freePort = freePort
lis.freePortMx.Unlock()

if atomic.CompareAndSwapInt32(&r.isServing, 0, 1) {
go func() {
if err := r.serve(); err != nil {
r.log.WithError(err).Error("error serving")
}
}()
}

return lis, nil
}

// serve accepts and serves routes.
func (r *SkywireNetworker) serve() error {
for {
_, err := r.r.AcceptRoutes()
if err != nil {
return err
}
rg := &app2.MockConn{}

go r.serveRG(rg)
}
}

// TODO: change to `*router.RouterGroup`
// serveRG passes accepted router group to the corresponding listener.
func (r *SkywireNetworker) serveRG(rg net.Conn) {
localAddr, ok := rg.LocalAddr().(routing.Addr)
if !ok {
r.closeRG(rg)
r.log.Error("wrong type of addr in accepted conn")
return
}

lisIfc, ok := r.porter.PortValue(uint16(localAddr.Port))
if !ok {
r.closeRG(rg)
r.log.Errorf("no listener on port %d", localAddr.Port)
return
}

lis, ok := lisIfc.(*skywireListener)
if !ok {
r.closeRG(rg)
r.log.Errorf("wrong type of listener on port %d", localAddr.Port)
return
}

lis.putConn(rg)
}

// TODO: change to `*router.RouterGroup`
// closeRG closes router group and logs error if any.
func (r *SkywireNetworker) closeRG(rg net.Conn) {
if err := rg.Close(); err != nil {
r.log.Error(err)
}
}

// skywireListener is a listener for skynet.
// Implements net.Listener.
type skywireListener struct {
addr Addr
connsCh chan net.Conn
freePort func()
freePortMx sync.RWMutex
}

// Accept accepts incoming connection.
func (l *skywireListener) Accept() (net.Conn, error) {
return <-l.connsCh, nil
}

// Close closes listener.
func (l *skywireListener) Close() error {
l.freePortMx.RLock()
defer l.freePortMx.RUnlock()
l.freePort()

return nil
}

// Addr returns local address.
func (l *skywireListener) Addr() net.Addr {
return l.addr
}

// putConn puts accepted conn to the listener to be later retrieved
// via `Accept`.
func (l *skywireListener) putConn(conn net.Conn) {
l.connsCh <- conn
}

// skywireConn is a connection wrapper for skynet.
type skywireConn struct {
// TODO: change to `*router.RouterGroup`
net.Conn
freePort func()
freePortMx sync.RWMutex
}

// Close closes connection.
func (c *skywireConn) Close() error {
defer func() {
c.freePortMx.RLock()
defer c.freePortMx.RUnlock()
if c.freePort != nil {
c.freePort()
}
}()

return c.Conn.Close()
}
5 changes: 4 additions & 1 deletion pkg/app2/appnet/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ type Type string
const (
// TypeDMSG is a network type for DMSG communication.
TypeDMSG Type = "dmsg"
// TypeSkynet is a network type for skywire communication.
TypeSkynet Type = "skynet"
)

// IsValid checks whether the network contains valid value for the type.
Expand All @@ -16,6 +18,7 @@ func (n Type) IsValid() bool {

var (
validNetworks = map[Type]struct{}{
TypeDMSG: {},
TypeDMSG: {},
TypeSkynet: {},
}
)
4 changes: 4 additions & 0 deletions pkg/app2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (c *Client) Dial(remote appnet.Addr) (net.Conn, error) {
return nil, err
}

conn.freeConnMx.Lock()
conn.freeConn = free
conn.freeConnMx.Unlock()

return conn, nil
}
Expand Down Expand Up @@ -98,7 +100,9 @@ func (c *Client) Listen(n appnet.Type, port routing.Port) (net.Listener, error)
return nil, err
}

listener.freeLisMx.Lock()
listener.freeLis = freeLis
listener.freeLisMx.Unlock()

return listener, nil
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/app2/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app2

import (
"net"
"sync"
"time"

"github.com/skycoin/skywire/pkg/app2/appnet"
Expand All @@ -10,11 +11,12 @@ import (
// Conn is a connection from app client to the server.
// Implements `net.Conn`.
type Conn struct {
id uint16
rpc RPCClient
local appnet.Addr
remote appnet.Addr
freeConn func()
id uint16
rpc RPCClient
local appnet.Addr
remote appnet.Addr
freeConn func()
freeConnMx sync.RWMutex
}

func (c *Conn) Read(b []byte) (int, error) {
Expand All @@ -32,6 +34,8 @@ func (c *Conn) Write(b []byte) (int, error) {

func (c *Conn) Close() error {
defer func() {
c.freeConnMx.RLock()
defer c.freeConnMx.RUnlock()
if c.freeConn != nil {
c.freeConn()
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/app2/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app2

import (
"net"
"sync"

"github.com/skycoin/skycoin/src/util/logging"
"github.com/skycoin/skywire/pkg/app2/appnet"
Expand All @@ -10,12 +11,13 @@ import (
// Listener is a listener for app server connections.
// Implements `net.Listener`.
type Listener struct {
log *logging.Logger
id uint16
rpc RPCClient
addr appnet.Addr
cm *idManager // contains conns associated with their IDs
freeLis func()
log *logging.Logger
id uint16
rpc RPCClient
addr appnet.Addr
cm *idManager // contains conns associated with their IDs
freeLis func()
freeLisMx sync.RWMutex
}

func (l *Listener) Accept() (net.Conn, error) {
Expand All @@ -40,13 +42,17 @@ func (l *Listener) Accept() (net.Conn, error) {
return nil, err
}

conn.freeConnMx.Lock()
conn.freeConn = free
conn.freeConnMx.Unlock()

return conn, nil
}

func (l *Listener) Close() error {
defer func() {
l.freeLisMx.RLock()
defer l.freeLisMx.RUnlock()
if l.freeLis != nil {
l.freeLis()
}
Expand Down
13 changes: 9 additions & 4 deletions vendor/github.com/skycoin/dmsg/client_conn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3d50c32

Please sign in to comment.