Skip to content

Commit

Permalink
Merge pull request #649 from Darkren/feature/additional-app-stats
Browse files Browse the repository at this point in the history
Additional app stats
  • Loading branch information
jdknives authored Mar 17, 2021
2 parents 55f6797 + 6903886 commit 6c9a0dd
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 30 deletions.
16 changes: 13 additions & 3 deletions pkg/app/appnet/skywire_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,26 @@ func (c *SkywireConn) Latency() time.Duration {
return c.nrg.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (c *SkywireConn) Throughput() uint32 {
return c.nrg.Throughput()
// UploadSpeed returns upload speed (bytes/s).
func (c *SkywireConn) UploadSpeed() uint32 {
return c.nrg.UploadSpeed()
}

// DownloadSpeed returns download speed (bytes/s).
func (c *SkywireConn) DownloadSpeed() uint32 {
return c.nrg.DownloadSpeed()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (c *SkywireConn) BandwidthSent() uint64 {
return c.nrg.BandwidthSent()
}

// BandwidthReceived returns amount of bandwidth received (bytes).
func (c *SkywireConn) BandwidthReceived() uint64 {
return c.nrg.BandwidthReceived()
}

// Close closes connection.
func (c *SkywireConn) Close() error {
var err error
Expand Down
11 changes: 11 additions & 0 deletions pkg/app/appserver/app_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package appserver

import (
"time"
)

// AppStats contains app runtime statistics.
type AppStats struct {
Connections []ConnectionSummary `json:"connections"`
StartTime *time.Time `json:"start_time,omitempty"`
}
41 changes: 32 additions & 9 deletions pkg/app/appserver/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Proc struct {
m ProcManager
appName string

startTimeMx sync.RWMutex
startTime time.Time

statusMx sync.RWMutex
status string
}
Expand Down Expand Up @@ -91,6 +94,18 @@ func (p *Proc) Cmd() *exec.Cmd {
return p.cmd
}

// StartTime returns app start time.
func (p *Proc) StartTime() (time.Time, bool) {
if !p.IsRunning() {
return time.Time{}, false
}

p.startTimeMx.RLock()
defer p.startTimeMx.RUnlock()

return p.startTime, true
}

// InjectConn introduces the connection to the Proc after it is started.
// Only the first call will return true.
// It also prepares the RPC gateway.
Expand Down Expand Up @@ -158,6 +173,10 @@ func (p *Proc) Start() error {
return err
}

p.startTimeMx.Lock()
p.startTime = time.Now().UTC()
p.startTimeMx.Unlock()

go func() {
waitErrCh := make(chan error)
go func() {
Expand Down Expand Up @@ -283,11 +302,13 @@ func (p *Proc) DetailedStatus() string {

// ConnectionSummary sums up the connection stats.
type ConnectionSummary struct {
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
Throughput uint32 `json:"throughput"`
BandwidthSent uint64 `json:"bandwidth_sent"`
Error string `json:"error"`
IsAlive bool `json:"is_alive"`
Latency time.Duration `json:"latency"`
UploadSpeed uint32 `json:"upload_speed"`
DownloadSpeed uint32 `json:"download_speed"`
BandwidthSent uint64 `json:"bandwidth_sent"`
BandwidthReceived uint64 `json:"bandwidth_received"`
Error string `json:"error"`
}

// ConnectionsSummary returns all of the proc's connections stats.
Expand Down Expand Up @@ -323,10 +344,12 @@ func (p *Proc) ConnectionsSummary() []ConnectionSummary {
}

summaries = append(summaries, ConnectionSummary{
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
Throughput: skywireConn.Throughput(),
BandwidthSent: skywireConn.BandwidthSent(),
IsAlive: skywireConn.IsAlive(),
Latency: skywireConn.Latency(),
UploadSpeed: skywireConn.UploadSpeed(),
DownloadSpeed: skywireConn.DownloadSpeed(),
BandwidthSent: skywireConn.BandwidthSent(),
BandwidthReceived: skywireConn.BandwidthReceived(),
})

return true
Expand Down
19 changes: 19 additions & 0 deletions pkg/app/appserver/proc_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ProcManager interface {
Stop(appName string) error
Wait(appName string) error
Range(next func(appName string, proc *Proc) bool)
Stats(appName string) (AppStats, error)
SetDetailedStatus(appName, status string) error
DetailedStatus(appName string) (string, error)
ConnectionsSummary(appName string) ([]ConnectionSummary, error)
Expand Down Expand Up @@ -269,6 +270,24 @@ func (m *procManager) Range(next func(name string, proc *Proc) bool) {
}
}

func (m *procManager) Stats(appName string) (AppStats, error) {
p, err := m.get(appName)
if err != nil {
return AppStats{}, err
}

stats := AppStats{
Connections: p.ConnectionsSummary(),
}

startTime, ok := p.StartTime()
if ok {
stats.StartTime = &startTime
}

return stats, nil
}

// SetDetailedStatus sets detailed `status` for app `appName`.
func (m *procManager) SetDetailedStatus(appName, status string) error {
p, err := m.get(appName)
Expand Down
35 changes: 25 additions & 10 deletions pkg/router/network_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
)

type networkStats struct {
bandwidthSent uint64
bandwidthReceived uint64
latency uint32
throughput uint32
totalBandwidthSent uint64
totalBandwidthReceived uint64
bandwidthReceived uint64
latency uint32
uploadSpeed uint32
downloadSpeed uint32

bandwidthReceivedRecStartMu sync.Mutex
bandwidthReceivedRecStart time.Time
Expand All @@ -32,24 +34,37 @@ func (s *networkStats) Latency() time.Duration {
return time.Duration(latencyMs)
}

func (s *networkStats) SetLocalThroughput(throughput uint32) {
atomic.StoreUint32(&s.throughput, throughput)
func (s *networkStats) SetUploadSpeed(speed uint32) {
atomic.StoreUint32(&s.uploadSpeed, speed)
}

func (s *networkStats) LocalThroughput() uint32 {
return atomic.LoadUint32(&s.throughput)
func (s *networkStats) UploadSpeed() uint32 {
return atomic.LoadUint32(&s.uploadSpeed)
}

func (s *networkStats) SetDownloadSpeed(speed uint32) {
atomic.StoreUint32(&s.downloadSpeed, speed)
}

func (s *networkStats) DownloadSpeed() uint32 {
return atomic.LoadUint32(&s.downloadSpeed)
}

func (s *networkStats) BandwidthSent() uint64 {
return atomic.LoadUint64(&s.bandwidthSent)
return atomic.LoadUint64(&s.totalBandwidthSent)
}

func (s *networkStats) AddBandwidthSent(amount uint64) {
atomic.AddUint64(&s.bandwidthSent, amount)
atomic.AddUint64(&s.totalBandwidthSent, amount)
}

func (s *networkStats) BandwidthReceived() uint64 {
return atomic.LoadUint64(&s.totalBandwidthReceived)
}

func (s *networkStats) AddBandwidthReceived(amount uint64) {
atomic.AddUint64(&s.bandwidthReceived, amount)
atomic.AddUint64(&s.bandwidthReceived, amount)
}

func (s *networkStats) RemoteThroughput() int64 {
Expand Down
16 changes: 13 additions & 3 deletions pkg/router/noise_route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,26 @@ func (nrg *NoiseRouteGroup) Latency() time.Duration {
return nrg.rg.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (nrg *NoiseRouteGroup) Throughput() uint32 {
return nrg.rg.Throughput()
// UploadSpeed returns upload speed (bytes/s).
func (nrg *NoiseRouteGroup) UploadSpeed() uint32 {
return nrg.rg.UploadSpeed()
}

// DownloadSpeed returns upload speed (bytes/s).
func (nrg *NoiseRouteGroup) DownloadSpeed() uint32 {
return nrg.rg.DownloadSpeed()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (nrg *NoiseRouteGroup) BandwidthSent() uint64 {
return nrg.rg.BandwidthSent()
}

// BandwidthReceived returns amount of bandwidth received (bytes).
func (nrg *NoiseRouteGroup) BandwidthReceived() uint64 {
return nrg.rg.BandwidthReceived()
}

func (nrg *NoiseRouteGroup) isClosed() bool {
return nrg.rg.isClosed()
}
Expand Down
20 changes: 16 additions & 4 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,26 @@ func (rg *RouteGroup) Latency() time.Duration {
return rg.networkStats.Latency()
}

// Throughput returns throughput till remote (bytes/s).
func (rg *RouteGroup) Throughput() uint32 {
return rg.networkStats.LocalThroughput()
// UploadSpeed returns upload speed (bytes/s).
func (rg *RouteGroup) UploadSpeed() uint32 {
return rg.networkStats.UploadSpeed()
}

// DownloadSpeed returns download speed (bytes/s).
func (rg *RouteGroup) DownloadSpeed() uint32 {
return rg.networkStats.DownloadSpeed()
}

// BandwidthSent returns amount of bandwidth sent (bytes).
func (rg *RouteGroup) BandwidthSent() uint64 {
return rg.networkStats.BandwidthSent()
}

// BandwidthReceived returns amount of bandwidth received (bytes).
func (rg *RouteGroup) BandwidthReceived() uint64 {
return rg.networkStats.BandwidthReceived()
}

// read reads incoming data. It tries to fetch the data from the internal buffer.
// If buffer is empty it blocks on receiving from the data channel
func (rg *RouteGroup) read(p []byte) (int, error) {
Expand Down Expand Up @@ -414,6 +424,8 @@ func (rg *RouteGroup) sendNetworkProbe() error {
throughput := rg.networkStats.RemoteThroughput()
timestamp := time.Now().UnixNano() / int64(time.Millisecond)

rg.networkStats.SetDownloadSpeed(uint32(throughput))

packet := routing.MakeNetworkProbePacket(rule.NextRouteID(), timestamp, throughput)

if err := rg.writePacket(context.Background(), tp, packet, rule.KeyRouteID()); err != nil {
Expand Down Expand Up @@ -605,7 +617,7 @@ func (rg *RouteGroup) handleNetworkProbePacket(packet routing.Packet) error {
sentAt := time.Unix(int64(sentAtMs/1000), int64(ms)*int64(time.Millisecond))

rg.networkStats.SetLatency(time.Since(sentAt))
rg.networkStats.SetLocalThroughput(uint32(throughput))
rg.networkStats.SetUploadSpeed(uint32(throughput))

return nil
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/visor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type API interface {
SetAppSecure(appName string, isSecure bool) error
SetAppKillswitch(appName string, killswitch bool) error
LogsSince(timestamp time.Time, appName string) ([]string, error)
GetAppStats(appName string) (appserver.AppStats, error)
GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error)

TransportTypes() ([]string, error)
Expand Down Expand Up @@ -444,6 +445,16 @@ func (v *Visor) LogsSince(timestamp time.Time, appName string) ([]string, error)
return res, nil
}

// GetAppStats implements API.
func (v *Visor) GetAppStats(appName string) (appserver.AppStats, error) {
stats, err := v.procM.Stats(appName)
if err != nil {
return appserver.AppStats{}, err
}

return stats, nil
}

// GetAppConnectionsSummary implements API.
func (v *Visor) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) {
summary, err := v.procM.ConnectionsSummary(appName)
Expand Down
13 changes: 13 additions & 0 deletions pkg/visor/hypervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (hv *Hypervisor) makeMux() chi.Router {
r.Get("/visors/{pk}/apps/{app}", hv.getApp())
r.Put("/visors/{pk}/apps/{app}", hv.putApp())
r.Get("/visors/{pk}/apps/{app}/logs", hv.appLogsSince())
r.Get("/visors/{pk}/apps/{app}/stats", hv.getAppStats())
r.Get("/visors/{pk}/apps/{app}/connections", hv.appConnections())
r.Get("/visors/{pk}/transport-types", hv.getTransportTypes())
r.Get("/visors/{pk}/transports", hv.getTransports())
Expand Down Expand Up @@ -598,6 +599,18 @@ func (hv *Hypervisor) getApp() http.HandlerFunc {
})
}

func (hv *Hypervisor) getAppStats() http.HandlerFunc {
return hv.withCtx(hv.appCtx, func(w http.ResponseWriter, r *http.Request, ctx *httpCtx) {
stats, err := ctx.API.GetAppStats(ctx.App.Name)
if err != nil {
httputil.WriteJSON(w, r, http.StatusInternalServerError, err)
return
}

httputil.WriteJSON(w, r, http.StatusOK, &stats)
})
}

// TODO: simplify
// nolint: funlen,gocognit,godox
func (hv *Hypervisor) putApp() http.HandlerFunc {
Expand Down
12 changes: 12 additions & 0 deletions pkg/visor/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,18 @@ func (r *RPC) SetAppSecure(in *SetAppBoolIn, _ *struct{}) (err error) {
return r.visor.SetAppSecure(in.AppName, in.Val)
}

// GetAppStats gets app runtime statistics.
func (r *RPC) GetAppStats(appName *string, out *appserver.AppStats) (err error) {
defer rpcutil.LogCall(r.log, "GetAppStats", appName)(out, &err)

stats, err := r.visor.GetAppStats(*appName)
if err != nil {
*out = stats
}

return err
}

// GetAppConnectionsSummary returns connections stats for the app.
func (r *RPC) GetAppConnectionsSummary(appName *string, out *[]appserver.ConnectionSummary) (err error) {
defer rpcutil.LogCall(r.log, "GetAppConnectionsSummary", appName)(out, &err)
Expand Down
16 changes: 15 additions & 1 deletion pkg/visor/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ func (rc *rpcClient) LogsSince(timestamp time.Time, appName string) ([]string, e
return res, nil
}

func (rc *rpcClient) GetAppStats(appName string) (appserver.AppStats, error) {
var stats appserver.AppStats

if err := rc.Call("GetAppStats", &appName, &stats); err != nil {
return appserver.AppStats{}, err
}

return stats, nil
}

// GetAppConnectionsSummary get connections stats for the app.
func (rc *rpcClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) {
var summary []appserver.ConnectionSummary
Expand Down Expand Up @@ -725,8 +735,12 @@ func (mc *mockRPCClient) LogsSince(timestamp time.Time, _ string) ([]string, err
return mc.logS.LogsSince(timestamp)
}

func (mc *mockRPCClient) GetAppStats(_ string) (appserver.AppStats, error) {
return appserver.AppStats{}, nil
}

// GetAppConnectionsSummary get connections stats for the app.
func (mc *mockRPCClient) GetAppConnectionsSummary(appName string) ([]appserver.ConnectionSummary, error) {
func (mc *mockRPCClient) GetAppConnectionsSummary(_ string) ([]appserver.ConnectionSummary, error) {
return nil, nil
}

Expand Down

0 comments on commit 6c9a0dd

Please sign in to comment.