Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scx_stats: Refine scx_stats and implement scxstats_to_openmetrics.py #502

Merged
merged 7 commits into from
Aug 16, 2024
26 changes: 13 additions & 13 deletions rust/scx_stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "domain statistics")]
#[stat(desc = "domain statistics", _om_prefix="d_", _om_label="domain_name")]
struct DomainStats {
pub name: String,
#[stat(desc = "an event counter")]
Expand All @@ -39,14 +39,14 @@ struct DomainStats {
}

#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "cluster statistics")]
#[stat(desc = "cluster statistics", top)]
struct ClusterStats {
pub name: String,
#[stat(desc = "update timestamp")]
pub at: u64,
#[stat(desc = "some bitmap we want to report")]
pub bitmap: Vec<u32>,
#[stat(desc = "domain statistics", om_prefix="d_")]
#[stat(desc = "domain statistics")]
pub doms_dict: BTreeMap<usize, DomainStats>,
}
```
Expand All @@ -68,18 +68,18 @@ socket can be launched as follows:
.set_path(&path)
.add_stats_meta(ClusterStats::meta())
.add_stats_meta(DomainStats::meta())
.add_stats("all", Box::new(move |_| stats.to_json()))
.add_stats("top", Box::new(move |_| stats.to_json()))
.launch()
.unwrap();
```

The `stat_stats::Meta::meta()` trait function is automatically implemented
by the `scx_stats::Meta` derive macro for each statistics struct. Adding
them to the statistics server allows implementing generic clients which
don't have the definitions of the statistics structs - e.g. to relay the
The `scx_stats::Meta::meta()` trait function is automatically implemented by
the `scx_stats::Meta` derive macro for each statistics struct. Adding them
to the statistics server allows implementing generic clients which don't
have the definitions of the statistics structs - e.g. to relay the
statistics to another framework such as OpenMetrics.

`all` is the default statistics reported when no specific target is
`top` is the default statistics reported when no specific target is
specified and should always be added to the server. The closure should
return `serde_json::Value`. Note that `scx_stats::ToJson` automatically adds
`.to_json()` to structs which implement both `scx_stats::Meta` and
Expand All @@ -100,11 +100,11 @@ The above creates a client instance. Let's query the statistics:
println!("{:#?}", &resp);
```

The above is equivalent to querying the `all` target:
The above is equivalent to querying the `top` target:

```rust
println!("\n===== Requesting \"stat\" with \"target\"=\"all\":");
let resp = client.request::<ClusterStats>("stat", vec![("target".into(), "all".into())]);
println!("\n===== Requesting \"stat\" with \"target\"=\"top\":");
let resp = client.request::<ClusterStats>("stat", vec![("target".into(), "top".into())]);
println!("{:#?}", &resp);
```

Expand Down Expand Up @@ -197,7 +197,7 @@ Press any key to exit.
$ RUST_LOG=trace cargo run --example client -- ~/tmp/socket
...
===== Requesting "stats" but receiving with serde_json::Value:
2024-08-15T22:13:23.769Z TRACE [scx_stats::client] Sending: {"req":"stats","args":{"target":"all"}}
2024-08-15T22:13:23.769Z TRACE [scx_stats::client] Sending: {"req":"stats","args":{"target":"top"}}
2024-08-15T22:13:23.769Z TRACE [scx_stats::client] Received: {"errno":0,"args":{"resp":{"at":12345,"bitmap":[3735928559,3203391149],"doms_dict":{"0":{"events":1234,"name":"domain 0","pressure":1.234},"3":{"events":5678,"name":"domain 3","pressure":5.678}},"name":"test cluster"}}}
Ok(
Object {
Expand Down
25 changes: 2 additions & 23 deletions rust/scx_stats/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,8 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::env::args;

// DomainStat and ClusterStat definitions must match the ones in server.rs.
//
#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "domain statistics", field_prefix="d_")]
struct DomainStats {
pub name: String,
#[stat(desc = "an event counter")]
pub events: u64,
#[stat(desc = "a gauge number")]
pub pressure: f64,
}

#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "cluster statistics", all)]
struct ClusterStats {
pub name: String,
#[stat(desc = "update timestamp")]
pub at: u64,
#[stat(desc = "some bitmap we want to report")]
pub bitmap: Vec<u32>,
#[stat(desc = "domain statistics")]
pub doms_dict: BTreeMap<usize, DomainStats>,
}
// Hacky definition sharing. See stats_def.rs.h.
include!("stats_defs.rs.h");

fn main() {
simple_logger::SimpleLogger::new()
Expand Down
27 changes: 3 additions & 24 deletions rust/scx_stats/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,8 @@ use std::collections::BTreeMap;
use std::env::args;
use std::io::Read;

// DomainStat and ClusterStat definitions must match the ones in client.rs.
//
#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "domain statistics", field_prefix="d_")]
struct DomainStats {
pub name: String,
#[stat(desc = "an event counter")]
pub events: u64,
#[stat(desc = "a gauge number")]
pub pressure: f64,
}

#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "cluster statistics", all)]
struct ClusterStats {
pub name: String,
#[stat(desc = "update timestamp")]
pub at: u64,
#[stat(desc = "some bitmap we want to report")]
pub bitmap: Vec<u32>,
#[stat(desc = "domain statistics")]
pub doms_dict: BTreeMap<usize, DomainStats>,
}
// Hacky definition sharing. See stats_def.rs.h.
include!("stats_defs.rs.h");

fn main() {
let stats = ClusterStats {
Expand Down Expand Up @@ -61,7 +40,7 @@ fn main() {
.set_path(&path)
.add_stats_meta(ClusterStats::meta())
.add_stats_meta(DomainStats::meta())
.add_stats("all", Box::new(move |_| stats.to_json()))
.add_stats("top", Box::new(move |_| stats.to_json()))
.launch()
.unwrap();

Expand Down
25 changes: 25 additions & 0 deletions rust/scx_stats/examples/stats_defs.rs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// To be included from server.rs and client.rs examples. This would usually
// be done through the usual pub struct definitions but it's cumbersome to
// do in the examples directory, so work around with c-like includes.

#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "domain statistics", _om_prefix="d_", _om_label="domain_name")]
struct DomainStats {
pub name: String,
#[stat(desc = "an event counter")]
pub events: u64,
#[stat(desc = "a gauge number")]
pub pressure: f64,
}

#[derive(Clone, Debug, Serialize, Deserialize, Stats)]
#[stat(desc = "cluster statistics", top)]
struct ClusterStats {
pub name: String,
#[stat(desc = "update timestamp")]
pub at: u64,
#[stat(desc = "some bitmap we want to report")]
pub bitmap: Vec<u32>,
#[stat(desc = "domain statistics")]
pub doms_dict: BTreeMap<usize, DomainStats>,
}
155 changes: 155 additions & 0 deletions rust/scx_stats/scripts/scxstats_to_openmetrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/usr/bin/python3
import argparse
import sys
import json
import socket
import time
import tempfile
from prometheus_client import Gauge, CollectorRegistry, write_to_textfile
from pprint import pprint

verbose = 0

def info(line):
print('[INFO] ' + line, file=sys.stderr)

def dbg(line):
if verbose:
print('[DBG] ' + line, file=sys.stderr)

def request(f, req, args={}):
f.write(json.dumps({ 'req': req, 'args': args }) + '\n')
f.flush()
resp = json.loads(f.readline())
if resp['errno'] != 0:
raise Exception(f'req: {req} args: {args} failed with {resp['errno']} ({resp['args']['resp']})')
return resp['args']['resp']

def make_om_metrics(sname, omid, field, labels, meta_db, registry):
# @sname: The name of the current struct.
#
# @omid: The field path down from the top level struct. e.g. '.A.B'
# means that the top level's field 'A' is a dict and the current one is
# the field 'B' of the struct inside that dict.
#
# @field: The corresponding field part of the stats_meta.
#
# @labels: The collected $om_labels as this function descends down
# nested dicts.
desc = field['desc'] if 'desc' in field else ''
prefix = meta_db[sname]['om_prefix']

if 'datum' in field:
match field['datum']:
# Single value that can become a Gauge. Gauge name is $om_prefix
# + the leaf level field name. The combination must be unique.
case 'i64' | 'u64' | 'float':
gname = prefix + omid.rsplit('.', 1)[-1]
dbg(f'creating OM metric {gname}@{omid} {labels} "{desc}"')
return { omid: Gauge(gname, desc, labels, registry=registry) }
elif 'dict' in field and 'datum' in field['dict'] and 'struct' in field['dict']['datum']:
# The only allowed nesting is struct inside dict.
sname = field['dict']['datum']['struct']
struct = meta_db[sname]
# $om_label's will distinguish different members of the dict by
# pointing to the dict keys.
if not struct['om_label']:
raise Exception(f'{omid} is nested inside but does not have _om_label')
# Recurse into the nested struct.
oms = {}
for fname, field in struct['fields'].items():
oms |= make_om_metrics(sname, f'{omid}.{fname}', field,
labels + [struct['om_label']], meta_db, registry)
return oms

info(f'field "{omid}" has unsupported type, skipping')
return {}

def update_om_metrics(resp, omid, labels, meta_db, om_metrics):
for k, v in resp.items():
k_omid = f'{omid}.{k}'
if type(v) == dict:
# Descend into dict.
for dk, dv in v.items():
update_om_metrics(dv, k_omid, labels + [dk], meta_db, om_metrics);
elif k_omid in om_metrics:
# Update known metrics.
dbg(f'updating {k_omid} {labels} to {v}')
if len(labels):
om_metrics[k_omid].labels(labels).set(v)
else:
om_metrics[k_omid].set(v)
else:
dbg(f'skpping {k_omid}')

def main():
global verbose

parser = argparse.ArgumentParser(
prog='scxstats_to_openmetrics',
description='Read from scx_stats server and output in OpenMetrics format')
parser.add_argument('-i', '--intv', metavar='SECS', type=float, default='2.0',
help='Polling interval (default: %(default)s)')
parser.add_argument('-v', '--verbose', action='count')
parser.add_argument('-p', '--path', metavar='PATH', default='/var/run/scx/root/stats',
help='UNIX domain socket path to connect to (default: %(default)s)')

args = parser.parse_args()
verbose = args.verbose

# Connect to the stats server.
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(args.path)
f = sock.makefile(mode='rw')

# Query metadata and build meta_db.
meta_db = {}
resp = request(f, 'stats_meta')

top_sname = None
for sname, struct in resp.items():
# Find the top-level struct.
if 'top' in struct and struct['top']:
top_sname = sname

struct['om_prefix'] = ''
struct['om_label'] = ''

if 'user' in struct:
# om_prefix is used to build unique metric name from field names.
if 'om_prefix' in struct['user']:
struct['om_prefix'] = struct['user']['om_prefix']
# om_label is used to distinguish structs nested inside dicts.
if 'om_label' in struct['user']:
struct['om_label'] = struct['user']['om_label']
del struct['user']

meta_db[sname] = struct

if verbose:
dbg('dumping meta_db:')
pprint(meta_db)

# Instantiate OpenMetrics Gauges.
registry = CollectorRegistry()
om_metrics = {}
for name, field in meta_db[top_sname]['fields'].items():
om_metrics |= make_om_metrics(top_sname, f'.{name}', field, [], meta_db, registry)

# Loop and translate stats.
while True:
resp = request(f, 'stats')
if verbose:
dbg('dumping stats response:')
pprint(resp)
update_om_metrics(resp, '', [], meta_db, om_metrics)

with tempfile.NamedTemporaryFile() as out_file:
write_to_textfile(out_file.name, registry)
with open(out_file.name) as in_file:
sys.stdout.write(in_file.read())
sys.stdout.flush()

time.sleep(args.intv)

main()
7 changes: 3 additions & 4 deletions rust/scx_stats/scx_stats_derive/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use quote::{format_ident, quote, quote_spanned};
use quote::{quote, quote_spanned};
use scx_stats::{ScxStatsData, ScxStatsKind, ScxStatsMetaAux};
use syn::parse_macro_input;
use syn::spanned::Spanned;
Expand All @@ -10,17 +10,16 @@ pub fn stat(input: proc_macro::TokenStream) -> proc_macro::TokenStream {

let mut output = proc_macro2::TokenStream::new();

for (idx, field) in meta.fields.iter().enumerate() {
for (_fname, field) in meta.fields.iter() {
match &field.data {
ScxStatsData::Datum(datum)
| ScxStatsData::Array(datum)
| ScxStatsData::Dict { key: _, datum } => {
if let ScxStatsKind::Struct(name) = &datum {
let path = &paths[name.as_str()];
let assert_id = format_ident!("_AssertScxStatsMeta_{}", idx);
#[rustfmt::skip]
let assert = quote_spanned! {path.span()=>
struct #assert_id where #path: scx_stats::Meta;
struct _AssertScxStatsMeta where #path: scx_stats::Meta;
};
output.extend(assert.into_iter());
}
Expand Down
Loading