diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index 2a5bb942..1d3d7c1c 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -16,6 +16,9 @@ struct Cli { config: PathBuf, #[clap(long, short)] total_time: Option, + /// Number of activity results to batch together before printing to csv file + #[clap(long, short)] + print_batch_size: Option, #[clap(long, short)] log_level: LevelFilter, } @@ -66,7 +69,7 @@ async fn main() -> anyhow::Result<()> { } } - let sim = Simulation::new(clients, activity, cli.total_time); + let sim = Simulation::new(clients, activity, cli.total_time, cli.print_batch_size); let sim2 = sim.clone(); ctrlc::set_handler(move || { diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 805221c9..eab9e4fc 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -6,7 +6,6 @@ use lightning::ln::PaymentHash; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::marker::Send; -use std::mem::size_of; use std::{collections::HashMap, sync::Arc, time::SystemTime}; use thiserror::Error; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -183,13 +182,18 @@ pub struct Simulation { shutdown_listener: Listener, // Total simulation time. The simulation will run forever if undefined. total_time: Option, + /// The number of activity results to batch before printing in CSV. + print_batch_size: u32, } +const DEFAULT_PRINT_BATCH_SIZE: u32 = 500; + impl Simulation { pub fn new( nodes: HashMap>>, activity: Vec, total_time: Option, + print_batch_size: Option, ) -> Self { let (shutdown_trigger, shutdown_listener) = triggered::trigger(); Self { @@ -198,6 +202,7 @@ impl Simulation { shutdown_trigger, shutdown_listener, total_time: total_time.map(|x| Duration::from_secs(x as u64)), + print_batch_size: print_batch_size.unwrap_or(DEFAULT_PRINT_BATCH_SIZE), } } @@ -306,6 +311,7 @@ impl Simulation { tasks: &mut JoinSet<()>, ) { let listener = self.shutdown_listener.clone(); + let print_batch_size = self.print_batch_size; log::debug!("Simulator data recording starting."); // Create a sender/receiver pair that will be used to report final results of action outcomes. @@ -318,7 +324,11 @@ impl Simulation { listener.clone(), )); - tasks.spawn(consume_simulation_results(results_receiver, listener)); + tasks.spawn(consume_simulation_results( + results_receiver, + listener, + print_batch_size, + )); log::debug!("Simulator data recording exiting."); } @@ -487,10 +497,11 @@ async fn produce_events( async fn consume_simulation_results( receiver: Receiver<(DispatchedPayment, PaymentResult)>, listener: Listener, + print_batch_size: u32, ) { log::debug!("Simulation results consumer started."); - if let Err(e) = write_payment_results(receiver, listener).await { + if let Err(e) = write_payment_results(receiver, listener, print_batch_size).await { log::error!("Error while reporting payment results: {:?}", e); } @@ -500,19 +511,19 @@ async fn consume_simulation_results( async fn write_payment_results( mut receiver: Receiver<(DispatchedPayment, PaymentResult)>, listener: Listener, + print_batch_size: u32, ) -> Result<(), SimulationError> { - let mut writer = WriterBuilder::new() - .buffer_capacity(size_of::<(DispatchedPayment, PaymentResult)>() * 5) - .from_path(format!( - "simulation_{:?}.csv", - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - ))?; + let mut writer = WriterBuilder::new().from_path(format!( + "simulation_{:?}.csv", + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + ))?; let mut result_logger = PaymentResultLogger::new(); + let mut counter = 1; loop { tokio::select! { biased; @@ -530,6 +541,13 @@ async fn write_payment_results( let _ = writer.flush(); SimulationError::CsvError(e) })?; + + if print_batch_size == counter { + writer.flush().map_err(|_| SimulationError::FileError)?; + counter = 1; + } else { + counter += 1; + } continue; }, None => {