diff --git a/internal/utclient/mock_api_client.go b/internal/utclient/mock_api_client.go index 42f06f07b2..8252bd4924 100644 --- a/internal/utclient/mock_api_client.go +++ b/internal/utclient/mock_api_client.go @@ -2,11 +2,8 @@ package utclient -import ( - context "context" - - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" // MockAPIClient is an autogenerated mock type for the APIClient type type MockAPIClient struct { diff --git a/pkg/app/appevent/mock_rpc_client.go b/pkg/app/appevent/mock_rpc_client.go index 393e3af472..e14f85b6a2 100644 --- a/pkg/app/appevent/mock_rpc_client.go +++ b/pkg/app/appevent/mock_rpc_client.go @@ -2,13 +2,9 @@ package appevent -import ( - context "context" - - appcommon "github.com/skycoin/skywire/pkg/app/appcommon" - - mock "github.com/stretchr/testify/mock" -) +import appcommon "github.com/skycoin/skywire/pkg/app/appcommon" +import context "context" +import mock "github.com/stretchr/testify/mock" // MockRPCClient is an autogenerated mock type for the RPCClient type type MockRPCClient struct { diff --git a/pkg/app/appnet/mock_networker.go b/pkg/app/appnet/mock_networker.go index 2258fc8552..39178d034c 100644 --- a/pkg/app/appnet/mock_networker.go +++ b/pkg/app/appnet/mock_networker.go @@ -2,12 +2,9 @@ package appnet -import ( - context "context" - net "net" - - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" +import net "net" // MockNetworker is an autogenerated mock type for the Networker type type MockNetworker struct { diff --git a/pkg/app/appnet/skywire_conn.go b/pkg/app/appnet/skywire_conn.go new file mode 100644 index 0000000000..b925625b87 --- /dev/null +++ b/pkg/app/appnet/skywire_conn.go @@ -0,0 +1,57 @@ +package appnet + +import ( + "net" + "sync" + "time" + + "github.com/skycoin/skywire/pkg/router" +) + +// SkywireConn is a connection wrapper for skynet. +type SkywireConn struct { + net.Conn + nrg *router.NoiseRouteGroup + freePort func() + freePortMx sync.RWMutex + once sync.Once +} + +// IsAlive checks whether connection is alive. +func (c *SkywireConn) IsAlive() bool { + return c.nrg.IsAlive() +} + +// Latency returns latency till remote (ms). +func (c *SkywireConn) Latency() time.Duration { + return c.nrg.Latency() +} + +// Throughput returns throughput till remote (bytes/s). +func (c *SkywireConn) Throughput() uint32 { + return c.nrg.Throughput() +} + +// BandwidthSent returns amount of bandwidth sent (bytes). +func (c *SkywireConn) BandwidthSent() uint64 { + return c.nrg.BandwidthSent() +} + +// Close closes connection. +func (c *SkywireConn) Close() error { + var err error + + c.once.Do(func() { + defer func() { + c.freePortMx.RLock() + defer c.freePortMx.RUnlock() + if c.freePort != nil { + c.freePort() + } + }() + + err = c.Conn.Close() + }) + + return err +} diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index 0328d6d0e2..e409d4074f 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -62,8 +62,9 @@ func (r *SkywireNetworker) DialContext(ctx context.Context, addr Addr) (conn net return nil, err } - return &skywireConn{ + return &SkywireConn{ Conn: conn, + nrg: conn.(*router.NoiseRouteGroup), freePort: freePort, }, nil } @@ -178,7 +179,10 @@ func (l *skywireListener) Accept() (net.Conn, error) { return nil, errors.New("listening on closed connection") } - return conn, nil + return &SkywireConn{ + Conn: conn, + nrg: conn.(*router.NoiseRouteGroup), + }, nil } // Close closes listener. @@ -203,30 +207,3 @@ func (l *skywireListener) Addr() net.Addr { func (l *skywireListener) putConn(conn net.Conn) { l.connsCh <- conn } - -// skywireConn is a connection wrapper for skynet. -type skywireConn struct { - net.Conn - freePort func() - freePortMx sync.RWMutex - once sync.Once -} - -// Close closes connection. -func (c *skywireConn) Close() error { - var err error - - c.once.Do(func() { - defer func() { - c.freePortMx.RLock() - defer c.freePortMx.RUnlock() - if c.freePort != nil { - c.freePort() - } - }() - - err = c.Conn.Close() - }) - - return err -} diff --git a/pkg/app/appserver/mock_proc_manager.go b/pkg/app/appserver/mock_proc_manager.go index 4559089282..63b9935468 100644 --- a/pkg/app/appserver/mock_proc_manager.go +++ b/pkg/app/appserver/mock_proc_manager.go @@ -2,12 +2,9 @@ package appserver -import ( - appcommon "github.com/skycoin/skywire/pkg/app/appcommon" - mock "github.com/stretchr/testify/mock" - - net "net" -) +import appcommon "github.com/skycoin/skywire/pkg/app/appcommon" +import mock "github.com/stretchr/testify/mock" +import net "net" // MockProcManager is an autogenerated mock type for the ProcManager type type MockProcManager struct { @@ -44,6 +41,29 @@ func (_m *MockProcManager) Close() error { return r0 } +// ConnectionsSummary provides a mock function with given fields: appName +func (_m *MockProcManager) ConnectionsSummary(appName string) ([]ConnectionSummary, error) { + ret := _m.Called(appName) + + var r0 []ConnectionSummary + if rf, ok := ret.Get(0).(func(string) []ConnectionSummary); ok { + r0 = rf(appName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]ConnectionSummary) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(appName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ProcByName provides a mock function with given fields: appName func (_m *MockProcManager) ProcByName(appName string) (*Proc, bool) { ret := _m.Called(appName) diff --git a/pkg/app/appserver/mock_rpc_ingress_client.go b/pkg/app/appserver/mock_rpc_ingress_client.go index c5de1206c1..c3e3494fd1 100644 --- a/pkg/app/appserver/mock_rpc_ingress_client.go +++ b/pkg/app/appserver/mock_rpc_ingress_client.go @@ -2,14 +2,10 @@ package appserver -import ( - appnet "github.com/skycoin/skywire/pkg/app/appnet" - mock "github.com/stretchr/testify/mock" - - routing "github.com/skycoin/skywire/pkg/routing" - - time "time" -) +import appnet "github.com/skycoin/skywire/pkg/app/appnet" +import mock "github.com/stretchr/testify/mock" +import routing "github.com/skycoin/skywire/pkg/routing" +import time "time" // MockRPCIngressClient is an autogenerated mock type for the RPCIngressClient type type MockRPCIngressClient struct { diff --git a/pkg/app/appserver/proc.go b/pkg/app/appserver/proc.go index e22524c97e..855077b55e 100644 --- a/pkg/app/appserver/proc.go +++ b/pkg/app/appserver/proc.go @@ -11,11 +11,13 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app/appcommon" "github.com/skycoin/skywire/pkg/app/appdisc" + "github.com/skycoin/skywire/pkg/app/appnet" ) var ( @@ -38,6 +40,7 @@ type Proc struct { waitMx sync.Mutex waitErr error + rpcGWMu sync.Mutex rpcGW *RPCIngressGateway // gateway shared over 'conn' - introduced AFTER proc is started conn net.Conn // connection to proc - introduced AFTER proc is started connCh chan struct{} // push here when conn is received - protected by 'connOnce' @@ -94,7 +97,9 @@ func (p *Proc) InjectConn(conn net.Conn) bool { p.connOnce.Do(func() { ok = true p.conn = conn + p.rpcGWMu.Lock() p.rpcGW = NewRPCGateway(p.log) + p.rpcGWMu.Unlock() // Send ready signal. p.connCh <- struct{}{} @@ -254,3 +259,57 @@ func (p *Proc) Wait() error { func (p *Proc) IsRunning() bool { return atomic.LoadInt32(&p.isRunning) == 1 } + +// ConnectionSummary sums up the connection stats. +type ConnectionSummary struct { + IsAlive bool `json:"is_alive"` + Latency time.Duration `json:"latency"` + Throughput uint32 `json:"throughput"` + BandwidthSent uint64 `json:"bandwidth_sent"` + Error string `json:"error"` +} + +// ConnectionsSummary returns all of the proc's connections stats. +func (p *Proc) ConnectionsSummary() []ConnectionSummary { + p.rpcGWMu.Lock() + rpcGW := p.rpcGW + p.rpcGWMu.Unlock() + + if rpcGW == nil { + return nil + } + + var summaries []ConnectionSummary + rpcGW.cm.DoRange(func(id uint16, v interface{}) bool { + if v == nil { + summaries = append(summaries, ConnectionSummary{}) + return true + } + + conn, ok := v.(net.Conn) + if !ok { + summaries = append(summaries, ConnectionSummary{}) + } + + wrappedConn := conn.(*appnet.WrappedConn) + + skywireConn, isSkywireConn := wrappedConn.Conn.(*appnet.SkywireConn) + if !isSkywireConn { + summaries = append(summaries, ConnectionSummary{ + Error: "Can't get such info from this conn", + }) + return true + } + + summaries = append(summaries, ConnectionSummary{ + IsAlive: skywireConn.IsAlive(), + Latency: skywireConn.Latency(), + Throughput: skywireConn.Throughput(), + BandwidthSent: skywireConn.BandwidthSent(), + }) + + return true + }) + + return summaries +} diff --git a/pkg/app/appserver/proc_manager.go b/pkg/app/appserver/proc_manager.go index 500685f3dd..05e64bca03 100644 --- a/pkg/app/appserver/proc_manager.go +++ b/pkg/app/appserver/proc_manager.go @@ -41,6 +41,7 @@ type ProcManager interface { Stop(appName string) error Wait(appName string) error Range(next func(appName string, proc *Proc) bool) + ConnectionsSummary(appName string) ([]ConnectionSummary, error) Addr() net.Addr } @@ -266,6 +267,15 @@ func (m *procManager) Range(next func(name string, proc *Proc) bool) { } } +func (m *procManager) ConnectionsSummary(appName string) ([]ConnectionSummary, error) { + p, err := m.get(appName) + if err != nil { + return nil, err + } + + return p.ConnectionsSummary(), nil +} + // stopAll stops all the apps run with this manager instance. func (m *procManager) stopAll() { for name, proc := range m.procs { diff --git a/pkg/routefinder/rfclient/mock_client.go b/pkg/routefinder/rfclient/mock_client.go index decbba4c0f..5291ea6ba3 100644 --- a/pkg/routefinder/rfclient/mock_client.go +++ b/pkg/routefinder/rfclient/mock_client.go @@ -2,12 +2,9 @@ package rfclient -import ( - context "context" - - routing "github.com/skycoin/skywire/pkg/routing" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" +import routing "github.com/skycoin/skywire/pkg/routing" // MockClient is an autogenerated mock type for the Client type type MockClient struct { diff --git a/pkg/router/mock_router.go b/pkg/router/mock_router.go index 6b1865b74f..aeb1d9666c 100644 --- a/pkg/router/mock_router.go +++ b/pkg/router/mock_router.go @@ -2,17 +2,11 @@ package router -import ( - context "context" - - cipher "github.com/skycoin/dmsg/cipher" - - mock "github.com/stretchr/testify/mock" - - net "net" - - routing "github.com/skycoin/skywire/pkg/routing" -) +import cipher "github.com/skycoin/dmsg/cipher" +import context "context" +import mock "github.com/stretchr/testify/mock" +import net "net" +import routing "github.com/skycoin/skywire/pkg/routing" // MockRouter is an autogenerated mock type for the Router type type MockRouter struct { diff --git a/pkg/router/network_stats.go b/pkg/router/network_stats.go new file mode 100644 index 0000000000..f7862d0d9e --- /dev/null +++ b/pkg/router/network_stats.go @@ -0,0 +1,67 @@ +package router + +import ( + "sync" + "sync/atomic" + "time" +) + +type networkStats struct { + bandwidthSent uint64 + bandwidthReceived uint64 + latency uint32 + throughput uint32 + + bandwidthReceivedRecStartMu sync.Mutex + bandwidthReceivedRecStart time.Time +} + +func newNetworkStats() *networkStats { + return &networkStats{ + bandwidthReceivedRecStart: time.Now(), + } +} + +func (s *networkStats) SetLatency(latency time.Duration) { + atomic.StoreUint32(&s.latency, uint32(latency.Milliseconds())) +} + +func (s *networkStats) Latency() time.Duration { + latencyMs := atomic.LoadUint32(&s.latency) + + return time.Duration(latencyMs) +} + +func (s *networkStats) SetLocalThroughput(throughput uint32) { + atomic.StoreUint32(&s.throughput, throughput) +} + +func (s *networkStats) LocalThroughput() uint32 { + return atomic.LoadUint32(&s.throughput) +} + +func (s *networkStats) BandwidthSent() uint64 { + return atomic.LoadUint64(&s.bandwidthSent) +} + +func (s *networkStats) AddBandwidthSent(amount uint64) { + atomic.AddUint64(&s.bandwidthSent, amount) +} + +func (s *networkStats) AddBandwidthReceived(amount uint64) { + atomic.AddUint64(&s.bandwidthReceived, amount) +} + +func (s *networkStats) RemoteThroughput() int64 { + s.bandwidthReceivedRecStartMu.Lock() + timePassed := time.Since(s.bandwidthReceivedRecStart) + s.bandwidthReceivedRecStart = time.Now() + s.bandwidthReceivedRecStartMu.Unlock() + + bandwidth := atomic.LoadUint64(&s.bandwidthReceived) + atomic.StoreUint64(&s.bandwidthReceived, 0) + + throughput := float64(bandwidth) / timePassed.Seconds() + + return int64(throughput) +} diff --git a/pkg/router/noise_route_group.go b/pkg/router/noise_route_group.go index 99879a8c7c..637e9d6260 100644 --- a/pkg/router/noise_route_group.go +++ b/pkg/router/noise_route_group.go @@ -2,27 +2,52 @@ package router import ( "net" + "time" "github.com/skycoin/skywire/pkg/routing" ) -type noiseRouteGroup struct { +// NoiseRouteGroup is a route group wrapped with noise. +// Implements net.Conn. +type NoiseRouteGroup struct { rg *RouteGroup net.Conn } -func (nrg *noiseRouteGroup) LocalAddr() net.Addr { +// LocalAddr returns local address. +func (nrg *NoiseRouteGroup) LocalAddr() net.Addr { return nrg.rg.LocalAddr() } -func (nrg *noiseRouteGroup) RemoteAddr() net.Addr { +// RemoteAddr returns remote address. +func (nrg *NoiseRouteGroup) RemoteAddr() net.Addr { return nrg.rg.RemoteAddr() } -func (nrg *noiseRouteGroup) isClosed() bool { +// IsAlive checks if connection is alive. +func (nrg *NoiseRouteGroup) IsAlive() bool { + return nrg.rg.IsAlive() +} + +// Latency returns latency till remote (ms). +func (nrg *NoiseRouteGroup) Latency() time.Duration { + return nrg.rg.Latency() +} + +// Throughput returns throughput till remote (bytes/s). +func (nrg *NoiseRouteGroup) Throughput() uint32 { + return nrg.rg.Throughput() +} + +// BandwidthSent returns amount of bandwidth sent (bytes). +func (nrg *NoiseRouteGroup) BandwidthSent() uint64 { + return nrg.rg.BandwidthSent() +} + +func (nrg *NoiseRouteGroup) isClosed() bool { return nrg.rg.isClosed() } -func (nrg *noiseRouteGroup) handlePacket(packet routing.Packet) error { +func (nrg *NoiseRouteGroup) handlePacket(packet routing.Packet) error { return nrg.rg.handlePacket(packet) } diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 392ef68bfc..5d7e074516 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -3,6 +3,7 @@ package router import ( "bytes" "context" + "encoding/binary" "errors" "fmt" "io" @@ -21,6 +22,7 @@ import ( const ( defaultRouteGroupKeepAliveInterval = DefaultRouteKeepAlive / 2 + defaultNetworkProbeInterval = 3 * time.Second defaultReadChBufSize = 1024 closeRoutineTimeout = 2 * time.Second ) @@ -44,18 +46,22 @@ func (timeoutError) Error() string { return "timeout" } func (timeoutError) Timeout() bool { return true } func (timeoutError) Temporary() bool { return true } +type sendServicePacketFn func(interval time.Duration) + // RouteGroupConfig configures RouteGroup. type RouteGroupConfig struct { - ReadChBufSize int - KeepAliveInterval time.Duration + ReadChBufSize int + KeepAliveInterval time.Duration + NetworkProbeInterval time.Duration } // DefaultRouteGroupConfig returns default RouteGroup config. // Used by default if config is nil. func DefaultRouteGroupConfig() *RouteGroupConfig { return &RouteGroupConfig{ - KeepAliveInterval: defaultRouteGroupKeepAliveInterval, - ReadChBufSize: defaultReadChBufSize, + KeepAliveInterval: defaultRouteGroupKeepAliveInterval, + NetworkProbeInterval: defaultNetworkProbeInterval, + ReadChBufSize: defaultReadChBufSize, } } @@ -98,6 +104,8 @@ type RouteGroup struct { readDeadline deadline.PipeDeadline writeDeadline deadline.PipeDeadline + networkStats *networkStats + // used as a bool to indicate if this particular route group initiated close loop closeInitiated int32 remoteClosedOnce sync.Once @@ -129,10 +137,9 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe readDeadline: deadline.MakePipeDeadline(), writeDeadline: deadline.MakePipeDeadline(), handshakeProcessed: make(chan struct{}), + networkStats: newNetworkStats(), } - go rg.keepAliveLoop(cfg.KeepAliveInterval) - return rg } @@ -240,6 +247,26 @@ func (rg *RouteGroup) SetWriteDeadline(t time.Time) error { return nil } +// IsAlive checks whether connection is alive. +func (rg *RouteGroup) IsAlive() bool { + return !rg.isClosed() && !rg.isRemoteClosed() +} + +// Latency returns latency till remote (ms). +func (rg *RouteGroup) Latency() time.Duration { + return rg.networkStats.Latency() +} + +// Throughput returns throughput till remote (bytes/s). +func (rg *RouteGroup) Throughput() uint32 { + return rg.networkStats.LocalThroughput() +} + +// BandwidthSent returns amount of bandwidth sent (bytes). +func (rg *RouteGroup) BandwidthSent() uint64 { + return rg.networkStats.BandwidthSent() +} + // read reads incoming data. It tries to fetch the data from the internal buffer. // If buffer is empty it blocks on receiving from the data channel func (rg *RouteGroup) read(p []byte) (int, error) { @@ -323,6 +350,10 @@ func (rg *RouteGroup) writePacket(ctx context.Context, tp *transport.ManagedTran err := tp.WritePacket(ctx, packet) // note equality here. update activity only if there was NO error if err == nil { + if packet.Type() == routing.DataPacket { + rg.networkStats.AddBandwidthSent(uint64(packet.Size())) + } + if err := rg.rt.UpdateActivity(ruleID); err != nil { rg.logger.WithError(err).Errorf("error updating activity of rule %d", ruleID) } @@ -359,26 +390,70 @@ func (rg *RouteGroup) tp() (*transport.ManagedTransport, error) { return tp, nil } -func (rg *RouteGroup) keepAliveLoop(interval time.Duration) { +func (rg *RouteGroup) startOffServiceLoops() { + go rg.servicePacketLoop("keep-alive", rg.cfg.KeepAliveInterval, rg.keepAliveServiceFn) + go rg.servicePacketLoop("network probe", rg.cfg.NetworkProbeInterval, rg.networkProbeServiceFn) +} + +func (rg *RouteGroup) sendNetworkProbe() error { + rg.mu.Lock() + + if len(rg.tps) == 0 || len(rg.fwd) == 0 { + rg.mu.Unlock() + // if no transports, no rules, then no latency probe + return nil + } + + tp := rg.tps[0] + rule := rg.fwd[0] + rg.mu.Unlock() + + if tp == nil { + return nil + } + + throughput := rg.networkStats.RemoteThroughput() + timestamp := time.Now().UnixNano() / int64(time.Millisecond) + + packet := routing.MakeNetworkProbePacket(rule.NextRouteID(), timestamp, throughput) + + if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil { + return err + } + + return nil +} + +func (rg *RouteGroup) networkProbeServiceFn(_ time.Duration) { + if err := rg.sendNetworkProbe(); err != nil { + rg.logger.Warnf("Failed to send network probe: %v", err) + } +} + +func (rg *RouteGroup) servicePacketLoop(name string, interval time.Duration, f sendServicePacketFn) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-rg.remoteClosed: - rg.logger.Infoln("Remote got closed, stopping keep-alive loop") + rg.logger.Infof("Remote got closed, stopping %s loop", name) return case <-ticker.C: - lastSent := time.Unix(0, atomic.LoadInt64(&rg.lastSent)) + f(interval) + } + } +} - if time.Since(lastSent) < interval { - continue - } +func (rg *RouteGroup) keepAliveServiceFn(interval time.Duration) { + lastSent := time.Unix(0, atomic.LoadInt64(&rg.lastSent)) - if err := rg.sendKeepAlive(); err != nil { - rg.logger.Warnf("Failed to send keepalive: %v", err) - } - } + if time.Since(lastSent) < interval { + return + } + + if err := rg.sendKeepAlive(); err != nil { + rg.logger.Warnf("Failed to send keepalive: %v", err) } } @@ -508,6 +583,8 @@ func (rg *RouteGroup) handlePacket(packet routing.Packet) error { close(rg.handshakeProcessed) }) return rg.handleDataPacket(packet) + case routing.NetworkProbePacket: + return rg.handleNetworkProbePacket(packet) case routing.HandshakePacket: rg.handshakeProcessedOnce.Do(func() { // first packet is handshake packet, so we're communicating with the new visor @@ -523,7 +600,24 @@ func (rg *RouteGroup) handlePacket(packet routing.Packet) error { return nil } +func (rg *RouteGroup) handleNetworkProbePacket(packet routing.Packet) error { + payload := packet.Payload() + + sentAtMs := binary.BigEndian.Uint64(payload) + throughput := binary.BigEndian.Uint64(payload[8:]) + + ms := sentAtMs % 1000 + sentAt := time.Unix(int64(sentAtMs/1000), int64(ms)*int64(time.Millisecond)) + + rg.networkStats.SetLatency(time.Since(sentAt)) + rg.networkStats.SetLocalThroughput(uint32(throughput)) + + return nil +} + func (rg *RouteGroup) handleDataPacket(packet routing.Packet) error { + rg.networkStats.AddBandwidthReceived(uint64(packet.Size())) + select { case <-rg.closed: return io.ErrClosedPipe diff --git a/pkg/router/router.go b/pkg/router/router.go index ee366063c8..db98be2992 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -143,7 +143,7 @@ type router struct { trustedVisors map[cipher.PubKey]struct{} tm *transport.Manager rt routing.Table - rgsNs map[routing.RouteDescriptor]*noiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports. + rgsNs map[routing.RouteDescriptor]*NoiseRouteGroup // Noise-wrapped route groups to push incoming reads from transports. rgsRaw map[routing.RouteDescriptor]*RouteGroup // Not-yet-noise-wrapped route groups. when one of these gets wrapped, it gets removed from here rpcSrv *rpc.Server accept chan routing.EdgeRules @@ -173,7 +173,7 @@ func New(n *snet.Network, config *Config) (Router, error) { tm: config.TransportManager, rt: routing.NewTable(), sl: sl, - rgsNs: make(map[routing.RouteDescriptor]*noiseRouteGroup), + rgsNs: make(map[routing.RouteDescriptor]*NoiseRouteGroup), rgsRaw: make(map[routing.RouteDescriptor]*RouteGroup), rpcSrv: rpc.NewServer(), accept: make(chan routing.EdgeRules, acceptSize), @@ -249,6 +249,8 @@ func (r *router) DialRoutes( return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } + nrg.rg.startOffServiceLoops() + r.logger.Infof("Created new routes to %s on port %d", rPK, lPort) return nrg, nil @@ -298,6 +300,8 @@ func (r *router) AcceptRoutes(ctx context.Context) (net.Conn, error) { return nil, fmt.Errorf("saveRouteGroupRules: %w", err) } + nrg.rg.startOffServiceLoops() + return nrg, nil } @@ -367,7 +371,7 @@ func (r *router) serveSetup() { } } -func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Config) (*noiseRouteGroup, error) { +func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Config) (*NoiseRouteGroup, error) { r.logger.Infof("Saving route group rules with desc: %s", &rules.Desc) // When route group is wrapped with noise, it's put into `nrgs`. but before that, @@ -456,12 +460,12 @@ func (r *router) saveRouteGroupRules(rules routing.EdgeRules, nsConf noise.Confi return nil, fmt.Errorf("WrapConn (%s): %w", &rules.Desc, err) } - nrg = &noiseRouteGroup{ + nrg = &NoiseRouteGroup{ rg: rg, Conn: wrappedRG, } } else { - nrg = &noiseRouteGroup{ + nrg = &NoiseRouteGroup{ rg: rg, Conn: rg, } @@ -484,6 +488,8 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe return r.handleClosePacket(ctx, packet) case routing.KeepAlivePacket: return r.handleKeepAlivePacket(ctx, packet) + case routing.NetworkProbePacket: + return r.handleNetworkProbePacket(ctx, packet) default: return ErrUnknownPacketType } @@ -601,6 +607,58 @@ func (r *router) handleClosePacket(ctx context.Context, packet routing.Packet) e return nil } +func (r *router) handleNetworkProbePacket(ctx context.Context, packet routing.Packet) error { + rule, err := r.GetRule(packet.RouteID()) + if err != nil { + return err + } + + if rt := rule.Type(); rt == routing.RuleForward || rt == routing.RuleIntermediary { + r.logger.Debugf("Handling packet of type %s with route ID %d and next ID %d", packet.Type(), + packet.RouteID(), rule.NextRouteID()) + return r.forwardPacket(ctx, packet, rule) + } + + r.logger.Debugf("Handling packet of type %s with route ID %d", packet.Type(), packet.RouteID()) + + desc := rule.RouteDescriptor() + nrg, ok := r.noiseRouteGroup(desc) + + r.logger.Infof("Handling packet with descriptor %s", &desc) + + if ok { + if nrg == nil { + return errors.New("noiseRouteGroup is nil") + } + + // in this case we have already initialized nrg and may use it straightforward + r.logger.Infof("Got new remote packet with size %d and route ID %d. Using rule: %s", + len(packet.Payload()), packet.RouteID(), rule) + + return nrg.handlePacket(packet) + } + + // we don't have nrg for this packet. it's either handshake message or + // we don't have route for this one completely + + rg, ok := r.initializingRouteGroup(desc) + if !ok { + // no route, just return error + r.logger.Infof("Descriptor not found for rule with type %s, descriptor: %s", rule.Type(), &desc) + return errors.New("route descriptor does not exist") + } + + if rg == nil { + return errors.New("initializing RouteGroup is nil") + } + + // handshake packet, handling with the raw rg + r.logger.Infof("Got new remote packet with size %d and route ID %d. Using rule: %s", + len(packet.Payload()), packet.RouteID(), rule) + + return rg.handlePacket(packet) +} + func (r *router) handleKeepAlivePacket(ctx context.Context, packet routing.Packet) error { routeID := packet.RouteID() @@ -807,7 +865,7 @@ func (r *router) ReserveKeys(n int) ([]routing.RouteID, error) { return ids, err } -func (r *router) popNoiseRouteGroup(desc routing.RouteDescriptor) (*noiseRouteGroup, bool) { +func (r *router) popNoiseRouteGroup(desc routing.RouteDescriptor) (*NoiseRouteGroup, bool) { r.mx.Lock() defer r.mx.Unlock() @@ -821,7 +879,7 @@ func (r *router) popNoiseRouteGroup(desc routing.RouteDescriptor) (*noiseRouteGr return nrg, true } -func (r *router) noiseRouteGroup(desc routing.RouteDescriptor) (*noiseRouteGroup, bool) { +func (r *router) noiseRouteGroup(desc routing.RouteDescriptor) (*NoiseRouteGroup, bool) { r.mx.Lock() defer r.mx.Unlock() diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 14408874c5..c0c7dd044b 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -200,11 +200,11 @@ func Test_router_NoiseRouteGroups(t *testing.T) { nrg1Ifc := <-nrg1IfcCh require.NotNil(t, nrg1Ifc) - nrg0, ok := nrg0Ifc.(*noiseRouteGroup) + nrg0, ok := nrg0Ifc.(*NoiseRouteGroup) require.True(t, ok) require.NotNil(t, nrg0) - nrg1, ok := nrg1Ifc.(*noiseRouteGroup) + nrg1, ok := nrg1Ifc.(*NoiseRouteGroup) require.True(t, ok) require.NotNil(t, nrg1) @@ -219,12 +219,19 @@ func Test_router_NoiseRouteGroups(t *testing.T) { require.Equal(t, len(data), n) require.Equal(t, data, received[:n]) + require.True(t, nrg0.IsAlive()) + require.True(t, nrg1.IsAlive()) + err = nrg0.Close() require.NoError(t, err) + require.False(t, nrg0.IsAlive()) + require.False(t, nrg1.IsAlive()) + require.True(t, nrg1.rg.isRemoteClosed()) err = nrg1.Close() require.NoError(t, err) + require.False(t, nrg1.IsAlive()) } func TestRouter_Serve(t *testing.T) { @@ -410,7 +417,7 @@ func testClosePacketRemote(t *testing.T, r0, r1 *router, pk1, pk2 cipher.PubKey, rg1 := NewRouteGroup(DefaultRouteGroupConfig(), r1.rt, rules.Desc) rg1.appendRules(rules.Forward, rules.Reverse, r1.tm.Transport(rules.Forward.NextTransportID())) - nrg1 := &noiseRouteGroup{rg: rg1} + nrg1 := &NoiseRouteGroup{rg: rg1} r1.rgsNs[rg1.desc] = nrg1 packet := routing.MakeClosePacket(intFwdID[0], routing.CloseRequested) @@ -471,7 +478,7 @@ func testClosePacketInitiator(t *testing.T, r0, r1 *router, pk1, pk2 cipher.PubK rg1 := NewRouteGroup(DefaultRouteGroupConfig(), r1.rt, rules.Desc) rg1.appendRules(rules.Forward, rules.Reverse, r1.tm.Transport(rules.Forward.NextTransportID())) - nrg1 := &noiseRouteGroup{rg: rg1} + nrg1 := &NoiseRouteGroup{rg: rg1} r1.rgsNs[rg1.desc] = nrg1 packet := routing.MakeClosePacket(intFwdID[0], routing.CloseRequested) @@ -518,7 +525,7 @@ func testForwardRule(t *testing.T, r0, r1 *router, tp1 *transport.ManagedTranspo rg0 := NewRouteGroup(DefaultRouteGroupConfig(), r0.rt, rules.Desc) rg0.appendRules(rules.Forward, rules.Reverse, r0.tm.Transport(rules.Forward.NextTransportID())) - nrg0 := &noiseRouteGroup{rg: rg0} + nrg0 := &NoiseRouteGroup{rg: rg0} r0.rgsNs[rg0.desc] = nrg0 // Call handleTransportPacket for r0 (this should in turn, use the rule we added). @@ -597,7 +604,7 @@ func testConsumeRule(t *testing.T, r0, r1 *router, tp1 *transport.ManagedTranspo rg1 := NewRouteGroup(DefaultRouteGroupConfig(), r1.rt, rules.Desc) rg1.appendRules(rules.Forward, rules.Reverse, r1.tm.Transport(rules.Forward.NextTransportID())) - nrg1 := &noiseRouteGroup{rg: rg1} + nrg1 := &NoiseRouteGroup{rg: rg1} r1.rgsNs[rg1.desc] = nrg1 packet, err := routing.MakeDataPacket(intFwdRtID[0], []byte("test intermediary forward")) @@ -743,7 +750,7 @@ func TestRouter_SetupIsTrusted(t *testing.T) { func clearRouteGroups(routers ...*router) { for _, r := range routers { - r.rgsNs = make(map[routing.RouteDescriptor]*noiseRouteGroup) + r.rgsNs = make(map[routing.RouteDescriptor]*NoiseRouteGroup) } } diff --git a/pkg/routing/packet.go b/pkg/routing/packet.go index 02d096b4c9..e3713e123c 100644 --- a/pkg/routing/packet.go +++ b/pkg/routing/packet.go @@ -41,6 +41,8 @@ func (t PacketType) String() string { return "ClosePacket" case KeepAlivePacket: return "KeepAlivePacket" + case NetworkProbePacket: + return "NetworkProbe" case HandshakePacket: return "Handshake" default: @@ -57,6 +59,7 @@ const ( ClosePacket KeepAlivePacket HandshakePacket + NetworkProbePacket ) // CloseCode represents close code for ClosePacket. @@ -119,6 +122,19 @@ func MakeKeepAlivePacket(id RouteID) Packet { return packet } +// MakeNetworkProbePacket constructs a new NetworkProbePacket. +func MakeNetworkProbePacket(id RouteID, timestamp, throughput int64) Packet { + packet := make([]byte, PacketHeaderSize+16) + + packet[PacketTypeOffset] = byte(NetworkProbePacket) + binary.BigEndian.PutUint32(packet[PacketRouteIDOffset:], uint32(id)) + binary.BigEndian.PutUint16(packet[PacketPayloadSizeOffset:], uint16(16)) + binary.BigEndian.PutUint64(packet[PacketPayloadOffset:], uint64(timestamp)) + binary.BigEndian.PutUint64(packet[PacketPayloadOffset+8:], uint64(throughput)) + + return packet +} + // MakeHandshakePacket constructs a new HandshakePacket. func MakeHandshakePacket(id RouteID, supportEncryption bool) Packet { packet := make([]byte, PacketHeaderSize+1) diff --git a/pkg/setup/mock_id_reserver.go b/pkg/setup/mock_id_reserver.go index 8e8eace704..7be28aed22 100644 --- a/pkg/setup/mock_id_reserver.go +++ b/pkg/setup/mock_id_reserver.go @@ -2,17 +2,11 @@ package setup -import ( - context "context" - - cipher "github.com/skycoin/dmsg/cipher" - - mock "github.com/stretchr/testify/mock" - - routerclient "github.com/skycoin/skywire/pkg/router/routerclient" - - routing "github.com/skycoin/skywire/pkg/routing" -) +import cipher "github.com/skycoin/dmsg/cipher" +import context "context" +import mock "github.com/stretchr/testify/mock" +import routerclient "github.com/skycoin/skywire/pkg/router/routerclient" +import routing "github.com/skycoin/skywire/pkg/routing" // MockIDReserver is an autogenerated mock type for the IDReserver type type MockIDReserver struct { diff --git a/pkg/setup/setupclient/mock_route_group_dialer.go b/pkg/setup/setupclient/mock_route_group_dialer.go index bd1d127d06..4d40501d90 100644 --- a/pkg/setup/setupclient/mock_route_group_dialer.go +++ b/pkg/setup/setupclient/mock_route_group_dialer.go @@ -2,19 +2,12 @@ package setupclient -import ( - context "context" - - cipher "github.com/skycoin/dmsg/cipher" - - logging "github.com/skycoin/skycoin/src/util/logging" - - mock "github.com/stretchr/testify/mock" - - routing "github.com/skycoin/skywire/pkg/routing" - - snet "github.com/skycoin/skywire/pkg/snet" -) +import cipher "github.com/skycoin/dmsg/cipher" +import context "context" +import logging "github.com/skycoin/skycoin/src/util/logging" +import mock "github.com/stretchr/testify/mock" +import routing "github.com/skycoin/skywire/pkg/routing" +import snet "github.com/skycoin/skywire/pkg/snet" // MockRouteGroupDialer is an autogenerated mock type for the RouteGroupDialer type type MockRouteGroupDialer struct { diff --git a/pkg/snet/arclient/mock_api_client.go b/pkg/snet/arclient/mock_api_client.go index 955bcb6d03..eb1dea0841 100644 --- a/pkg/snet/arclient/mock_api_client.go +++ b/pkg/snet/arclient/mock_api_client.go @@ -2,15 +2,10 @@ package arclient -import ( - context "context" - - cipher "github.com/skycoin/dmsg/cipher" - - mock "github.com/stretchr/testify/mock" - - pfilter "github.com/AudriusButkevicius/pfilter" -) +import cipher "github.com/skycoin/dmsg/cipher" +import context "context" +import mock "github.com/stretchr/testify/mock" +import pfilter "github.com/AudriusButkevicius/pfilter" // MockAPIClient is an autogenerated mock type for the APIClient type type MockAPIClient struct { diff --git a/pkg/snet/mock_dialer.go b/pkg/snet/mock_dialer.go index cf359fd18f..d5355cfca4 100644 --- a/pkg/snet/mock_dialer.go +++ b/pkg/snet/mock_dialer.go @@ -2,15 +2,10 @@ package snet -import ( - context "context" - - cipher "github.com/skycoin/dmsg/cipher" - - mock "github.com/stretchr/testify/mock" - - net "net" -) +import cipher "github.com/skycoin/dmsg/cipher" +import context "context" +import mock "github.com/stretchr/testify/mock" +import net "net" // MockDialer is an autogenerated mock type for the Dialer type type MockDialer struct { diff --git a/pkg/visor/api.go b/pkg/visor/api.go index ee399f040b..1df9c6b647 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -14,6 +14,7 @@ import ( "github.com/skycoin/dmsg/buildinfo" "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/skyenv" @@ -37,6 +38,7 @@ type API interface { SetAppPassword(appName, password string) error SetAppPK(appName string, pk cipher.PubKey) error LogsSince(timestamp time.Time, appName string) ([]string, error) + GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) TransportTypes() ([]string, error) Transports(types []string, pks []cipher.PubKey, logs bool) ([]*TransportSummary, error) @@ -352,6 +354,16 @@ func (v *Visor) LogsSince(timestamp time.Time, appName string) ([]string, error) return res, nil } +// GetAppConnectionsSummary implements API. +func (v *Visor) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) { + summary, err := v.procM.ConnectionsSummary(appName) + if err != nil { + return nil, err + } + + return summary, nil +} + // TransportTypes implements API. func (v *Visor) TransportTypes() ([]string, error) { return v.tpM.Networks(), nil diff --git a/pkg/visor/hypervisor.go b/pkg/visor/hypervisor.go index 1fdf6ee155..c1687a9141 100644 --- a/pkg/visor/hypervisor.go +++ b/pkg/visor/hypervisor.go @@ -207,6 +207,7 @@ func (hv *Hypervisor) makeMux() *chi.Mux { r.Get("/visors/{pk}/apps/{app}", hv.getApp()) r.Put("/visors/{pk}/apps/{app}", hv.putApp()) r.Get("/visors/{pk}/apps/{app}/logs", hv.appLogsSince()) + r.Get("/visors/{pk}/apps/{app}/connections", hv.appConnections()) r.Get("/visors/{pk}/transport-types", hv.getTransportTypes()) r.Get("/visors/{pk}/transports", hv.getTransports()) r.Post("/visors/{pk}/transports", hv.postTransport()) @@ -570,6 +571,18 @@ func (hv *Hypervisor) appLogsSince() http.HandlerFunc { }) } +func (hv *Hypervisor) appConnections() http.HandlerFunc { + return hv.withCtx(hv.appCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + summary, err := ctx.API.GetAppConnectionsSummary(ctx.App.Name) + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, &summary) + }) +} + func (hv *Hypervisor) getTransportTypes() http.HandlerFunc { return hv.withCtx(hv.visorCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { types, err := ctx.API.TransportTypes() diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index 18a3b81fd0..ff2aee6839 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/routing" "github.com/skycoin/skywire/pkg/transport" @@ -258,6 +259,18 @@ func (r *RPC) SetAppPK(in *SetAppPKIn, _ *struct{}) (err error) { return r.visor.SetAppPK(in.AppName, in.PK) } +// GetAppConnectionsSummary returns connections stats for the app. +func (r *RPC) GetAppConnectionsSummary(appName *string, out *[]appserver.ConnectionSummary) (err error) { + defer rpcutil.LogCall(r.log, "GetAppConnectionsSummary", appName)(out, &err) + + summary, err := r.visor.GetAppConnectionsSummary(*appName) + if summary != nil { + *out = summary + } + + return err +} + /* <<< TRANSPORT MANAGEMENT >>> */ diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4b17708939..13091ae4d9 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -20,6 +20,7 @@ import ( "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/skywire/pkg/app/appcommon" + "github.com/skycoin/skywire/pkg/app/appserver" "github.com/skycoin/skywire/pkg/app/launcher" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" @@ -175,6 +176,17 @@ func (rc *rpcClient) LogsSince(timestamp time.Time, appName string) ([]string, e return res, nil } +// GetAppConnectionsSummary get connections stats for the app. +func (rc *rpcClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) { + var summary []appserver.ConnectionSummary + + if err := rc.Call("GetAppConnectionsSummary", &appName, &summary); err != nil { + return nil, err + } + + return summary, nil +} + // TransportTypes calls TransportTypes. func (rc *rpcClient) TransportTypes() ([]string, error) { var types []string @@ -635,6 +647,11 @@ func (mc *mockRPCClient) LogsSince(timestamp time.Time, _ string) ([]string, err return mc.logS.LogsSince(timestamp) } +// GetAppConnectionsSummary get connections stats for the app. +func (mc *mockRPCClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) { + return nil, nil +} + // TransportTypes implements API. func (mc *mockRPCClient) TransportTypes() ([]string, error) { return mc.tpTypes, nil diff --git a/pkg/visor/visorconfig/README.md b/pkg/visor/visorconfig/README.md index 4631386f0a..8e4fbeb298 100644 --- a/pkg/visor/visorconfig/README.md +++ b/pkg/visor/visorconfig/README.md @@ -18,19 +18,9 @@ - `hypervisor` (*[Config](#Config)) -# V1Routing - -- `setup_nodes` () -- `route_finder` (string) -- `route_finder_timeout` (Duration) - - -# V1Dmsgpty +# V1UptimeTracker -- `port` (uint16) -- `authorization_file` (string) -- `cli_network` (string) -- `cli_address` (string) +- `addr` (string) # V1Transport @@ -41,12 +31,6 @@ - `trusted_visors` () -# V1LogStore - -- `type` (string) - Type defines the log store type. Valid values: file, memory. -- `location` (string) - - # V1Launcher - `discovery` (*[V1AppDisc](#V1AppDisc)) @@ -56,53 +40,40 @@ - `local_path` (string) -# V1UptimeTracker - -- `addr` (string) - - # V1AppDisc - `update_interval` (Duration) - `proxy_discovery_addr` (string) -# Common - -- `path` (string) -- `log` (*[MasterLogger](#MasterLogger)) -- `version` (string) -- `sk` (SecKey) -- `pk` (PubKey) - - -# RWMutex +# V1LogStore -- `w` ([Mutex](#Mutex)) -- `writerSem` (uint32) -- `readerSem` (uint32) -- `readerCount` (int32) -- `readerWait` (int32) +- `type` (string) - Type defines the log store type. Valid values: file, memory. +- `location` (string) -# Mutex +# V1Dmsgpty -- `state` (int32) -- `sema` (uint32) +- `port` (uint16) +- `authorization_file` (string) +- `cli_network` (string) +- `cli_address` (string) -# STCPConfig +# V1Routing -- `pk_table` () -- `local_address` (string) +- `setup_nodes` () +- `route_finder` (string) +- `route_finder_timeout` (Duration) -# AppConfig +# Common -- `name` (string) -- `args` ([]string) -- `auto_start` (bool) -- `port` (Port) +- `path` (string) +- `log` (*[MasterLogger](#MasterLogger)) +- `version` (string) +- `sk` (SecKey) +- `pk` (PubKey) # DmsgConfig @@ -136,6 +107,27 @@ - `-` (bool) +# RWMutex + +- `w` ([Mutex](#Mutex)) +- `writerSem` (uint32) +- `readerSem` (uint32) +- `readerCount` (int32) +- `readerWait` (int32) + + +# Mutex + +- `state` (int32) +- `sema` (uint32) + + +# STCPConfig + +- `pk_table` () +- `local_address` (string) + + # MasterLogger - `` (*[Logger](#Logger)) @@ -144,3 +136,11 @@ # Logger - `` (FieldLogger) + + +# AppConfig + +- `name` (string) +- `args` ([]string) +- `auto_start` (bool) +- `port` (Port)