Skip to content

Commit

Permalink
feat: metrics RPC endpoint and parser
Browse files Browse the repository at this point in the history
  • Loading branch information
PhearZero committed Oct 22, 2024
1 parent 2b99566 commit 90e12f0
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 17 deletions.
9 changes: 5 additions & 4 deletions internal/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ type Account struct {
LastModified int
}

// AccountsFromParticipationKeys maps an array of api.ParticipationKey to a keyed map of Account
func AccountsFromState(state *StateModel) map[string]Account {
type Accounts map[string]Account

// AccountsFromState maps the StateModel to a keyed map of Account
func AccountsFromState(state *StateModel) Accounts {
values := make(map[string]Account)
if state == nil || state.ParticipationKeys == nil {
return values
}
for _, key := range *state.ParticipationKeys {
val, ok := values[key.Address]
if !ok {

// TODO: update from State
values[key.Address] = Account{
Address: key.Address,
Status: "NA",
Expand All @@ -41,7 +43,6 @@ func AccountsFromState(state *StateModel) map[string]Account {
}
} else {
val.Keys++
//val.
values[key.Address] = val
}
}
Expand Down
61 changes: 60 additions & 1 deletion internal/metrics.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,70 @@
package internal

import "time"
import (
"context"
"errors"
"github.com/algorandfoundation/hack-tui/api"
"regexp"
"strconv"
"strings"
"time"
)

type MetricsModel struct {
Enabled bool
Window int
RoundTime time.Duration
TPS float64
RX int
TX int
}

type MetricsResponse map[string]int

func parseMetricsContent(content string) (MetricsResponse, error) {
var err error
result := MetricsResponse{}

// Validate the Content
var isValid bool
isValid, err = regexp.MatchString(`^#`, content)
isValid = isValid && err == nil && content != ""
if !isValid {
return nil, errors.New("invalid metrics content")
}

// Regex for Metrics Format,
// selects all content that does not start with # in multiline mode
re := regexp.MustCompile(`(?m)^[^#].*`)
rows := re.FindAllString(content, -1)

// Add the strings to the map
for _, row := range rows {
var value int
keyValue := strings.Split(row, " ")
value, err = strconv.Atoi(keyValue[1])
result[keyValue[0]] = value
}

// Handle any error results
if err != nil {
return nil, err
}

// Give the user what they asked for
return result, nil
}

// GetMetrics parses the /metrics endpoint from algod into a map
func GetMetrics(ctx context.Context, client *api.ClientWithResponses) (MetricsResponse, error) {
res, err := client.MetricsWithResponse(ctx)
if err != nil {
return nil, err
}

if res.StatusCode() != 200 {
return nil, errors.New("invalid status code")
}

return parseMetricsContent(string(res.Body))
}
78 changes: 78 additions & 0 deletions internal/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package internal

import (
"context"
"github.com/algorandfoundation/hack-tui/api"
"github.com/oapi-codegen/oapi-codegen/v2/pkg/securityprovider"
"strconv"
"testing"
)

func Test_GetMetrics(t *testing.T) {
// Setup elevated client
apiToken, err := securityprovider.NewSecurityProviderApiKey("header", "X-Algo-API-Token", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
if err != nil {
t.Fatal(err)
}
client, err := api.NewClientWithResponses("http://localhost:4001", api.WithRequestEditorFn(apiToken.Intercept))

metrics, err := GetMetrics(context.Background(), client)
if err != nil {
t.Fatal(err)
}

// TODO: ensure localnet is running before tests
metrics, err = GetMetrics(context.Background(), client)
if err != nil {
t.Fatal(err)
}

if metrics["algod_agreement_dropped"] != 0 {
t.Fatal(strconv.Itoa(metrics["algod_agreement_dropped"]) + " is not zero")
}
}

func Test_parseMetrics(t *testing.T) {
content := `# HELP algod_telemetry_drops_total telemetry messages dropped due to full queues
# TYPE algod_telemetry_drops_total counter
algod_telemetry_drops_total 0
# HELP algod_telemetry_errs_total telemetry messages dropped due to server error
# TYPE algod_telemetry_errs_total counter
algod_telemetry_errs_total 0
# HELP algod_ram_usage number of bytes runtime.ReadMemStats().HeapInuse
# TYPE algod_ram_usage gauge
algod_ram_usage 0
# HELP algod_crypto_vrf_generate_total Total number of calls to GenerateVRFSecrets
# TYPE algod_crypto_vrf_generate_total counter
algod_crypto_vrf_generate_total 0
# HELP algod_crypto_vrf_prove_total Total number of calls to VRFSecrets.Prove
# TYPE algod_crypto_vrf_prove_total counter
algod_crypto_vrf_prove_total 0
# HELP algod_crypto_vrf_hash_total Total number of calls to VRFProof.Hash
# TYPE algod_crypto_vrf_hash_total counter
algod_crypto_vrf_hash_total 0
`
metrics, err := parseMetricsContent(content)

if err != nil {
t.Fatal(err)
}

if metrics["algod_telemetry_drops_total"] != 0 {
t.Fatal(strconv.Itoa(metrics["algod_telemetry_drops_total"]) + " is not 0")
}

content = `INVALID`
_, err = parseMetricsContent(content)
if err == nil {
t.Fatal(err)
}

content = `# HELP algod_telemetry_drops_total telemetry messages dropped due to full queues
# TYPE algod_telemetry_drops_total counter
algod_telemetry_drops_total NAN`
_, err = parseMetricsContent(content)
if err == nil {
t.Fatal(err)
}
}
72 changes: 60 additions & 12 deletions internal/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ type StateModel struct {
Metrics MetricsModel
Accounts map[string]Account
ParticipationKeys *[]api.ParticipationKey
// TODO: handle contexts instead of adding it to state
Admin bool
Watching bool
}

func getAverage(data []float64) float64 {
Expand All @@ -30,7 +33,10 @@ func getAverageDuration(timings []time.Duration) time.Duration {
return time.Duration(avg * float64(time.Second))
}

// TODO: allow context to handle loop
func (s *StateModel) Watch(cb func(model *StateModel, err error), ctx context.Context, client *api.ClientWithResponses) {
s.Watching = true

err := s.Status.Fetch(ctx, client)
if err != nil {
cb(nil, err)
Expand All @@ -44,6 +50,9 @@ func (s *StateModel) Watch(cb func(model *StateModel, err error), ctx context.Co
txns := make([]float64, 0)

for {
if !s.Watching {
break
}
// Collect Time of Round
startTime := time.Now()
status, err := client.WaitForBlockWithResponse(ctx, int(lastRound))
Expand All @@ -62,13 +71,7 @@ func (s *StateModel) Watch(cb func(model *StateModel, err error), ctx context.Co
s.Status.LastRound = uint64(status.JSON200.LastRound)

// Fetch Keys
s.ParticipationKeys, err = GetPartKeys(ctx, client)
if err != nil {
cb(nil, err)
}

// Get Accounts
s.Accounts = AccountsFromState(s)
s.UpdateKeys(ctx, client)

// Fetch Block
var format api.GetBlockParamsFormat = "json"
Expand All @@ -88,11 +91,10 @@ func (s *StateModel) Watch(cb func(model *StateModel, err error), ctx context.Co
txns = append(txns, 0)
}

// Set Metrics
s.Metrics.RoundTime = getAverageDuration(timings)
s.Metrics.Window = len(timings)
s.Metrics.TPS = getAverage(txns)

// Fetch RX/TX every 5th round
if s.Status.LastRound%5 == 0 {
s.UpdateMetrics(ctx, client, timings, txns)
}
// Trim data
if len(timings) >= 100 {
timings = timings[1:]
Expand All @@ -103,3 +105,49 @@ func (s *StateModel) Watch(cb func(model *StateModel, err error), ctx context.Co
cb(s, nil)
}
}

func (s *StateModel) Stop() {
s.Watching = false
}

func (s *StateModel) UpdateMetrics(
ctx context.Context,
client *api.ClientWithResponses,
timings []time.Duration,
txns []float64,
) {
if s == nil {
panic("StateModel is nil while UpdateMetrics is called")
}
// Set Metrics
s.Metrics.RoundTime = getAverageDuration(timings)
s.Metrics.Window = len(timings)
s.Metrics.TPS = getAverage(txns)

// Fetch RX/TX
res, err := GetMetrics(ctx, client)
if err != nil {
s.Metrics.Enabled = false
}
if err == nil {
s.Metrics.Enabled = true
s.Metrics.TX = res["algod_network_sent_bytes_total"]
s.Metrics.RX = res["algod_network_received_bytes_total"]
}
}

func (s *StateModel) UpdateAccounts() {
s.Accounts = AccountsFromState(s)
}

func (s *StateModel) UpdateKeys(ctx context.Context, client *api.ClientWithResponses) {
var err error
s.ParticipationKeys, err = GetPartKeys(ctx, client)
if err != nil {
s.Admin = false
}
if err == nil {
s.Admin = true
s.UpdateAccounts()
}
}
59 changes: 59 additions & 0 deletions internal/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package internal

import (
"context"
"github.com/algorandfoundation/hack-tui/api"
"github.com/oapi-codegen/oapi-codegen/v2/pkg/securityprovider"
"testing"
"time"
)

func Test_StateModel(t *testing.T) {
// Setup elevated client
apiToken, err := securityprovider.NewSecurityProviderApiKey("header", "X-Algo-API-Token", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
if err != nil {
t.Fatal(err)
}
client, err := api.NewClientWithResponses("http://localhost:8080", api.WithRequestEditorFn(apiToken.Intercept))

state := StateModel{
Watching: true,
Status: StatusModel{
LastRound: 1337,
NeedsUpdate: true,
State: "SYNCING",
},
Metrics: MetricsModel{
RoundTime: 0,
TX: 0,
RX: 0,
TPS: 0,
},
}
count := 0
go state.Watch(func(model *StateModel, err error) {
if err != nil || model == nil {
t.Error("Failed")
return
}
count++
}, context.Background(), client)
time.Sleep(5 * time.Second)
// Stop the watcher
state.Stop()
if count == 0 {
t.Fatal("Did not receive any updates")
}
if state.Status.LastRound <= 0 {
t.Fatal("LastRound is stale")
}
t.Log(
"Watching: ", state.Watching,
"LastRound: ", state.Status.LastRound,
"NeedsUpdate: ", state.Status.NeedsUpdate,
"State: ", state.Status.State,
"RoundTime: ", state.Metrics.RoundTime,
"RX: ", state.Metrics.RX,
"TX: ", state.Metrics.TX,
)
}

0 comments on commit 90e12f0

Please sign in to comment.