Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stress-test framework to contrib repo. #138

Merged
merged 8 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
"opentelemetry-*",
"examples/*",
"stress",
]
resolver = "2"

Expand Down
24 changes: 24 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "stress"
version = "0.1.0"
edition = "2021"
publish = false

[[bin]] # Bin to run the metrics stress tests for Logs UserEvent Exporter
name = "user-events-enabled"
path = "src/user_events_enabled.rs"
doc = false

[dependencies]
ctrlc = "3.2.5"
lazy_static = "1.4.0"
num_cpus = "1.15.0"
num-format = "0.4.4"
sysinfo = { version = "0.32", optional = true }
eventheader_dynamic = "0.4.0"

[features]
stats = ["sysinfo"]

[profile.release]
debug = true
168 changes: 168 additions & 0 deletions stress/src/throughput.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use num_format::{Locale, ToFormattedString};
use std::cell::UnsafeCell;
use std::env;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
#[cfg(feature = "stats")]
use sysinfo::{Pid, System};

const SLIDING_WINDOW_SIZE: u64 = 2; // In seconds

static STOP: AtomicBool = AtomicBool::new(false);

#[repr(C)]
#[derive(Default)]
struct WorkerStats {
count: u64,
/// We use a padding for the struct to allow each thread to have exclusive access to each WorkerStat
/// Otherwise, there would be some cpu contention with threads needing to take ownership of the cache lines
padding: [u64; 15],
}

pub fn test_throughput<F>(func: F)
where
F: Fn() + Sync + Send + 'static,
{
ctrlc::set_handler(move || {
STOP.store(true, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");

let mut num_threads = num_cpus::get();
let mut args_iter = env::args();

Check warning on line 34 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L24-L34

Added lines #L24 - L34 were not covered by tests

if let Some(arg_str) = args_iter.nth(1) {
let arg = arg_str.parse::<usize>();

if arg.is_err() {
eprintln!("Invalid command line argument '{}' as number of threads. Make sure the value is a positive integer.", arg_str);
std::process::exit(1);
}

let arg_num = arg.unwrap();

if arg_num > 0 {
if arg_num > num_cpus::get() {
println!(
"Specified {} threads which is larger than the number of logical cores ({})!",
arg_num, num_threads
);
}
num_threads = arg_num;

Check warning on line 53 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L36-L53

Added lines #L36 - L53 were not covered by tests
} else {
eprintln!("Invalid command line argument {} as number of threads. Make sure the value is above 0 and less than or equal to number of available logical cores ({}).", arg_num, num_threads);
std::process::exit(1);

Check warning on line 56 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L55-L56

Added lines #L55 - L56 were not covered by tests
}
}

Check warning on line 58 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L58

Added line #L58 was not covered by tests

println!("Number of threads: {}\n", num_threads);
let func_arc = Arc::new(func);
let mut worker_stats_vec: Vec<WorkerStats> = Vec::new();

for _ in 0..num_threads {
worker_stats_vec.push(WorkerStats::default());
}

Check warning on line 66 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L60-L66

Added lines #L60 - L66 were not covered by tests

let shared_mutable_stats_slice = UnsafeSlice::new(&mut worker_stats_vec);

thread::scope(|s| {
s.spawn(|| {
let mut last_collect_time = Instant::now();
let mut total_count_old: u64 = 0;

#[cfg(feature = "stats")]
let pid = Pid::from(std::process::id() as usize);
#[cfg(feature = "stats")]
let mut system = System::new_all();

Check warning on line 78 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L68-L78

Added lines #L68 - L78 were not covered by tests

loop {
let current_time = Instant::now();
let elapsed = current_time.duration_since(last_collect_time).as_secs();
if elapsed >= SLIDING_WINDOW_SIZE {
let total_count_u64 = shared_mutable_stats_slice.sum();
last_collect_time = Instant::now();
let current_count = total_count_u64 - total_count_old;
total_count_old = total_count_u64;
let throughput = current_count / elapsed;
println!(
"Throughput: {} iterations/sec",
throughput.to_formatted_string(&Locale::en)
);

#[cfg(feature = "stats")]
{
system.refresh_all();
if let Some(process) = system.process(pid) {
println!(
"Memory usage: {:.2} MB",
process.memory() as f64 / (1024.0 * 1024.0)
);
println!("CPU usage: {}%", process.cpu_usage() / num_threads as f32);
println!(
"Virtual memory usage: {:.2} MB",
process.virtual_memory() as f64 / (1024.0 * 1024.0)
);
} else {
println!("Process not found");
}

Check warning on line 109 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L81-L109

Added lines #L81 - L109 were not covered by tests
}

println!("\n");
}

Check warning on line 113 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L112-L113

Added lines #L112 - L113 were not covered by tests

if STOP.load(Ordering::SeqCst) {
break;
}

thread::sleep(Duration::from_millis(5000));

Check warning on line 119 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L115-L119

Added lines #L115 - L119 were not covered by tests
}
});

Check warning on line 121 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L121

Added line #L121 was not covered by tests

for thread_index in 0..num_threads {
let func_arc_clone = Arc::clone(&func_arc);
s.spawn(move || loop {
func_arc_clone();
unsafe {
shared_mutable_stats_slice.increment(thread_index);
}
if STOP.load(Ordering::SeqCst) {
break;
}
});
}
});
}

Check warning on line 136 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L123-L136

Added lines #L123 - L136 were not covered by tests

#[derive(Copy, Clone)]
struct UnsafeSlice<'a> {
slice: &'a [UnsafeCell<WorkerStats>],
}

unsafe impl Send for UnsafeSlice<'_> {}
unsafe impl Sync for UnsafeSlice<'_> {}

impl<'a> UnsafeSlice<'a> {
fn new(slice: &'a mut [WorkerStats]) -> Self {
let ptr = slice as *mut [WorkerStats] as *const [UnsafeCell<WorkerStats>];
Self {
slice: unsafe { &*ptr },
}
}

Check warning on line 152 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L147-L152

Added lines #L147 - L152 were not covered by tests

// SAFETY: It's assumed that no two threads will write to the same index at the same time
#[inline(always)]
unsafe fn increment(&self, i: usize) {
let value = self.slice[i].get();
(*value).count += 1;
}

Check warning on line 159 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L156-L159

Added lines #L156 - L159 were not covered by tests

#[inline(always)]
fn sum(&self) -> u64 {
self.slice
.iter()
.map(|cell| unsafe { (*cell.get()).count })
.sum()
}

Check warning on line 167 in stress/src/throughput.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/throughput.rs#L162-L167

Added lines #L162 - L167 were not covered by tests
}
38 changes: 38 additions & 0 deletions stress/src/user_events_enabled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// To run the test, execute the following command in the stress directory as sudo:
// sudo -E ~/.cargo/bin/cargo rnu --bin user-events-enabled --release

// TODO : Add stess result here.

mod throughput;
use eventheader_dynamic::{Provider, ProviderOptions};
use lazy_static::lazy_static;

// Global constants for level and keyword
const LEVEL: u8 = 4; // Example level (Informational)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couple of suggestions

  1. Lets add one test with and without tracepoint being enabled.
  2. Instead of testing just the event header, lets test via OTel itself, so we know the overall cost/contention, not just that of EventHeader crate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add one test with and without tracepoint being enabled.

Yes, I thought about it. This needs some thinking, as there needs to be some listener for the tracepoint. Can we do it as separate PR?

Instead of testing just the event header, lets test via OTel itself, so we know the overall cost/contention, not just that of EventHeader crate.

This test was specifically for cost incurred for eventheader crate. Plan is to add separate test for the actual export and enabled check through Otel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is only testing event header (that should be done by that repo!), lets rename to "eventheader_enabled" / similar.

const KEYWORD: u64 = 0x01; // Example keyword

lazy_static! {
static ref PROVIDER: Provider = {
// Initialize the Provider with dynamic options
let mut options = ProviderOptions::new();
options = *options.group_name("testprovider");
let mut provider = Provider::new("testprovider", &options);

// Register events with specific levels and keywords
provider.register_set(LEVEL.into(), KEYWORD);

provider
};
}

fn main() {
// Execute the throughput test with the test_log function
throughput::test_throughput(test_user_events_enabled);
}

Check warning on line 31 in stress/src/user_events_enabled.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/user_events_enabled.rs#L28-L31

Added lines #L28 - L31 were not covered by tests

fn test_user_events_enabled() {

Check warning on line 33 in stress/src/user_events_enabled.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/user_events_enabled.rs#L33

Added line #L33 was not covered by tests
// Find and check if the event is enabled
if let Some(event_set) = PROVIDER.find_set(LEVEL.into(), KEYWORD) {
let _ = event_set.enabled(); // Perform the enabled check
}
}

Check warning on line 38 in stress/src/user_events_enabled.rs

View check run for this annotation

Codecov / codecov/patch

stress/src/user_events_enabled.rs#L35-L38

Added lines #L35 - L38 were not covered by tests
Loading