Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional app stats #649

Merged
merged 4 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions pkg/app/appnet/skywire_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,26 @@ func (c *SkywireConn) Latency() time.Duration {
return c.nrg.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (c *SkywireConn) Throughput() uint32 {
return c.nrg.Throughput()
// UploadSpeed returns upload speed (bytes/s).
func (c *SkywireConn) UploadSpeed() uint32 {
return c.nrg.UploadSpeed()
}

// DownloadSpeed returns download speed (bytes/s).
func (c *SkywireConn) DownloadSpeed() uint32 {
return c.nrg.DownloadSpeed()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (c *SkywireConn) BandwidthSent() uint64 {
return c.nrg.BandwidthSent()
}

// BandwidthReceived returns amount of bandwidth received (bytes).
func (c *SkywireConn) BandwidthReceived() uint64 {
return c.nrg.BandwidthReceived()
}

// Close closes connection.
func (c *SkywireConn) Close() error {
var err error
Expand Down
11 changes: 11 additions & 0 deletions pkg/app/appserver/app_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package appserver

import (
"time"
)

// AppStats contains app runtime statistics.
type AppStats struct {
Connections []ConnectionSummary `json:"connections"`
StartTime *time.Time `json:"start_time,omitempty"`
}
41 changes: 32 additions & 9 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Proc struct {

m ProcManager
appName string

startTimeMx sync.RWMutex
startTime time.Time
}

// NewProc constructs `Proc`.
Expand Down Expand Up @@ -88,6 +91,18 @@ func (p *Proc) Cmd() *exec.Cmd {
return p.cmd
}

// StartTime returns app start time.
func (p *Proc) StartTime() (time.Time, bool) {
if !p.IsRunning() {
return time.Time{}, false
}

p.startTimeMx.RLock()
defer p.startTimeMx.RUnlock()

return p.startTime, true
}

// InjectConn introduces the connection to the Proc after it is started.
// Only the first call will return true.
// It also prepares the RPC gateway.
Expand Down Expand Up @@ -155,6 +170,10 @@ func (p *Proc) Start() error {
return err
}

p.startTimeMx.Lock()
p.startTime = time.Now().UTC()
p.startTimeMx.Unlock()

go func() {
waitErrCh := make(chan error)
go func() {
Expand Down Expand Up @@ -264,11 +283,13 @@ func (p *Proc) IsRunning() bool {

// ConnectionSummary sums up the connection stats.
type ConnectionSummary struct {
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
Throughput uint32 `json:"throughput"`
BandwidthSent uint64 `json:"bandwidth_sent"`
Error string `json:"error"`
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
UploadSpeed uint32 `json:"upload_speed"`
DownloadSpeed uint32 `json:"download_speed"`
BandwidthSent uint64 `json:"bandwidth_sent"`
BandwidthReceived uint64 `json:"bandwidth_received"`
Error string `json:"error"`
}

// ConnectionsSummary returns all of the proc's connections stats.
Expand Down Expand Up @@ -304,10 +325,12 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary {
}

summaries = append(summaries, ConnectionSummary{
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
Throughput: skywireConn.Throughput(),
BandwidthSent: skywireConn.BandwidthSent(),
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
UploadSpeed: skywireConn.UploadSpeed(),
DownloadSpeed: skywireConn.DownloadSpeed(),
BandwidthSent: skywireConn.BandwidthSent(),
BandwidthReceived: skywireConn.BandwidthReceived(),
})

return true
Expand Down
19 changes: 19 additions & 0 deletions pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ProcManager interface {
Stop(appName string) error
Wait(appName string) error
Range(next func(appName string, proc *Proc) bool)
Stats(appName string) (AppStats, error)
ConnectionsSummary(appName string) ([]ConnectionSummary, error)
Addr() net.Addr
}
Expand Down Expand Up @@ -267,6 +268,24 @@ func (m *procManager) Range(next func(name string, proc *Proc) bool) {
}
}

func (m *procManager) Stats(appName string) (AppStats, error) {
p, err := m.get(appName)
if err != nil {
return AppStats{}, err
}

stats := AppStats{
Connections: p.ConnectionsSummary(),
}

startTime, ok := p.StartTime()
if ok {
stats.StartTime = &startTime
}

return stats, nil
}

func (m *procManager) ConnectionsSummary(appName string) ([]ConnectionSummary, error) {
p, err := m.get(appName)
if err != nil {
Expand Down
35 changes: 25 additions & 10 deletions pkg/router/network_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
)

type networkStats struct {
bandwidthSent uint64
bandwidthReceived uint64
latency uint32
throughput uint32
totalBandwidthSent uint64
totalBandwidthReceived uint64
bandwidthReceived uint64
latency uint32
uploadSpeed uint32
downloadSpeed uint32

bandwidthReceivedRecStartMu sync.Mutex
bandwidthReceivedRecStart time.Time
Expand All @@ -32,24 +34,37 @@ func (s *networkStats) Latency() time.Duration {
return time.Duration(latencyMs)
}

func (s *networkStats) SetLocalThroughput(throughput uint32) {
atomic.StoreUint32(&s.throughput, throughput)
func (s *networkStats) SetUploadSpeed(speed uint32) {
atomic.StoreUint32(&s.uploadSpeed, speed)
}

func (s *networkStats) LocalThroughput() uint32 {
return atomic.LoadUint32(&s.throughput)
func (s *networkStats) UploadSpeed() uint32 {
return atomic.LoadUint32(&s.uploadSpeed)
}

func (s *networkStats) SetDownloadSpeed(speed uint32) {
atomic.StoreUint32(&s.downloadSpeed, speed)
}

func (s *networkStats) DownloadSpeed() uint32 {
return atomic.LoadUint32(&s.downloadSpeed)
}

func (s *networkStats) BandwidthSent() uint64 {
return atomic.LoadUint64(&s.bandwidthSent)
return atomic.LoadUint64(&s.totalBandwidthSent)
}

func (s *networkStats) AddBandwidthSent(amount uint64) {
atomic.AddUint64(&s.bandwidthSent, amount)
atomic.AddUint64(&s.totalBandwidthSent, amount)
}

func (s *networkStats) BandwidthReceived() uint64 {
return atomic.LoadUint64(&s.totalBandwidthReceived)
}

func (s *networkStats) AddBandwidthReceived(amount uint64) {
atomic.AddUint64(&s.bandwidthReceived, amount)
atomic.AddUint64(&s.bandwidthReceived, amount)
}

func (s *networkStats) RemoteThroughput() int64 {
Expand Down
16 changes: 13 additions & 3 deletions pkg/router/noise_route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,26 @@ func (nrg *NoiseRouteGroup) Latency() time.Duration {
return nrg.rg.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (nrg *NoiseRouteGroup) Throughput() uint32 {
return nrg.rg.Throughput()
// UploadSpeed returns upload speed (bytes/s).
func (nrg *NoiseRouteGroup) UploadSpeed() uint32 {
return nrg.rg.UploadSpeed()
}

// DownloadSpeed returns upload speed (bytes/s).
func (nrg *NoiseRouteGroup) DownloadSpeed() uint32 {
return nrg.rg.DownloadSpeed()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (nrg *NoiseRouteGroup) BandwidthSent() uint64 {
return nrg.rg.BandwidthSent()
}

// BandwidthReceived returns amount of bandwidth received (bytes).
func (nrg *NoiseRouteGroup) BandwidthReceived() uint64 {
return nrg.rg.BandwidthReceived()
}

func (nrg *NoiseRouteGroup) isClosed() bool {
return nrg.rg.isClosed()
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,26 @@ func (rg *RouteGroup) Latency() time.Duration {
return rg.networkStats.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (rg *RouteGroup) Throughput() uint32 {
return rg.networkStats.LocalThroughput()
// UploadSpeed returns upload speed (bytes/s).
func (rg *RouteGroup) UploadSpeed() uint32 {
return rg.networkStats.UploadSpeed()
}

// DownloadSpeed returns download speed (bytes/s).
func (rg *RouteGroup) DownloadSpeed() uint32 {
return rg.networkStats.DownloadSpeed()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (rg *RouteGroup) BandwidthSent() uint64 {
return rg.networkStats.BandwidthSent()
}

// BandwidthReceived returns amount of bandwidth received (bytes).
func (rg *RouteGroup) BandwidthReceived() uint64 {
return rg.networkStats.BandwidthReceived()
}

// read reads incoming data. It tries to fetch the data from the internal buffer.
// If buffer is empty it blocks on receiving from the data channel
func (rg *RouteGroup) read(p []byte) (int, error) {
Expand Down Expand Up @@ -414,6 +424,8 @@ func (rg *RouteGroup) sendNetworkProbe() error {
throughput := rg.networkStats.RemoteThroughput()
timestamp := time.Now().UnixNano() / int64(time.Millisecond)

rg.networkStats.SetDownloadSpeed(uint32(throughput))

packet := routing.MakeNetworkProbePacket(rule.NextRouteID(), timestamp, throughput)

if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil {
Expand Down Expand Up @@ -605,7 +617,7 @@ func (rg *RouteGroup) handleNetworkProbePacket(packet routing.Packet) error {
sentAt := time.Unix(int64(sentAtMs/1000), int64(ms)*int64(time.Millisecond))

rg.networkStats.SetLatency(time.Since(sentAt))
rg.networkStats.SetLocalThroughput(uint32(throughput))
rg.networkStats.SetUploadSpeed(uint32(throughput))

return nil
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/visor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type API interface {
SetAppSecure(appName string, isSecure bool) error
SetAppKillswitch(appName string, killswitch bool) error
LogsSince(timestamp time.Time, appName string) ([]string, error)
GetAppStats(appName string) (appserver.AppStats, error)
GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error)

TransportTypes() ([]string, error)
Expand Down Expand Up @@ -413,6 +414,16 @@ func (v *Visor) LogsSince(timestamp time.Time, appName string) ([]string, error)
return res, nil
}

// GetAppStats implements API.
func (v *Visor) GetAppStats(appName string) (appserver.AppStats, error) {
stats, err := v.procM.Stats(appName)
if err != nil {
return appserver.AppStats{}, err
}

return stats, nil
}

// GetAppConnectionsSummary implements API.
func (v *Visor) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) {
summary, err := v.procM.ConnectionsSummary(appName)
Expand Down
13 changes: 13 additions & 0 deletions pkg/visor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (hv *Hypervisor) makeMux() chi.Router {
r.Get("/visors/{pk}/apps/{app}", hv.getApp())
r.Put("/visors/{pk}/apps/{app}", hv.putApp())
r.Get("/visors/{pk}/apps/{app}/logs", hv.appLogsSince())
r.Get("/visors/{pk}/apps/{app}/stats", hv.getAppStats())
r.Get("/visors/{pk}/apps/{app}/connections", hv.appConnections())
r.Get("/visors/{pk}/transport-types", hv.getTransportTypes())
r.Get("/visors/{pk}/transports", hv.getTransports())
Expand Down Expand Up @@ -509,6 +510,18 @@ func (hv *Hypervisor) getApp() http.HandlerFunc {
})
}

func (hv *Hypervisor) getAppStats() http.HandlerFunc {
return hv.withCtx(hv.appCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) {
stats, err := ctx.API.GetAppStats(ctx.App.Name)
if err != nil {
httputil.WriteJSON(w, r, http.StatusInternalServerError, err)
return
}

httputil.WriteJSON(w, r, http.StatusOK, &stats)
})
}

// TODO: simplify
// nolint: funlen,gocognit,godox
func (hv *Hypervisor) putApp() http.HandlerFunc {
Expand Down
12 changes: 12 additions & 0 deletions pkg/visor/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,18 @@ func (r *RPC) SetAppSecure(in *SetAppBoolIn, _ *struct{}) (err error) {
return r.visor.SetAppSecure(in.AppName, in.Val)
}

// GetAppStats gets app runtime statistics.
func (r *RPC) GetAppStats(appName *string, out *appserver.AppStats) (err error) {
defer rpcutil.LogCall(r.log, "GetAppStats", appName)(out, &err)

stats, err := r.visor.GetAppStats(*appName)
if err != nil {
*out = stats
}

return err
}

// GetAppConnectionsSummary returns connections stats for the app.
func (r *RPC) GetAppConnectionsSummary(appName *string, out *[]appserver.ConnectionSummary) (err error) {
defer rpcutil.LogCall(r.log, "GetAppConnectionsSummary", appName)(out, &err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ func (rc *rpcClient) LogsSince(timestamp time.Time, appName string) ([]string, e
return res, nil
}

func (rc *rpcClient) GetAppStats(appName string) (appserver.AppStats, error) {
var stats appserver.AppStats

if err := rc.Call("GetAppStats", &appName, &stats); err != nil {
return appserver.AppStats{}, err
}

return stats, nil
}

// GetAppConnectionsSummary get connections stats for the app.
func (rc *rpcClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) {
var summary []appserver.ConnectionSummary
Expand Down Expand Up @@ -703,8 +713,12 @@ func (mc *mockRPCClient) LogsSince(timestamp time.Time, _ string) ([]string, err
return mc.logS.LogsSince(timestamp)
}

func (mc *mockRPCClient) GetAppStats(_ string) (appserver.AppStats, error) {
return appserver.AppStats{}, nil
}

// GetAppConnectionsSummary get connections stats for the app.
func (mc *mockRPCClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) {
func (mc *mockRPCClient) GetAppConnectionsSummary(_ string) ([]appserver.ConnectionSummary, error) {
return nil, nil
}

Expand Down