Skip to content

Commit

Permalink
sim-lib: unify dispatch of events across random and defined activity
Browse files Browse the repository at this point in the history
  • Loading branch information
carlaKC committed Jan 19, 2024
1 parent d5018eb commit a548819
Showing 1 changed file with 13 additions and 54 deletions.
67 changes: 13 additions & 54 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::task::JoinSet;
use tokio::{select, time, time::Duration};
use triggered::{Listener, Trigger};

use self::defined_activity::DefinedPaymentActivity;
use self::random_activity::{NetworkGraphView, RandomPaymentActivity};

pub mod cln;
Expand Down Expand Up @@ -647,8 +648,16 @@ impl Simulation {
) {
for description in self.activity.iter() {
let sender_chan = producer_channels.get(&description.source.pubkey).unwrap();
let generator = DefinedPaymentActivity::new(
description.destination.clone(),
Duration::from_secs(description.interval_secs.into()),
description.amount_msat,
);

tasks.spawn(produce_events(
description.clone(),
description.source.clone(),
Arc::new(Mutex::new(generator.clone())),
generator,
sender_chan.clone(),
self.shutdown_trigger.clone(),
self.shutdown_listener.clone(),
Expand Down Expand Up @@ -694,7 +703,7 @@ impl Simulation {
)
.map_err(SimulationError::RandomActivityError)?;

tasks.spawn(produce_random_events(
tasks.spawn(produce_events(
info,
network_generator.clone(),
node_generator,
Expand Down Expand Up @@ -779,65 +788,15 @@ async fn consume_events(
}
}

// produce events generates events for the activity description provided. It accepts a shutdown listener so it can
// exit if other threads signal that they have errored out.
async fn produce_events(
act: ActivityDefinition,
sender: Sender<SimulationEvent>,
shutdown: Trigger,
listener: Listener,
) {
let interval = time::Duration::from_secs(act.interval_secs as u64);

log::debug!(
"Started producer for {} every {}s: {} -> {}.",
act.amount_msat,
act.interval_secs,
act.source,
act.destination
);

loop {
tokio::select! {
biased;
_ = time::sleep(interval) => {
// Consumer was dropped
if sender.send(SimulationEvent::SendPayment(act.destination.clone(), act.amount_msat)).await.is_err() {
log::debug!(
"Stopped producer for {}: {} -> {}. Consumer cannot be reached.",
act.amount_msat,
act.source,
act.destination
);
break;
}
}
_ = listener.clone() => {
// Shutdown was signaled
log::debug!(
"Stopped producer for {}: {} -> {}. Received shutdown signal.",
act.amount_msat,
act.source,
act.destination
);
break;
}
}
}

// On exit call our shutdown trigger to inform other threads that we have exited, and they need to shut down.
shutdown.trigger();
}

async fn produce_random_events<N: DestinationGenerator, A: PaymentGenerator + Display>(
async fn produce_events<N: DestinationGenerator, A: PaymentGenerator + Display>(
source: NodeInfo,
network_generator: Arc<Mutex<N>>,
node_generator: A,
sender: Sender<SimulationEvent>,
shutdown: Trigger,
listener: Listener,
) {
log::info!("Started random activity producer for {source}: {node_generator}.");
log::info!("Started activity producer for {source}: {node_generator}.");

loop {
let wait = node_generator.next_payment_wait();
Expand Down

0 comments on commit a548819

Please sign in to comment.