Skip to content

Commit

Permalink
Merge pull request #491 from Darkren/feature/vpn-improvements
Browse files Browse the repository at this point in the history
VPN improvements
  • Loading branch information
jdknives authored Sep 28, 2020
2 parents 427d6a7 + da7a697 commit ee1fa0e
Show file tree
Hide file tree
Showing 26 changed files with 603 additions and 204 deletions.
7 changes: 2 additions & 5 deletions internal/utclient/mock_api_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions pkg/app/appevent/mock_rpc_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions pkg/app/appnet/mock_networker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 57 additions & 0 deletions pkg/app/appnet/skywire_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package appnet

import (
"net"
"sync"
"time"

"github.com/skycoin/skywire/pkg/router"
)

// SkywireConn is a connection wrapper for skynet.
type SkywireConn struct {
net.Conn
nrg *router.NoiseRouteGroup
freePort func()
freePortMx sync.RWMutex
once sync.Once
}

// IsAlive checks whether connection is alive.
func (c *SkywireConn) IsAlive() bool {
return c.nrg.IsAlive()
}

// Latency returns latency till remote (ms).
func (c *SkywireConn) Latency() time.Duration {
return c.nrg.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (c *SkywireConn) Throughput() uint32 {
return c.nrg.Throughput()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (c *SkywireConn) BandwidthSent() uint64 {
return c.nrg.BandwidthSent()
}

// Close closes connection.
func (c *SkywireConn) Close() error {
var err error

c.once.Do(func() {
defer func() {
c.freePortMx.RLock()
defer c.freePortMx.RUnlock()
if c.freePort != nil {
c.freePort()
}
}()

err = c.Conn.Close()
})

return err
}
35 changes: 6 additions & 29 deletions pkg/app/appnet/skywire_networker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func (r *SkywireNetworker) DialContext(ctx context.Context, addr Addr) (conn net
return nil, err
}

return &skywireConn{
return &SkywireConn{
Conn: conn,
nrg: conn.(*router.NoiseRouteGroup),
freePort: freePort,
}, nil
}
Expand Down Expand Up @@ -178,7 +179,10 @@ func (l *skywireListener) Accept() (net.Conn, error) {
return nil, errors.New("listening on closed connection")
}

return conn, nil
return &SkywireConn{
Conn: conn,
nrg: conn.(*router.NoiseRouteGroup),
}, nil
}

// Close closes listener.
Expand All @@ -203,30 +207,3 @@ func (l *skywireListener) Addr() net.Addr {
func (l *skywireListener) putConn(conn net.Conn) {
l.connsCh <- conn
}

// skywireConn is a connection wrapper for skynet.
type skywireConn struct {
net.Conn
freePort func()
freePortMx sync.RWMutex
once sync.Once
}

// Close closes connection.
func (c *skywireConn) Close() error {
var err error

c.once.Do(func() {
defer func() {
c.freePortMx.RLock()
defer c.freePortMx.RUnlock()
if c.freePort != nil {
c.freePort()
}
}()

err = c.Conn.Close()
})

return err
}
32 changes: 26 additions & 6 deletions pkg/app/appserver/mock_proc_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 4 additions & 8 deletions pkg/app/appserver/mock_rpc_ingress_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app/appcommon"
"github.com/skycoin/skywire/pkg/app/appdisc"
"github.com/skycoin/skywire/pkg/app/appnet"
)

var (
Expand All @@ -38,6 +40,7 @@ type Proc struct {
waitMx sync.Mutex
waitErr error

rpcGWMu sync.Mutex
rpcGW *RPCIngressGateway // gateway shared over 'conn' - introduced AFTER proc is started
conn net.Conn // connection to proc - introduced AFTER proc is started
connCh chan struct{} // push here when conn is received - protected by 'connOnce'
Expand Down Expand Up @@ -94,7 +97,9 @@ func (p *Proc) InjectConn(conn net.Conn) bool {
p.connOnce.Do(func() {
ok = true
p.conn = conn
p.rpcGWMu.Lock()
p.rpcGW = NewRPCGateway(p.log)
p.rpcGWMu.Unlock()

// Send ready signal.
p.connCh <- struct{}{}
Expand Down Expand Up @@ -254,3 +259,57 @@ func (p *Proc) Wait() error {
func (p *Proc) IsRunning() bool {
return atomic.LoadInt32(&p.isRunning) == 1
}

// ConnectionSummary sums up the connection stats.
type ConnectionSummary struct {
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
Throughput uint32 `json:"throughput"`
BandwidthSent uint64 `json:"bandwidth_sent"`
Error string `json:"error"`
}

// ConnectionsSummary returns all of the proc's connections stats.
func (p *Proc) ConnectionsSummary() []ConnectionSummary {
p.rpcGWMu.Lock()
rpcGW := p.rpcGW
p.rpcGWMu.Unlock()

if rpcGW == nil {
return nil
}

var summaries []ConnectionSummary
rpcGW.cm.DoRange(func(id uint16, v interface{}) bool {
if v == nil {
summaries = append(summaries, ConnectionSummary{})
return true
}

conn, ok := v.(net.Conn)
if !ok {
summaries = append(summaries, ConnectionSummary{})
}

wrappedConn := conn.(*appnet.WrappedConn)

skywireConn, isSkywireConn := wrappedConn.Conn.(*appnet.SkywireConn)
if !isSkywireConn {
summaries = append(summaries, ConnectionSummary{
Error: "Can't get such info from this conn",
})
return true
}

summaries = append(summaries, ConnectionSummary{
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
Throughput: skywireConn.Throughput(),
BandwidthSent: skywireConn.BandwidthSent(),
})

return true
})

return summaries
}
10 changes: 10 additions & 0 deletions pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ProcManager interface {
Stop(appName string) error
Wait(appName string) error
Range(next func(appName string, proc *Proc) bool)
ConnectionsSummary(appName string) ([]ConnectionSummary, error)
Addr() net.Addr
}

Expand Down Expand Up @@ -266,6 +267,15 @@ func (m *procManager) Range(next func(name string, proc *Proc) bool) {
}
}

func (m *procManager) ConnectionsSummary(appName string) ([]ConnectionSummary, error) {
p, err := m.get(appName)
if err != nil {
return nil, err
}

return p.ConnectionsSummary(), nil
}

// stopAll stops all the apps run with this manager instance.
func (m *procManager) stopAll() {
for name, proc := range m.procs {
Expand Down
9 changes: 3 additions & 6 deletions pkg/routefinder/rfclient/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 5 additions & 11 deletions pkg/router/mock_router.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ee1fa0e

Please sign in to comment.