diff --git a/rust/scx_stats/README.md b/rust/scx_stats/README.md index 77ee234e0..9e42f200f 100644 --- a/rust/scx_stats/README.md +++ b/rust/scx_stats/README.md @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; #[derive(Clone, Debug, Serialize, Deserialize, Stats)] -#[stat(desc = "domain statistics")] +#[stat(desc = "domain statistics", _om_prefix="d_", _om_label="domain_name")] struct DomainStats { pub name: String, #[stat(desc = "an event counter")] @@ -39,14 +39,14 @@ struct DomainStats { } #[derive(Clone, Debug, Serialize, Deserialize, Stats)] -#[stat(desc = "cluster statistics")] +#[stat(desc = "cluster statistics", top)] struct ClusterStats { pub name: String, #[stat(desc = "update timestamp")] pub at: u64, #[stat(desc = "some bitmap we want to report")] pub bitmap: Vec, - #[stat(desc = "domain statistics", om_prefix="d_")] + #[stat(desc = "domain statistics")] pub doms_dict: BTreeMap, } ``` @@ -68,18 +68,18 @@ socket can be launched as follows: .set_path(&path) .add_stats_meta(ClusterStats::meta()) .add_stats_meta(DomainStats::meta()) - .add_stats("all", Box::new(move |_| stats.to_json())) + .add_stats("top", Box::new(move |_| stats.to_json())) .launch() .unwrap(); ``` -The `stat_stats::Meta::meta()` trait function is automatically implemented -by the `scx_stats::Meta` derive macro for each statistics struct. Adding -them to the statistics server allows implementing generic clients which -don't have the definitions of the statistics structs - e.g. to relay the +The `scx_stats::Meta::meta()` trait function is automatically implemented by +the `scx_stats::Meta` derive macro for each statistics struct. Adding them +to the statistics server allows implementing generic clients which don't +have the definitions of the statistics structs - e.g. to relay the statistics to another framework such as OpenMetrics. -`all` is the default statistics reported when no specific target is +`top` is the default statistics reported when no specific target is specified and should always be added to the server. The closure should return `serde_json::Value`. Note that `scx_stats::ToJson` automatically adds `.to_json()` to structs which implement both `scx_stats::Meta` and @@ -100,11 +100,11 @@ The above creates a client instance. Let's query the statistics: println!("{:#?}", &resp); ``` -The above is equivalent to querying the `all` target: +The above is equivalent to querying the `top` target: ```rust - println!("\n===== Requesting \"stat\" with \"target\"=\"all\":"); - let resp = client.request::("stat", vec![("target".into(), "all".into())]); + println!("\n===== Requesting \"stat\" with \"target\"=\"top\":"); + let resp = client.request::("stat", vec![("target".into(), "top".into())]); println!("{:#?}", &resp); ``` @@ -197,7 +197,7 @@ Press any key to exit. $ RUST_LOG=trace cargo run --example client -- ~/tmp/socket ... ===== Requesting "stats" but receiving with serde_json::Value: -2024-08-15T22:13:23.769Z TRACE [scx_stats::client] Sending: {"req":"stats","args":{"target":"all"}} +2024-08-15T22:13:23.769Z TRACE [scx_stats::client] Sending: {"req":"stats","args":{"target":"top"}} 2024-08-15T22:13:23.769Z TRACE [scx_stats::client] Received: {"errno":0,"args":{"resp":{"at":12345,"bitmap":[3735928559,3203391149],"doms_dict":{"0":{"events":1234,"name":"domain 0","pressure":1.234},"3":{"events":5678,"name":"domain 3","pressure":5.678}},"name":"test cluster"}}} Ok( Object { diff --git a/rust/scx_stats/examples/client.rs b/rust/scx_stats/examples/client.rs index 8c0228984..ec5726f05 100644 --- a/rust/scx_stats/examples/client.rs +++ b/rust/scx_stats/examples/client.rs @@ -4,29 +4,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::env::args; -// DomainStat and ClusterStat definitions must match the ones in server.rs. -// -#[derive(Clone, Debug, Serialize, Deserialize, Stats)] -#[stat(desc = "domain statistics", field_prefix="d_")] -struct DomainStats { - pub name: String, - #[stat(desc = "an event counter")] - pub events: u64, - #[stat(desc = "a gauge number")] - pub pressure: f64, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Stats)] -#[stat(desc = "cluster statistics", all)] -struct ClusterStats { - pub name: String, - #[stat(desc = "update timestamp")] - pub at: u64, - #[stat(desc = "some bitmap we want to report")] - pub bitmap: Vec, - #[stat(desc = "domain statistics")] - pub doms_dict: BTreeMap, -} +// Hacky definition sharing. See stats_def.rs.h. +include!("stats_defs.rs.h"); fn main() { simple_logger::SimpleLogger::new() diff --git a/rust/scx_stats/examples/server.rs b/rust/scx_stats/examples/server.rs index d1290286e..e6241e477 100644 --- a/rust/scx_stats/examples/server.rs +++ b/rust/scx_stats/examples/server.rs @@ -5,29 +5,8 @@ use std::collections::BTreeMap; use std::env::args; use std::io::Read; -// DomainStat and ClusterStat definitions must match the ones in client.rs. -// -#[derive(Clone, Debug, Serialize, Deserialize, Stats)] -#[stat(desc = "domain statistics", field_prefix="d_")] -struct DomainStats { - pub name: String, - #[stat(desc = "an event counter")] - pub events: u64, - #[stat(desc = "a gauge number")] - pub pressure: f64, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Stats)] -#[stat(desc = "cluster statistics", all)] -struct ClusterStats { - pub name: String, - #[stat(desc = "update timestamp")] - pub at: u64, - #[stat(desc = "some bitmap we want to report")] - pub bitmap: Vec, - #[stat(desc = "domain statistics")] - pub doms_dict: BTreeMap, -} +// Hacky definition sharing. See stats_def.rs.h. +include!("stats_defs.rs.h"); fn main() { let stats = ClusterStats { @@ -61,7 +40,7 @@ fn main() { .set_path(&path) .add_stats_meta(ClusterStats::meta()) .add_stats_meta(DomainStats::meta()) - .add_stats("all", Box::new(move |_| stats.to_json())) + .add_stats("top", Box::new(move |_| stats.to_json())) .launch() .unwrap(); diff --git a/rust/scx_stats/examples/stats_defs.rs.h b/rust/scx_stats/examples/stats_defs.rs.h new file mode 100644 index 000000000..f6e6748f8 --- /dev/null +++ b/rust/scx_stats/examples/stats_defs.rs.h @@ -0,0 +1,25 @@ +// To be included from server.rs and client.rs examples. This would usually +// be done through the usual pub struct definitions but it's cumbersome to +// do in the examples directory, so work around with c-like includes. + +#[derive(Clone, Debug, Serialize, Deserialize, Stats)] +#[stat(desc = "domain statistics", _om_prefix="d_", _om_label="domain_name")] +struct DomainStats { + pub name: String, + #[stat(desc = "an event counter")] + pub events: u64, + #[stat(desc = "a gauge number")] + pub pressure: f64, +} + +#[derive(Clone, Debug, Serialize, Deserialize, Stats)] +#[stat(desc = "cluster statistics", top)] +struct ClusterStats { + pub name: String, + #[stat(desc = "update timestamp")] + pub at: u64, + #[stat(desc = "some bitmap we want to report")] + pub bitmap: Vec, + #[stat(desc = "domain statistics")] + pub doms_dict: BTreeMap, +} diff --git a/rust/scx_stats/scripts/scxstats_to_openmetrics.py b/rust/scx_stats/scripts/scxstats_to_openmetrics.py new file mode 100755 index 000000000..6c51862af --- /dev/null +++ b/rust/scx_stats/scripts/scxstats_to_openmetrics.py @@ -0,0 +1,155 @@ +#!/usr/bin/python3 +import argparse +import sys +import json +import socket +import time +import tempfile +from prometheus_client import Gauge, CollectorRegistry, write_to_textfile +from pprint import pprint + +verbose = 0 + +def info(line): + print('[INFO] ' + line, file=sys.stderr) + +def dbg(line): + if verbose: + print('[DBG] ' + line, file=sys.stderr) + +def request(f, req, args={}): + f.write(json.dumps({ 'req': req, 'args': args }) + '\n') + f.flush() + resp = json.loads(f.readline()) + if resp['errno'] != 0: + raise Exception(f'req: {req} args: {args} failed with {resp['errno']} ({resp['args']['resp']})') + return resp['args']['resp'] + +def make_om_metrics(sname, omid, field, labels, meta_db, registry): + # @sname: The name of the current struct. + # + # @omid: The field path down from the top level struct. e.g. '.A.B' + # means that the top level's field 'A' is a dict and the current one is + # the field 'B' of the struct inside that dict. + # + # @field: The corresponding field part of the stats_meta. + # + # @labels: The collected $om_labels as this function descends down + # nested dicts. + desc = field['desc'] if 'desc' in field else '' + prefix = meta_db[sname]['om_prefix'] + + if 'datum' in field: + match field['datum']: + # Single value that can become a Gauge. Gauge name is $om_prefix + # + the leaf level field name. The combination must be unique. + case 'i64' | 'u64' | 'float': + gname = prefix + omid.rsplit('.', 1)[-1] + dbg(f'creating OM metric {gname}@{omid} {labels} "{desc}"') + return { omid: Gauge(gname, desc, labels, registry=registry) } + elif 'dict' in field and 'datum' in field['dict'] and 'struct' in field['dict']['datum']: + # The only allowed nesting is struct inside dict. + sname = field['dict']['datum']['struct'] + struct = meta_db[sname] + # $om_label's will distinguish different members of the dict by + # pointing to the dict keys. + if not struct['om_label']: + raise Exception(f'{omid} is nested inside but does not have _om_label') + # Recurse into the nested struct. + oms = {} + for fname, field in struct['fields'].items(): + oms |= make_om_metrics(sname, f'{omid}.{fname}', field, + labels + [struct['om_label']], meta_db, registry) + return oms + + info(f'field "{omid}" has unsupported type, skipping') + return {} + +def update_om_metrics(resp, omid, labels, meta_db, om_metrics): + for k, v in resp.items(): + k_omid = f'{omid}.{k}' + if type(v) == dict: + # Descend into dict. + for dk, dv in v.items(): + update_om_metrics(dv, k_omid, labels + [dk], meta_db, om_metrics); + elif k_omid in om_metrics: + # Update known metrics. + dbg(f'updating {k_omid} {labels} to {v}') + if len(labels): + om_metrics[k_omid].labels(labels).set(v) + else: + om_metrics[k_omid].set(v) + else: + dbg(f'skpping {k_omid}') + +def main(): + global verbose + + parser = argparse.ArgumentParser( + prog='scxstats_to_openmetrics', + description='Read from scx_stats server and output in OpenMetrics format') + parser.add_argument('-i', '--intv', metavar='SECS', type=float, default='2.0', + help='Polling interval (default: %(default)s)') + parser.add_argument('-v', '--verbose', action='count') + parser.add_argument('-p', '--path', metavar='PATH', default='/var/run/scx/root/stats', + help='UNIX domain socket path to connect to (default: %(default)s)') + + args = parser.parse_args() + verbose = args.verbose + + # Connect to the stats server. + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(args.path) + f = sock.makefile(mode='rw') + + # Query metadata and build meta_db. + meta_db = {} + resp = request(f, 'stats_meta') + + top_sname = None + for sname, struct in resp.items(): + # Find the top-level struct. + if 'top' in struct and struct['top']: + top_sname = sname + + struct['om_prefix'] = '' + struct['om_label'] = '' + + if 'user' in struct: + # om_prefix is used to build unique metric name from field names. + if 'om_prefix' in struct['user']: + struct['om_prefix'] = struct['user']['om_prefix'] + # om_label is used to distinguish structs nested inside dicts. + if 'om_label' in struct['user']: + struct['om_label'] = struct['user']['om_label'] + del struct['user'] + + meta_db[sname] = struct + + if verbose: + dbg('dumping meta_db:') + pprint(meta_db) + + # Instantiate OpenMetrics Gauges. + registry = CollectorRegistry() + om_metrics = {} + for name, field in meta_db[top_sname]['fields'].items(): + om_metrics |= make_om_metrics(top_sname, f'.{name}', field, [], meta_db, registry) + + # Loop and translate stats. + while True: + resp = request(f, 'stats') + if verbose: + dbg('dumping stats response:') + pprint(resp) + update_om_metrics(resp, '', [], meta_db, om_metrics) + + with tempfile.NamedTemporaryFile() as out_file: + write_to_textfile(out_file.name, registry) + with open(out_file.name) as in_file: + sys.stdout.write(in_file.read()) + sys.stdout.flush() + + time.sleep(args.intv) + +main() diff --git a/rust/scx_stats/scx_stats_derive/src/lib.rs b/rust/scx_stats/scx_stats_derive/src/lib.rs index ec949ad52..a08e21eb1 100644 --- a/rust/scx_stats/scx_stats_derive/src/lib.rs +++ b/rust/scx_stats/scx_stats_derive/src/lib.rs @@ -1,4 +1,4 @@ -use quote::{format_ident, quote, quote_spanned}; +use quote::{quote, quote_spanned}; use scx_stats::{ScxStatsData, ScxStatsKind, ScxStatsMetaAux}; use syn::parse_macro_input; use syn::spanned::Spanned; @@ -10,17 +10,16 @@ pub fn stat(input: proc_macro::TokenStream) -> proc_macro::TokenStream { let mut output = proc_macro2::TokenStream::new(); - for (idx, field) in meta.fields.iter().enumerate() { + for (_fname, field) in meta.fields.iter() { match &field.data { ScxStatsData::Datum(datum) | ScxStatsData::Array(datum) | ScxStatsData::Dict { key: _, datum } => { if let ScxStatsKind::Struct(name) = &datum { let path = &paths[name.as_str()]; - let assert_id = format_ident!("_AssertScxStatsMeta_{}", idx); #[rustfmt::skip] let assert = quote_spanned! {path.span()=> - struct #assert_id where #path: scx_stats::Meta; + struct _AssertScxStatsMeta where #path: scx_stats::Meta; }; output.extend(assert.into_iter()); } diff --git a/rust/scx_stats/src/server.rs b/rust/scx_stats/src/server.rs index 3f7be1202..3b9a03013 100644 --- a/rust/scx_stats/src/server.rs +++ b/rust/scx_stats/src/server.rs @@ -51,7 +51,7 @@ impl std::fmt::Debug for ScxStatsErrno { } struct ScxStatsServerData { - stats_meta: Vec, + stats_meta: BTreeMap, stats: StatMap, } @@ -61,7 +61,11 @@ struct ScxStatsServerInner { } impl ScxStatsServerInner { - fn new(listener: UnixListener, stats_meta: Vec, stats: StatMap) -> Self { + fn new( + listener: UnixListener, + stats_meta: BTreeMap, + stats: StatMap, + ) -> Self { Self { listener, data: Arc::new(Mutex::new(ScxStatsServerData { stats_meta, stats })), @@ -90,7 +94,7 @@ impl ScxStatsServerInner { "stats" => { let target = match req.args.get("target") { Some(v) => v, - None => "all", + None => "top", }; let handler = match data.lock().unwrap().stats.get(target) { @@ -159,7 +163,7 @@ pub struct ScxStatsServer { stats_path: PathBuf, path: Option, - stats_meta_holder: Vec, + stats_meta_holder: BTreeMap, stats_holder: StatMap, } @@ -171,13 +175,13 @@ impl ScxStatsServer { stats_path: PathBuf::from("stats"), path: None, - stats_meta_holder: vec![], + stats_meta_holder: BTreeMap::new(), stats_holder: BTreeMap::new(), } } pub fn add_stats_meta(mut self, meta: ScxStatsMeta) -> Self { - self.stats_meta_holder.push(meta); + self.stats_meta_holder.insert(meta.name.clone(), meta); self } @@ -231,7 +235,7 @@ impl ScxStatsServer { let listener = UnixListener::bind(path).with_context(|| format!("creating UNIX socket {:?}", path))?; - let mut stats_meta = vec![]; + let mut stats_meta = BTreeMap::new(); let mut stats = BTreeMap::new(); std::mem::swap(&mut stats_meta, &mut self.stats_meta_holder); std::mem::swap(&mut stats, &mut self.stats_holder); diff --git a/rust/scx_stats/src/stats.rs b/rust/scx_stats/src/stats.rs index c38182595..cf5abf874 100644 --- a/rust/scx_stats/src/stats.rs +++ b/rust/scx_stats/src/stats.rs @@ -174,9 +174,9 @@ impl ScxStatsData { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ScxStatsAttr { - All, + Top, Desc(String), - FieldPrefix(String), + User(String, String), } struct ScxStatsAttrVec { @@ -189,14 +189,24 @@ impl Parse for ScxStatsAttrVec { loop { let ident = input.parse::()?; match ident.to_string().as_str() { - "all" => attrs.push(ScxStatsAttr::All), + "top" => attrs.push(ScxStatsAttr::Top), "desc" => { input.parse::()?; attrs.push(ScxStatsAttr::Desc(input.parse::()?.value())) } - "field_prefix" => { + key if key.starts_with("_") => { + let key = &key[1..]; + if key.len() == 0 { + Err(Error::new( + ident.span(), + "scx_stats: User attribute name missing", + ))? + } input.parse::()?; - attrs.push(ScxStatsAttr::FieldPrefix(input.parse::()?.value())) + attrs.push(ScxStatsAttr::User( + key.to_string(), + input.parse::()?.value(), + )) } _ => Err(Error::new(ident.span(), "scx_stats: Unknown attribute"))?, } @@ -214,9 +224,11 @@ impl Parse for ScxStatsAttrVec { #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct ScxStatsFieldAttrs { #[serde(default, skip_serializing_if = "Option::is_none")] - pub all: Option, + pub top: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub desc: Option, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub user: BTreeMap, } impl ScxStatsFieldAttrs { @@ -229,6 +241,9 @@ impl ScxStatsFieldAttrs { for elem in vec.attrs.into_iter() { match elem { ScxStatsAttr::Desc(v) => fattrs.desc = Some(v), + ScxStatsAttr::User(k, v) => { + fattrs.user.insert(k, v); + } v => Err(Error::new( attr.span(), format!("Not a field attribute: {:?}", &v), @@ -244,7 +259,6 @@ impl ScxStatsFieldAttrs { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ScxStatsField { - pub name: String, #[serde(flatten)] pub data: ScxStatsData, #[serde(flatten)] @@ -252,23 +266,25 @@ pub struct ScxStatsField { } impl ScxStatsField { - pub fn new(field: &Field, paths: &mut BTreeMap) -> syn::Result { - Ok(Self { - name: field.ident.as_ref().unwrap().to_string(), - data: ScxStatsData::new(&field.ty, paths)?, - attrs: ScxStatsFieldAttrs::new(&field.attrs)?, - }) + pub fn new(field: &Field, paths: &mut BTreeMap) -> syn::Result<(String, Self)> { + Ok(( + field.ident.as_ref().unwrap().to_string(), + Self { + data: ScxStatsData::new(&field.ty, paths)?, + attrs: ScxStatsFieldAttrs::new(&field.attrs)?, + }, + )) } } #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct ScxStatsStructAttrs { #[serde(default, skip_serializing_if = "Option::is_none")] - pub all: Option, + pub top: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub desc: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub field_prefix: Option, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub user: BTreeMap, } impl ScxStatsStructAttrs { @@ -280,9 +296,11 @@ impl ScxStatsStructAttrs { let vec = attr.parse_args::()?; for elem in vec.attrs.into_iter() { match elem { - ScxStatsAttr::All => sattrs.all = Some(true), + ScxStatsAttr::Top => sattrs.top = Some(true), ScxStatsAttr::Desc(v) => sattrs.desc = Some(v), - ScxStatsAttr::FieldPrefix(v) => sattrs.field_prefix = Some(v), + ScxStatsAttr::User(k, v) => { + sattrs.user.insert(k, v); + } } } } @@ -297,7 +315,7 @@ pub struct ScxStatsMeta { pub name: String, #[serde(flatten)] pub attrs: ScxStatsStructAttrs, - pub fields: Vec, + pub fields: BTreeMap, } #[derive(Clone, Debug)] @@ -310,21 +328,22 @@ pub struct ScxStatsMetaAux { impl Parse for ScxStatsMetaAux { fn parse(input: &ParseBuffer) -> syn::Result { let mut paths = BTreeMap::new(); - let mut fields = vec![]; + let mut fields = BTreeMap::new(); let item_struct: ItemStruct = input.parse()?; let attrs = ScxStatsStructAttrs::new(&item_struct.attrs)?; if let Fields::Named(named_fields) = &item_struct.fields { for field in named_fields.named.iter() { - fields.push(ScxStatsField::new(field, &mut paths)?); + let (name, sf) = ScxStatsField::new(field, &mut paths)?; + fields.insert(name, sf); } } Ok(Self { meta: ScxStatsMeta { name: item_struct.ident.to_string(), - attrs, + attrs, fields, }, ident: item_struct.ident, diff --git a/scheds/rust/scx_layered/src/stats.rs b/scheds/rust/scx_layered/src/stats.rs index f87d61790..9e28a1a84 100644 --- a/scheds/rust/scx_layered/src/stats.rs +++ b/scheds/rust/scx_layered/src/stats.rs @@ -44,6 +44,7 @@ fn fmt_num(v: u64) -> String { } #[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)] +#[stat(_om_prefix = "l_", _om_label="layer_name")] pub struct LayerStats { #[stat(desc = "layer: CPU utilization (100% means one full CPU)")] pub util: f64, @@ -303,7 +304,7 @@ impl LayerStats { } #[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)] -#[stat(all)] +#[stat(top)] pub struct SysStats { #[stat(desc = "update interval")] pub intv: f64, @@ -335,7 +336,7 @@ pub struct SysStats { pub load: f64, #[stat(desc = "fallback CPU")] pub fallback_cpu: u32, - #[stat(desc = "per-layer statistics", om_prefix = "l_")] + #[stat(desc = "per-layer statistics")] pub layers: BTreeMap, } @@ -433,7 +434,7 @@ pub fn launch_server(sys_stats: Arc>) -> Result<()> { .add_stats_meta(LayerStats::meta()) .add_stats_meta(SysStats::meta()) .add_stats( - "all", + "top", Box::new(move |_| sys_stats.lock().unwrap().to_json()), ) .launch()?;