diff --git a/Cargo.toml b/Cargo.toml
index 4eeb6c3..d9d36a2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ repository = "https://github.com/haixuanTao/opentelemetry-system-metrics"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-opentelemetry = { version = "0.23.0", features = ["metrics"] }
+opentelemetry = { version = "0.27.1", features = ["metrics"] }
 sysinfo = "0.29"
 indexmap = "1.8"
 nvml-wrapper = "0.9.0"
@@ -18,12 +18,12 @@ eyre = "0.6.8"
 tracing = "0.1.40"
 
 [dev-dependencies]
-opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio", "metrics"] }
+opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio", "metrics"] }
 futures = "0.3.12"
 tokio = { version = "1.17.0", features = ["full"] }
-opentelemetry-otlp = { version = "0.16.0", features = ["tonic", "metrics"] }
+opentelemetry-otlp = { version = "0.27.0", features = ["tonic", "metrics"] }
 eyre = "0.6.8"
-opentelemetry-stdout = { version = "0.4.0", features = ["metrics"] }
+opentelemetry-stdout = { version = "0.27.0", features = ["metrics"] }
 
 [[example]]
 name = "otlp-tokio-metrics"
diff --git a/examples/otlp-tokio-metrics/main.rs b/examples/otlp-tokio-metrics/main.rs
index 5fcc9f2..cd8c953 100644
--- a/examples/otlp-tokio-metrics/main.rs
+++ b/examples/otlp-tokio-metrics/main.rs
@@ -1,35 +1,38 @@
-use opentelemetry::{
-    metrics::MeterProvider as _,
-    metrics::{self},
-};
+use opentelemetry::metrics::MeterProvider as _;
 
-use opentelemetry_otlp::{ExportConfig, WithExportConfig};
-use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime};
-use opentelemetry_system_metrics::init_process_observer;
+use opentelemetry_otlp::Protocol;
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::metrics::{MetricResult, SdkMeterProvider};
+use opentelemetry_system_metrics::build_process_observer;
 
 use std::time::Duration;
 
-fn init_metrics() -> metrics::Result<SdkMeterProvider> {
-    let export_config = ExportConfig {
-        endpoint: "http://localhost:4317".to_string(),
-        ..ExportConfig::default()
-    };
-
-    opentelemetry_otlp::new_pipeline()
-        .metrics(runtime::Tokio)
-        .with_exporter(
-            opentelemetry_otlp::new_exporter()
-                .tonic()
-                .with_export_config(export_config),
-        )
-        .with_period(Duration::from_secs(10))
+fn init_metrics() -> MetricResult<SdkMeterProvider> {
+    let exporter = opentelemetry_otlp::MetricExporter::builder()
+        .with_tonic()
+        .with_endpoint("http://localhost:4317")
+        .with_protocol(Protocol::Grpc)
+        .with_timeout(Duration::from_secs(10))
         .build()
+        .unwrap();
+
+    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(
+        exporter,
+        opentelemetry_sdk::runtime::Tokio,
+    )
+    .with_interval(std::time::Duration::from_secs(10))
+    .with_timeout(Duration::from_secs(10))
+    .build();
+
+    Ok(opentelemetry_sdk::metrics::SdkMeterProvider::builder()
+        .with_reader(reader)
+        .build())
 }
 
 #[tokio::main]
 async fn main() {
     let meter_provider = init_metrics().unwrap();
     let meter = meter_provider.meter("mylibraryname");
-    init_process_observer(meter).unwrap();
+    build_process_observer(meter).unwrap();
 
     // Do some work
diff --git a/examples/stdout-tokio-metrics/main.rs b/examples/stdout-tokio-metrics/main.rs
index 78ac838..df11910 100644
--- a/examples/stdout-tokio-metrics/main.rs
+++ b/examples/stdout-tokio-metrics/main.rs
@@ -1,23 +1,37 @@
 use opentelemetry::metrics::MeterProvider as _;
-use opentelemetry_sdk::{
-    metrics::{PeriodicReader, SdkMeterProvider},
-    runtime,
-};
-use opentelemetry_system_metrics::init_process_observer;
+use opentelemetry_otlp::Protocol;
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::metrics::{MetricResult, SdkMeterProvider};
+
+use opentelemetry_system_metrics::build_process_observer;
 use std::time::Duration;
 
-fn init_metrics() -> SdkMeterProvider {
-    let exporter = opentelemetry_stdout::MetricsExporter::default();
-    let reader = PeriodicReader::builder(exporter, runtime::Tokio)
-        .with_interval(Duration::from_secs(1))
-        .build();
-    SdkMeterProvider::builder().with_reader(reader).build()
+fn init_metrics() -> MetricResult<SdkMeterProvider> {
+    let exporter = opentelemetry_otlp::MetricExporter::builder()
+        .with_tonic()
+        .with_endpoint("http://localhost:4317")
+        .with_protocol(Protocol::Grpc)
+        .with_timeout(Duration::from_secs(10))
+        .build()
+        .unwrap();
+
+    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(
+        exporter,
+        opentelemetry_sdk::runtime::Tokio,
+    )
+    .with_interval(std::time::Duration::from_secs(1))
+    .with_timeout(Duration::from_secs(10))
+    .build();
+
+    Ok(opentelemetry_sdk::metrics::SdkMeterProvider::builder()
+        .with_reader(reader)
+        .build())
 }
 
 #[tokio::main]
 async fn main() {
-    let meter_provider = init_metrics();
+    let meter_provider = init_metrics().expect("meter_provider");
     let meter = meter_provider.meter("mylibraryname");
-    let _ = init_process_observer(meter).unwrap();
+    let _ = build_process_observer(meter).unwrap();
     tokio::time::sleep(Duration::from_secs(60)).await;
     meter_provider.shutdown().unwrap();
diff --git a/src/lib.rs b/src/lib.rs
index f527fb9..da32525 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,19 +10,16 @@
 //!
 //! ```
 //! use opentelemetry::global;
-//! use opentelemetry_system_metrics::init_process_observer;
+//! use opentelemetry_system_metrics::build_process_observer;
 //!
 //! let meter = global::meter("process-meter");
-//! init_process_observer(meter);
+//! build_process_observer(meter);
 //! ```
 //!
-
-use eyre::Context;
 use eyre::ContextCompat;
 use eyre::Result;
 use nvml_wrapper::enums::device::UsedGpuMemory;
 use nvml_wrapper::Nvml;
-use opentelemetry::metrics::Unit;
 
 use sysinfo::PidExt;
 
@@ -31,12 +28,7 @@ use sysinfo::SystemExt;
 use sysinfo::{get_current_pid, System};
 
 use opentelemetry::metrics::Meter;
-use opentelemetry::Key;
-
-const PROCESS_PID: Key = Key::from_static_str("process.pid");
-const PROCESS_EXECUTABLE_NAME: Key = Key::from_static_str("process.executable.name");
-const PROCESS_EXECUTABLE_PATH: Key = Key::from_static_str("process.executable.path");
-const PROCESS_COMMAND: Key = Key::from_static_str("process.command");
+use opentelemetry::KeyValue;
 
 // Not implemented yet!
 //
@@ -50,7 +42,7 @@ const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
 const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
 const PROCESS_DISK_IO: &str = "process.disk.io";
 // const PROCESS_NETWORK_IO: &str = "process.network.io";
-const DIRECTION: Key = Key::from_static_str("direction");
+const DIRECTION: &str = "direction";
 
 // const PROCESS_GPU_USAGE: &str = "process.gpu.usage";
 const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
@@ -60,13 +52,13 @@ const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
 ///
 /// ```
 /// use opentelemetry::global;
-/// use opentelemetry_system_metrics::init_process_observer;
+/// use opentelemetry_system_metrics::build_process_observer;
 ///
 /// let meter = global::meter("process-meter");
-/// init_process_observer(meter);
+/// build_process_observer(meter);
 /// ```
 ///
-pub fn init_process_observer(meter: Meter) -> Result<()> {
+pub fn build_process_observer(meter: Meter) -> Result<()> {
     let pid = get_current_pid()
         .map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
     register_metrics(meter, pid)
@@ -77,14 +69,14 @@ pub fn init_process_observer(meter: Meter) -> Result<()> {
 ///
 /// ```
 /// use opentelemetry::global;
-/// use opentelemetry_system_metrics::init_process_observer_for_pid;
+/// use opentelemetry_system_metrics::build_process_observer_for_pid;
 ///
 /// let meter = global::meter("process-meter");
 /// let pid = 1234; // replace with the actual PID
-/// init_process_observer_for_pid(meter, pid);
+/// build_process_observer_for_pid(meter, pid);
 /// ```
 ///
-pub fn init_process_observer_for_pid(meter: Meter, pid: u32) -> Result<()> {
+pub fn build_process_observer_for_pid(meter: Meter, pid: u32) -> Result<()> {
     let pid = sysinfo::Pid::from_u32(pid);
     register_metrics(meter, pid)
 }
@@ -97,150 +89,125 @@ fn register_metrics(meter: Meter, pid: sysinfo::Pid) -> Result<()> {
 
     let nvml = Nvml::init();
 
-    let process_cpu_utilization = meter
+    // CPU Usage
+    meter
         .f64_observable_gauge(PROCESS_CPU_USAGE)
-        .with_description("The percentage of CPU in use.")
-        .init();
-    let process_cpu_usage = meter
+        .with_description("CPU usage of the process")
+        .with_callback(move |observer| {
+            let mut sys = System::new_all();
+            sys.refresh_process(pid);
+
+            if let Some(process) = sys.process(pid) {
+                let cpu_usage = process.cpu_usage();
+                observer.observe(cpu_usage.into(), &[]);
+            }
+        })
+        .build();
+
+    // CPU Utilization
+    meter
         .f64_observable_gauge(PROCESS_CPU_UTILIZATION)
-        .with_description("The amount of CPU in use.")
-        .init();
-    let process_memory_usage = meter
+        .with_description("CPU utilization of the process as a fraction of core count")
+        .with_callback(move |observer| {
+            let mut sys = System::new_all();
+            sys.refresh_process(pid);
+
+            if let Some(process) = sys.process(pid) {
+                let cpu_utilization = process.cpu_usage() / core_count as f32;
+                observer.observe(cpu_utilization.into(), &[]);
+            }
+        })
+        .build();
+
+    // Memory Usage
+    meter
         .i64_observable_gauge(PROCESS_MEMORY_USAGE)
-        .with_description("The amount of physical memory in use.")
-        .with_unit(Unit::new("byte"))
-        .init();
-    let process_memory_virtual = meter
+        .with_description("Memory usage of the process")
+        .with_callback(move |observer| {
+            let mut sys = System::new_all();
+            sys.refresh_process(pid);
+
+            if let Some(process) = sys.process(pid) {
+                observer.observe(process.memory() as i64, &[]);
+            }
+        })
+        .build();
+
+    // Virtual Memory Usage
+    meter
         .i64_observable_gauge(PROCESS_MEMORY_VIRTUAL)
-        .with_description("The amount of committed virtual memory.")
-        .with_unit(Unit::new("byte"))
-        .init();
-    let process_disk_io = meter
+        .with_description("Virtual memory usage of the process")
+        .with_callback(move |observer| {
+            let mut sys = System::new_all();
+            sys.refresh_process(pid);
+
+            if let Some(process) = sys.process(pid) {
+                observer.observe(process.virtual_memory() as i64, &[]);
+            }
+        })
+        .build();
+
+    // Disk I/O Read
+    meter
         .i64_observable_gauge(PROCESS_DISK_IO)
-        .with_description("Disk bytes transferred.")
-        .with_unit(Unit::new("byte"))
-        .init();
-
-    let process_gpu_memory_usage = meter
-        .u64_observable_gauge(PROCESS_GPU_MEMORY_USAGE)
-        .with_description("The amount of physical GPU memory in use.")
-        .with_unit(Unit::new("byte"))
-        .init();
-
+        .with_description("Disk I/O read bytes of the process")
+        .with_callback(move |observer| {
+            let mut sys = System::new_all();
+            sys.refresh_process(pid);
+
+            if let Some(process) = sys.process(pid) {
+                observer.observe(
+                    process.disk_usage().read_bytes as i64,
+                    &[KeyValue::new(DIRECTION, "read")],
+                );
+            }
+        })
+        .build();
+
+    // Disk I/O Write
     meter
-        .register_callback(
-            &[
-                process_cpu_utilization.as_any(),
-                process_cpu_usage.as_any(),
-                process_memory_usage.as_any(),
-                process_memory_virtual.as_any(),
-                process_disk_io.as_any(),
-                process_gpu_memory_usage.as_any(),
-            ],
-            move |context| {
-                let mut sys = System::new_all();
-                sys.refresh_processes();
-
-                let common_attributes = if let Some(process) = sys.process(pid) {
-                    [
-                        PROCESS_PID.i64(pid.as_u32().into()),
-                        PROCESS_EXECUTABLE_NAME.string(process.name().to_string()),
-                        PROCESS_EXECUTABLE_PATH.string(process.exe().to_str().unwrap().to_string()),
-                        PROCESS_COMMAND.string(process.cmd().join(" ").to_string()),
-                    ]
-                } else {
-                    unimplemented!()
-                };
-
-                sys.refresh_process(pid);
-
-                if let Some(process) = sys.process(pid) {
-                    let cpu_usage = process.cpu_usage();
-                    let disk_io = process.disk_usage();
-                    // let network_io = process.network_usage();
-
-                    context.observe_f64(&process_cpu_usage, cpu_usage.into(), &[]);
-                    context.observe_f64(
-                        &process_cpu_utilization,
-                        (cpu_usage / core_count as f32).into(),
-                        &common_attributes,
-                    );
-                    context.observe_i64(
-                        &process_memory_usage,
-                        (process.memory()).try_into().unwrap(),
-                        &common_attributes,
-                    );
-                    context.observe_i64(
-                        &process_memory_virtual,
-                        (process.virtual_memory()).try_into().unwrap(),
-                        &common_attributes,
-                    );
-                    context.observe_i64(
-                        &process_disk_io,
-                        disk_io.read_bytes.try_into().unwrap(),
-                        &[common_attributes.as_slice(), &[DIRECTION.string("read")]].concat(),
-                    );
-                    context.observe_i64(
-                        &process_disk_io,
-                        disk_io.written_bytes.try_into().unwrap(),
-                        &[common_attributes.as_slice(), &[DIRECTION.string("write")]].concat(),
-                    );
-
-                    // result.observe(
-                    //     &[common_attributes.as_slice(), &[DIRECTION.string("receive")]].concat(),
-                    //     &[process_network_io
-                    //         .observe(context,(network_io.received_bytes.try_into().unwrap())],
-                    //     );
-                    // result.observe(
-                    //     &[
-                    //         common_attributes.as_slice(),
-                    //         &[DIRECTION.string("transmit")],
-                    //     ]
-                    //     .concat(),
-                    //     &[process_network_io
-                    //         .observe(context,(network_io.transmitted_bytes.try_into().unwrap())],
-                    //     );
-                }
-
-                // let mut last_timestamp = last_timestamp.lock().unwrap().clone();
-                match &nvml {
-                    Ok(nvml) => {
-                        // Get the first `Device` (GPU) in the system
-                        if let Ok(device) = nvml.device_by_index(0) {
-                            if let Ok(gpu_stats) = device.running_compute_processes() {
-                                for stat in gpu_stats.iter() {
-                                    if stat.pid == pid.as_u32() {
-                                        let memory_used = match stat.used_gpu_memory {
-                                            UsedGpuMemory::Used(bytes) => bytes,
-                                            UsedGpuMemory::Unavailable => 0,
-                                        };
-
-                                        context.observe_u64(
-                                            &process_gpu_memory_usage,
-                                            memory_used,
-                                            &common_attributes,
-                                        );
-
-                                        break;
-                                    }
+        .i64_observable_gauge(PROCESS_DISK_IO)
+        .with_description("Disk I/O write bytes of the process")
+        .with_callback(move |observer| {
+            let mut sys = System::new_all();
+            sys.refresh_process(pid);
+
+            if let Some(process) = sys.process(pid) {
+                observer.observe(
+                    process.disk_usage().written_bytes as i64,
+                    &[KeyValue::new(DIRECTION, "write")],
+                );
+            }
+        })
+        .build();
+
+    // GPU Memory Usage
+    meter
+        .u64_observable_gauge(PROCESS_GPU_MEMORY_USAGE)
+        .with_description("GPU memory usage of the process")
+        .with_callback({
+            move |observer| {
+                if let Ok(nvml) = &nvml {
+                    if let Ok(device) = nvml.device_by_index(0) {
+                        if let Ok(gpu_stats) = device.running_compute_processes() {
+                            for stat in gpu_stats {
+                                if stat.pid == pid.as_u32() {
+                                    let memory_used = match stat.used_gpu_memory {
+                                        UsedGpuMemory::Used(bytes) => bytes,
+                                        UsedGpuMemory::Unavailable => 0,
+                                    };
+                                    observer.observe(memory_used, &[]);
+                                    return;
                                 }
-
-                                // If the loop finishes and no pid matched our pid, put 0.
-                                context.observe_u64(
-                                    &process_gpu_memory_usage,
-                                    0,
-                                    &common_attributes,
-                                );
-                            };
+                            }
                         }
                     }
-                    Err(err) => tracing::info!(
-                        "Could not initiate NVML for observing GPU memory usage. Error: {:?}",
-                        err
-                    ),
                 }
-            },
-        )
-        .context("could not register traceback")?;
+                // Default to 0 if no matching GPU stats found
+                observer.observe(0, &[]);
+            }
+        })
+        .build();
+
     Ok(())
 }
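
For reference, a minimal consumer-side sketch of the renamed entry points introduced by this patch. Assumptions not shown in the diff: a meter provider has already been installed with opentelemetry::global::set_meter_provider, the consumer also depends on eyre, and the spawned `sleep` command and meter names are illustrative only.

    use opentelemetry::global;
    use opentelemetry_system_metrics::{build_process_observer, build_process_observer_for_pid};

    fn main() -> eyre::Result<()> {
        // Observe the current process through the globally registered meter provider.
        build_process_observer(global::meter("process-meter"))?;

        // Observe another process by PID, e.g. a spawned child (illustrative).
        let child = std::process::Command::new("sleep").arg("60").spawn()?;
        build_process_observer_for_pid(global::meter("child-process-meter"), child.id())?;

        Ok(())
    }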