From dd0956376256d5e3d8e3792e22b26b1a8346b74a Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 22 Dec 2020 23:03:31 +0300 Subject: [PATCH 1/3] Add `AppStats` endpoint to hypervisor This endpoint provides app start time along with the stats for open app connections --- pkg/app/appserver/app_stats.go | 10 ++++++++++ pkg/app/appserver/proc.go | 18 ++++++++++++++++++ pkg/app/appserver/proc_manager.go | 19 +++++++++++++++++++ pkg/visor/api.go | 10 ++++++++++ pkg/visor/hypervisor.go | 13 +++++++++++++ pkg/visor/rpc.go | 11 +++++++++++ pkg/visor/rpc_client.go | 16 +++++++++++++++- 7 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 pkg/app/appserver/app_stats.go diff --git a/pkg/app/appserver/app_stats.go b/pkg/app/appserver/app_stats.go new file mode 100644 index 0000000000..2277fb3a89 --- /dev/null +++ b/pkg/app/appserver/app_stats.go @@ -0,0 +1,10 @@ +package appserver + +import ( + "time" +) + +type AppStats struct { + Connections []ConnectionSummary `json:"connections"` + StartTime *time.Time `json:"start_time,omitempty"` +} diff --git a/pkg/app/appserver/proc.go b/pkg/app/appserver/proc.go index 5ae26d4ce8..3f600f0899 100644 --- a/pkg/app/appserver/proc.go +++ b/pkg/app/appserver/proc.go @@ -48,6 +48,9 @@ type Proc struct { m ProcManager appName string + + startTimeMx sync.RWMutex + startTime time.Time } // NewProc constructs `Proc`. @@ -88,6 +91,17 @@ func (p *Proc) Cmd() *exec.Cmd { return p.cmd } +func (p *Proc) StartTime() (time.Time, bool) { + if !p.IsRunning() { + return time.Time{}, false + } + + p.startTimeMx.RLock() + defer p.startTimeMx.RUnlock() + + return p.startTime, true +} + // InjectConn introduces the connection to the Proc after it is started. // Only the first call will return true. // It also prepares the RPC gateway. @@ -155,6 +169,10 @@ func (p *Proc) Start() error { return err } + p.startTimeMx.Lock() + p.startTime = time.Now().UTC() + p.startTimeMx.Unlock() + go func() { waitErrCh := make(chan error) go func() { diff --git a/pkg/app/appserver/proc_manager.go b/pkg/app/appserver/proc_manager.go index 05e64bca03..ad4b90383a 100644 --- a/pkg/app/appserver/proc_manager.go +++ b/pkg/app/appserver/proc_manager.go @@ -41,6 +41,7 @@ type ProcManager interface { Stop(appName string) error Wait(appName string) error Range(next func(appName string, proc *Proc) bool) + Stats(appName string) (AppStats, error) ConnectionsSummary(appName string) ([]ConnectionSummary, error) Addr() net.Addr } @@ -267,6 +268,24 @@ func (m *procManager) Range(next func(name string, proc *Proc) bool) { } } +func (m *procManager) Stats(appName string) (AppStats, error) { + p, err := m.get(appName) + if err != nil { + return AppStats{}, err + } + + stats := AppStats{ + Connections: p.ConnectionsSummary(), + } + + startTime, ok := p.StartTime() + if ok { + stats.StartTime = &startTime + } + + return stats, nil +} + func (m *procManager) ConnectionsSummary(appName string) ([]ConnectionSummary, error) { p, err := m.get(appName) if err != nil { diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 2415bd58ac..747831e30f 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -41,6 +41,7 @@ type API interface { SetAppSecure(appName string, isSecure bool) error SetAppKillswitch(appName string, killswitch bool) error LogsSince(timestamp time.Time, appName string) ([]string, error) + GetAppStats(appName string) (appserver.AppStats, error) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) TransportTypes() ([]string, error) @@ -413,6 +414,15 @@ func (v *Visor) LogsSince(timestamp time.Time, appName string) ([]string, error) return res, nil } +func (v *Visor) GetAppStats(appName string) (appserver.AppStats, error) { + stats, err := v.procM.Stats(appName) + if err != nil { + return appserver.AppStats{}, err + } + + return stats, nil +} + // GetAppConnectionsSummary implements API. func (v *Visor) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) { summary, err := v.procM.ConnectionsSummary(appName) diff --git a/pkg/visor/hypervisor.go b/pkg/visor/hypervisor.go index a89364dbce..f96ec74327 100644 --- a/pkg/visor/hypervisor.go +++ b/pkg/visor/hypervisor.go @@ -236,6 +236,7 @@ func (hv *Hypervisor) makeMux() chi.Router { r.Get("/visors/{pk}/apps/{app}", hv.getApp()) r.Put("/visors/{pk}/apps/{app}", hv.putApp()) r.Get("/visors/{pk}/apps/{app}/logs", hv.appLogsSince()) + r.Get("/visors/{pk}/apps/{app}/stats", hv.getAppStats()) r.Get("/visors/{pk}/apps/{app}/connections", hv.appConnections()) r.Get("/visors/{pk}/transport-types", hv.getTransportTypes()) r.Get("/visors/{pk}/transports", hv.getTransports()) @@ -509,6 +510,18 @@ func (hv *Hypervisor) getApp() http.HandlerFunc { }) } +func (hv *Hypervisor) getAppStats() http.HandlerFunc { + return hv.withCtx(hv.appCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) { + stats, err := ctx.API.GetAppStats(ctx.App.Name) + if err != nil { + httputil.WriteJSON(w, r, http.StatusInternalServerError, err) + return + } + + httputil.WriteJSON(w, r, http.StatusOK, &stats) + }) +} + // TODO: simplify // nolint: funlen,gocognit,godox func (hv *Hypervisor) putApp() http.HandlerFunc { diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index faf2d86e64..a99e712582 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -286,6 +286,17 @@ func (r *RPC) SetAppSecure(in *SetAppBoolIn, _ *struct{}) (err error) { return r.visor.SetAppSecure(in.AppName, in.Val) } +func (r *RPC) GetAppStats(appName *string, out *appserver.AppStats) (err error) { + defer rpcutil.LogCall(r.log, "GetAppStats", appName)(out, &err) + + stats, err := r.visor.GetAppStats(*appName) + if err != nil { + *out = stats + } + + return err +} + // GetAppConnectionsSummary returns connections stats for the app. func (r *RPC) GetAppConnectionsSummary(appName *string, out *[]appserver.ConnectionSummary) (err error) { defer rpcutil.LogCall(r.log, "GetAppConnectionsSummary", appName)(out, &err) diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index 4d8f0b9d1a..5057828e86 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -197,6 +197,16 @@ func (rc *rpcClient) LogsSince(timestamp time.Time, appName string) ([]string, e return res, nil } +func (rc *rpcClient) GetAppStats(appName string) (appserver.AppStats, error) { + var stats appserver.AppStats + + if err := rc.Call("GetAppStats", &appName, &stats); err != nil { + return appserver.AppStats{}, err + } + + return stats, nil +} + // GetAppConnectionsSummary get connections stats for the app. func (rc *rpcClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) { var summary []appserver.ConnectionSummary @@ -703,8 +713,12 @@ func (mc *mockRPCClient) LogsSince(timestamp time.Time, _ string) ([]string, err return mc.logS.LogsSince(timestamp) } +func (mc *mockRPCClient) GetAppStats(_ string) (appserver.AppStats, error) { + return appserver.AppStats{}, nil +} + // GetAppConnectionsSummary get connections stats for the app. -func (mc *mockRPCClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) { +func (mc *mockRPCClient) GetAppConnectionsSummary(_ string) ([]appserver.ConnectionSummary, error) { return nil, nil } From c83de9e074f67ddb00f833881009acb52a84815c Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 22 Dec 2020 23:56:37 +0300 Subject: [PATCH 2/3] Add `BandwidthReceived` field to app stats reponse --- pkg/app/appnet/skywire_conn.go | 4 ++++ pkg/app/appserver/proc.go | 20 +++++++++++--------- pkg/router/network_stats.go | 18 ++++++++++++------ pkg/router/noise_route_group.go | 4 ++++ pkg/router/route_group.go | 4 ++++ 5 files changed, 35 insertions(+), 15 deletions(-) diff --git a/pkg/app/appnet/skywire_conn.go b/pkg/app/appnet/skywire_conn.go index b925625b87..613f734278 100644 --- a/pkg/app/appnet/skywire_conn.go +++ b/pkg/app/appnet/skywire_conn.go @@ -37,6 +37,10 @@ func (c *SkywireConn) BandwidthSent() uint64 { return c.nrg.BandwidthSent() } +func (c *SkywireConn) BandwidthReceived() uint64 { + return c.nrg.BandwidthReceived() +} + // Close closes connection. func (c *SkywireConn) Close() error { var err error diff --git a/pkg/app/appserver/proc.go b/pkg/app/appserver/proc.go index 3f600f0899..15f6a2244a 100644 --- a/pkg/app/appserver/proc.go +++ b/pkg/app/appserver/proc.go @@ -282,11 +282,12 @@ func (p *Proc) IsRunning() bool { // ConnectionSummary sums up the connection stats. type ConnectionSummary struct { - IsAlive bool `json:"is_alive"` - Latency time.Duration `json:"latency"` - Throughput uint32 `json:"throughput"` - BandwidthSent uint64 `json:"bandwidth_sent"` - Error string `json:"error"` + IsAlive bool `json:"is_alive"` + Latency time.Duration `json:"latency"` + Throughput uint32 `json:"throughput"` + BandwidthSent uint64 `json:"bandwidth_sent"` + BandwidthReceived uint64 `json:"bandwidth_received"` + Error string `json:"error"` } // ConnectionsSummary returns all of the proc's connections stats. @@ -322,10 +323,11 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary { } summaries = append(summaries, ConnectionSummary{ - IsAlive: skywireConn.IsAlive(), - Latency: skywireConn.Latency(), - Throughput: skywireConn.Throughput(), - BandwidthSent: skywireConn.BandwidthSent(), + IsAlive: skywireConn.IsAlive(), + Latency: skywireConn.Latency(), + Throughput: skywireConn.Throughput(), + BandwidthSent: skywireConn.BandwidthSent(), + BandwidthReceived: skywireConn.BandwidthReceived(), }) return true diff --git a/pkg/router/network_stats.go b/pkg/router/network_stats.go index f7862d0d9e..f08747ab32 100644 --- a/pkg/router/network_stats.go +++ b/pkg/router/network_stats.go @@ -7,10 +7,11 @@ import ( ) type networkStats struct { - bandwidthSent uint64 - bandwidthReceived uint64 - latency uint32 - throughput uint32 + totalBandwidthSent uint64 + totalBandwidthReceived uint64 + bandwidthReceived uint64 + latency uint32 + throughput uint32 bandwidthReceivedRecStartMu sync.Mutex bandwidthReceivedRecStart time.Time @@ -41,15 +42,20 @@ func (s *networkStats) LocalThroughput() uint32 { } func (s *networkStats) BandwidthSent() uint64 { - return atomic.LoadUint64(&s.bandwidthSent) + return atomic.LoadUint64(&s.totalBandwidthSent) } func (s *networkStats) AddBandwidthSent(amount uint64) { - atomic.AddUint64(&s.bandwidthSent, amount) + atomic.AddUint64(&s.totalBandwidthSent, amount) +} + +func (s *networkStats) BandwidthReceived() uint64 { + return atomic.LoadUint64(&s.totalBandwidthReceived) } func (s *networkStats) AddBandwidthReceived(amount uint64) { atomic.AddUint64(&s.bandwidthReceived, amount) + atomic.AddUint64(&s.bandwidthReceived, amount) } func (s *networkStats) RemoteThroughput() int64 { diff --git a/pkg/router/noise_route_group.go b/pkg/router/noise_route_group.go index 637e9d6260..a51ae1f81e 100644 --- a/pkg/router/noise_route_group.go +++ b/pkg/router/noise_route_group.go @@ -44,6 +44,10 @@ func (nrg *NoiseRouteGroup) BandwidthSent() uint64 { return nrg.rg.BandwidthSent() } +func (nrg *NoiseRouteGroup) BandwidthReceived() uint64 { + return nrg.rg.BandwidthReceived() +} + func (nrg *NoiseRouteGroup) isClosed() bool { return nrg.rg.isClosed() } diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 9fabace397..4e3c2509e7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -266,6 +266,10 @@ func (rg *RouteGroup) BandwidthSent() uint64 { return rg.networkStats.BandwidthSent() } +func (rg *RouteGroup) BandwidthReceived() uint64 { + return rg.networkStats.BandwidthReceived() +} + // read reads incoming data. It tries to fetch the data from the internal buffer. // If buffer is empty it blocks on receiving from the data channel func (rg *RouteGroup) read(p []byte) (int, error) { From 358f7bbfc7891c91631f0fa7dc1372ada1ea592e Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Wed, 23 Dec 2020 01:22:33 +0300 Subject: [PATCH 3/3] Divide throughput to upload/download speed --- pkg/app/appnet/skywire_conn.go | 12 +++++++++--- pkg/app/appserver/app_stats.go | 1 + pkg/app/appserver/proc.go | 7 +++++-- pkg/router/network_stats.go | 19 ++++++++++++++----- pkg/router/noise_route_group.go | 12 +++++++++--- pkg/router/route_group.go | 16 ++++++++++++---- pkg/visor/api.go | 1 + pkg/visor/rpc.go | 1 + 8 files changed, 52 insertions(+), 17 deletions(-) diff --git a/pkg/app/appnet/skywire_conn.go b/pkg/app/appnet/skywire_conn.go index 613f734278..e4d616d247 100644 --- a/pkg/app/appnet/skywire_conn.go +++ b/pkg/app/appnet/skywire_conn.go @@ -27,9 +27,14 @@ func (c *SkywireConn) Latency() time.Duration { return c.nrg.Latency() } -// Throughput returns throughput till remote (bytes/s). -func (c *SkywireConn) Throughput() uint32 { - return c.nrg.Throughput() +// UploadSpeed returns upload speed (bytes/s). +func (c *SkywireConn) UploadSpeed() uint32 { + return c.nrg.UploadSpeed() +} + +// DownloadSpeed returns download speed (bytes/s). +func (c *SkywireConn) DownloadSpeed() uint32 { + return c.nrg.DownloadSpeed() } // BandwidthSent returns amount of bandwidth sent (bytes). @@ -37,6 +42,7 @@ func (c *SkywireConn) BandwidthSent() uint64 { return c.nrg.BandwidthSent() } +// BandwidthReceived returns amount of bandwidth received (bytes). func (c *SkywireConn) BandwidthReceived() uint64 { return c.nrg.BandwidthReceived() } diff --git a/pkg/app/appserver/app_stats.go b/pkg/app/appserver/app_stats.go index 2277fb3a89..2a454555fb 100644 --- a/pkg/app/appserver/app_stats.go +++ b/pkg/app/appserver/app_stats.go @@ -4,6 +4,7 @@ import ( "time" ) +// AppStats contains app runtime statistics. type AppStats struct { Connections []ConnectionSummary `json:"connections"` StartTime *time.Time `json:"start_time,omitempty"` diff --git a/pkg/app/appserver/proc.go b/pkg/app/appserver/proc.go index 15f6a2244a..e6b94e6c46 100644 --- a/pkg/app/appserver/proc.go +++ b/pkg/app/appserver/proc.go @@ -91,6 +91,7 @@ func (p *Proc) Cmd() *exec.Cmd { return p.cmd } +// StartTime returns app start time. func (p *Proc) StartTime() (time.Time, bool) { if !p.IsRunning() { return time.Time{}, false @@ -284,7 +285,8 @@ func (p *Proc) IsRunning() bool { type ConnectionSummary struct { IsAlive bool `json:"is_alive"` Latency time.Duration `json:"latency"` - Throughput uint32 `json:"throughput"` + UploadSpeed uint32 `json:"upload_speed"` + DownloadSpeed uint32 `json:"download_speed"` BandwidthSent uint64 `json:"bandwidth_sent"` BandwidthReceived uint64 `json:"bandwidth_received"` Error string `json:"error"` @@ -325,7 +327,8 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary { summaries = append(summaries, ConnectionSummary{ IsAlive: skywireConn.IsAlive(), Latency: skywireConn.Latency(), - Throughput: skywireConn.Throughput(), + UploadSpeed: skywireConn.UploadSpeed(), + DownloadSpeed: skywireConn.DownloadSpeed(), BandwidthSent: skywireConn.BandwidthSent(), BandwidthReceived: skywireConn.BandwidthReceived(), }) diff --git a/pkg/router/network_stats.go b/pkg/router/network_stats.go index f08747ab32..75cf28afc0 100644 --- a/pkg/router/network_stats.go +++ b/pkg/router/network_stats.go @@ -11,7 +11,8 @@ type networkStats struct { totalBandwidthReceived uint64 bandwidthReceived uint64 latency uint32 - throughput uint32 + uploadSpeed uint32 + downloadSpeed uint32 bandwidthReceivedRecStartMu sync.Mutex bandwidthReceivedRecStart time.Time @@ -33,12 +34,20 @@ func (s *networkStats) Latency() time.Duration { return time.Duration(latencyMs) } -func (s *networkStats) SetLocalThroughput(throughput uint32) { - atomic.StoreUint32(&s.throughput, throughput) +func (s *networkStats) SetUploadSpeed(speed uint32) { + atomic.StoreUint32(&s.uploadSpeed, speed) } -func (s *networkStats) LocalThroughput() uint32 { - return atomic.LoadUint32(&s.throughput) +func (s *networkStats) UploadSpeed() uint32 { + return atomic.LoadUint32(&s.uploadSpeed) +} + +func (s *networkStats) SetDownloadSpeed(speed uint32) { + atomic.StoreUint32(&s.downloadSpeed, speed) +} + +func (s *networkStats) DownloadSpeed() uint32 { + return atomic.LoadUint32(&s.downloadSpeed) } func (s *networkStats) BandwidthSent() uint64 { diff --git a/pkg/router/noise_route_group.go b/pkg/router/noise_route_group.go index a51ae1f81e..d28c1019b8 100644 --- a/pkg/router/noise_route_group.go +++ b/pkg/router/noise_route_group.go @@ -34,9 +34,14 @@ func (nrg *NoiseRouteGroup) Latency() time.Duration { return nrg.rg.Latency() } -// Throughput returns throughput till remote (bytes/s). -func (nrg *NoiseRouteGroup) Throughput() uint32 { - return nrg.rg.Throughput() +// UploadSpeed returns upload speed (bytes/s). +func (nrg *NoiseRouteGroup) UploadSpeed() uint32 { + return nrg.rg.UploadSpeed() +} + +// DownloadSpeed returns upload speed (bytes/s). +func (nrg *NoiseRouteGroup) DownloadSpeed() uint32 { + return nrg.rg.DownloadSpeed() } // BandwidthSent returns amount of bandwidth sent (bytes). @@ -44,6 +49,7 @@ func (nrg *NoiseRouteGroup) BandwidthSent() uint64 { return nrg.rg.BandwidthSent() } +// BandwidthReceived returns amount of bandwidth received (bytes). func (nrg *NoiseRouteGroup) BandwidthReceived() uint64 { return nrg.rg.BandwidthReceived() } diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 4e3c2509e7..882a51b987 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -256,9 +256,14 @@ func (rg *RouteGroup) Latency() time.Duration { return rg.networkStats.Latency() } -// Throughput returns throughput till remote (bytes/s). -func (rg *RouteGroup) Throughput() uint32 { - return rg.networkStats.LocalThroughput() +// UploadSpeed returns upload speed (bytes/s). +func (rg *RouteGroup) UploadSpeed() uint32 { + return rg.networkStats.UploadSpeed() +} + +// DownloadSpeed returns download speed (bytes/s). +func (rg *RouteGroup) DownloadSpeed() uint32 { + return rg.networkStats.DownloadSpeed() } // BandwidthSent returns amount of bandwidth sent (bytes). @@ -266,6 +271,7 @@ func (rg *RouteGroup) BandwidthSent() uint64 { return rg.networkStats.BandwidthSent() } +// BandwidthReceived returns amount of bandwidth received (bytes). func (rg *RouteGroup) BandwidthReceived() uint64 { return rg.networkStats.BandwidthReceived() } @@ -418,6 +424,8 @@ func (rg *RouteGroup) sendNetworkProbe() error { throughput := rg.networkStats.RemoteThroughput() timestamp := time.Now().UnixNano() / int64(time.Millisecond) + rg.networkStats.SetDownloadSpeed(uint32(throughput)) + packet := routing.MakeNetworkProbePacket(rule.NextRouteID(), timestamp, throughput) if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil { @@ -609,7 +617,7 @@ func (rg *RouteGroup) handleNetworkProbePacket(packet routing.Packet) error { sentAt := time.Unix(int64(sentAtMs/1000), int64(ms)*int64(time.Millisecond)) rg.networkStats.SetLatency(time.Since(sentAt)) - rg.networkStats.SetLocalThroughput(uint32(throughput)) + rg.networkStats.SetUploadSpeed(uint32(throughput)) return nil } diff --git a/pkg/visor/api.go b/pkg/visor/api.go index 747831e30f..c01f3064b2 100644 --- a/pkg/visor/api.go +++ b/pkg/visor/api.go @@ -414,6 +414,7 @@ func (v *Visor) LogsSince(timestamp time.Time, appName string) ([]string, error) return res, nil } +// GetAppStats implements API. func (v *Visor) GetAppStats(appName string) (appserver.AppStats, error) { stats, err := v.procM.Stats(appName) if err != nil { diff --git a/pkg/visor/rpc.go b/pkg/visor/rpc.go index a99e712582..81308c2860 100644 --- a/pkg/visor/rpc.go +++ b/pkg/visor/rpc.go @@ -286,6 +286,7 @@ func (r *RPC) SetAppSecure(in *SetAppBoolIn, _ *struct{}) (err error) { return r.visor.SetAppSecure(in.AppName, in.Val) } +// GetAppStats gets app runtime statistics. func (r *RPC) GetAppStats(appName *string, out *appserver.AppStats) (err error) { defer rpcutil.LogCall(r.log, "GetAppStats", appName)(out, &err)