Skip to content

Commit

Permalink
Merge pull request #76 from okjodom/bp
Browse files Browse the repository at this point in the history
batch print results to csv
  • Loading branch information
okjodom authored Aug 28, 2023
2 parents daec255 + 11bc783 commit a5003f2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 13 deletions.
5 changes: 4 additions & 1 deletion sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ struct Cli {
config: PathBuf,
#[clap(long, short)]
total_time: Option<u32>,
/// Number of activity results to batch together before printing to csv file
#[clap(long, short)]
print_batch_size: Option<u32>,
#[clap(long, short)]
log_level: LevelFilter,
}
Expand Down Expand Up @@ -66,7 +69,7 @@ async fn main() -> anyhow::Result<()> {
}
}

let sim = Simulation::new(clients, activity, cli.total_time);
let sim = Simulation::new(clients, activity, cli.total_time, cli.print_batch_size);
let sim2 = sim.clone();

ctrlc::set_handler(move || {
Expand Down
42 changes: 30 additions & 12 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use lightning::ln::PaymentHash;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::marker::Send;
use std::mem::size_of;
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -183,13 +182,18 @@ pub struct Simulation {
shutdown_listener: Listener,
// Total simulation time. The simulation will run forever if undefined.
total_time: Option<time::Duration>,
/// The number of activity results to batch before printing in CSV.
print_batch_size: u32,
}

const DEFAULT_PRINT_BATCH_SIZE: u32 = 500;

impl Simulation {
pub fn new(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
activity: Vec<ActivityDefinition>,
total_time: Option<u32>,
print_batch_size: Option<u32>,
) -> Self {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
Self {
Expand All @@ -198,6 +202,7 @@ impl Simulation {
shutdown_trigger,
shutdown_listener,
total_time: total_time.map(|x| Duration::from_secs(x as u64)),
print_batch_size: print_batch_size.unwrap_or(DEFAULT_PRINT_BATCH_SIZE),
}
}

Expand Down Expand Up @@ -306,6 +311,7 @@ impl Simulation {
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();
let print_batch_size = self.print_batch_size;
log::debug!("Simulator data recording starting.");

// Create a sender/receiver pair that will be used to report final results of action outcomes.
Expand All @@ -318,7 +324,11 @@ impl Simulation {
listener.clone(),
));

tasks.spawn(consume_simulation_results(results_receiver, listener));
tasks.spawn(consume_simulation_results(
results_receiver,
listener,
print_batch_size,
));
log::debug!("Simulator data recording exiting.");
}

Expand Down Expand Up @@ -487,10 +497,11 @@ async fn produce_events(
async fn consume_simulation_results(
receiver: Receiver<(DispatchedPayment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) {
log::debug!("Simulation results consumer started.");

if let Err(e) = write_payment_results(receiver, listener).await {
if let Err(e) = write_payment_results(receiver, listener, print_batch_size).await {
log::error!("Error while reporting payment results: {:?}", e);
}

Expand All @@ -500,19 +511,19 @@ async fn consume_simulation_results(
async fn write_payment_results(
mut receiver: Receiver<(DispatchedPayment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) -> Result<(), SimulationError> {
let mut writer = WriterBuilder::new()
.buffer_capacity(size_of::<(DispatchedPayment, PaymentResult)>() * 5)
.from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?;
let mut writer = WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?;

let mut result_logger = PaymentResultLogger::new();

let mut counter = 1;
loop {
tokio::select! {
biased;
Expand All @@ -530,6 +541,13 @@ async fn write_payment_results(
let _ = writer.flush();
SimulationError::CsvError(e)
})?;

if print_batch_size == counter {
writer.flush().map_err(|_| SimulationError::FileError)?;
counter = 1;
} else {
counter += 1;
}
continue;
},
None => {
Expand Down

0 comments on commit a5003f2

Please sign in to comment.