diff --git a/sim-lib/src/defined_activity.rs b/sim-lib/src/defined_activity.rs index c40b2762..7ce20d98 100644 --- a/sim-lib/src/defined_activity.rs +++ b/sim-lib/src/defined_activity.rs @@ -1,4 +1,6 @@ -use crate::{DestinationGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator}; +use crate::{ + DestinationGenerator, NodeInfo, PayGenDisplay, PaymentGenerationError, PaymentGenerator, +}; use core::fmt; use std::fmt::Display; use tokio::time::Duration; @@ -20,6 +22,8 @@ impl DefinedPaymentActivity { } } +impl PayGenDisplay for DefinedPaymentActivity {} + impl Display for DefinedPaymentActivity { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index e97078d5..9c6aca7b 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -342,6 +342,19 @@ pub struct Simulation { no_results: bool, } +/// PayGenDisplay is a marker trait to allow creation of trait objects for printable payment generators. +pub trait PayGenDisplay: PaymentGenerator + Display {} + +/// ExecutorKit contains the components required to spin up an activity configured by the user, to be used to +/// spin up the appropriate producers and consumers for the activity. +struct ExecutorKit { + source_info: NodeInfo, + /// We use an arc mutex here because some implementations of the trait will be very expensive to clone. + /// See [NetworkGraphView] for details. + network_generator: Arc>, + payment_generator: Box, +} + impl Simulation { pub fn new( nodes: HashMap>>, @@ -467,39 +480,20 @@ impl Simulation { let (event_sender, event_receiver) = channel(1); self.run_data_collection(event_receiver, &mut tasks); - // Create consumers for every source node when dealing with activity descriptions, or only for nodes with - // sufficient capacity if generating random activity. Since we have to query the capacity of every node - // in our network for picking random activity nodes, we cache this value here to be used later when we spin - // up producers. - let mut random_activity_nodes = HashMap::new(); - let collecting_nodes = if !self.activity.is_empty() { - self.activity + // Get an execution kit per activity that we need to generate and spin up consumers for each source node. + let activities = self.activity_executors().await?; + let consumer_channels = self.dispatch_consumers( + activities .iter() - .map(|activity| activity.source.pubkey) - .collect() - } else { - random_activity_nodes.extend(self.random_activity_nodes().await?); - random_activity_nodes.keys().cloned().collect() - }; - - let consumer_channels = - self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks); + .map(|generator| generator.source_info.pubkey) + .collect(), + event_sender.clone(), + &mut tasks, + ); - // Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that - // has been configured (if any), passing in the channel that is used to notify data collection that events have - // been generated. Alternatively, we'll generate random activity if there is no activity specified. - if !self.activity.is_empty() { - self.dispatch_activity_producers(consumer_channels, &mut tasks) - .await; - } else { - log::info!( - "Generating random activity with multiplier: {}, average payment amount: {}.", - self.activity_multiplier, - self.expected_payment_msat - ); - self.dispatch_random_producers(random_activity_nodes, consumer_channels, &mut tasks) - .await?; - } + // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. + self.dispatch_producers(activities, consumer_channels, &mut tasks) + .await?; if let Some(total_time) = self.total_time { let t = self.shutdown_trigger.clone(); @@ -572,14 +566,45 @@ impl Simulation { log::debug!("Simulator data collection set up."); } + async fn activity_executors(&self) -> Result, SimulationError> { + let mut generators = Vec::new(); + + // Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll + // just populate with each type as configured. + if !self.activity.is_empty() { + for description in self.activity.iter() { + let activity_generator = DefinedPaymentActivity::new( + description.destination.clone(), + Duration::from_secs(description.interval_secs.into()), + description.amount_msat, + ); + + generators.push(ExecutorKit { + source_info: description.source.clone(), + // Defined activities have very simple generators, so the traits required are implemented on + // a single struct which we just cheaply clone. + network_generator: Arc::new(Mutex::new(activity_generator.clone())), + payment_generator: Box::new(activity_generator), + }); + } + } else { + generators = self.random_activity_nodes().await?; + } + + Ok(generators) + } + /// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes /// that have sufficient capacity to generate payments of our expected payment amount. - async fn random_activity_nodes( - &self, - ) -> Result, SimulationError> { + async fn random_activity_nodes(&self) -> Result, SimulationError> { // Collect capacity of each node from its view of its own channels. Total capacity is divided by two to // avoid double counting capacity (as each node has a counterparty in the channel). - let mut node_capacities = HashMap::new(); + let mut generators = Vec::new(); + let mut active_nodes = HashMap::new(); + + // Do a first pass to get the capacity of each node which we need to be able to create a network generator. + // While we're at it, we get the node info and store it with capacity to create activity generators in our + // second pass. for (pk, node) in self.nodes.iter() { let chan_capacity = node.lock().await.list_channels().await?.iter().sum::(); @@ -590,16 +615,39 @@ impl Simulation { continue; } - node_capacities.insert( - *pk, - ( - node.lock().await.get_node_info(pk).await?, - chan_capacity / 2, + // Don't double count channel capacity because each channel reports the total balance between counter + // parities. Track capacity separately to be used for our network generator. + let capacity = chan_capacity / 2; + let node_info = node.lock().await.get_node_info(pk).await?; + active_nodes.insert(node_info.pubkey, (node_info, capacity)); + } + + let network_generator = Arc::new(Mutex::new( + NetworkGraphView::new(active_nodes.values().cloned().collect()) + .map_err(SimulationError::RandomActivityError)?, + )); + + log::info!( + "Created network generator: {}.", + network_generator.lock().await + ); + + for (node_info, capacity) in active_nodes.values() { + generators.push(ExecutorKit { + source_info: node_info.clone(), + network_generator: network_generator.clone(), + payment_generator: Box::new( + RandomPaymentActivity::new( + *capacity, + self.expected_payment_msat, + self.activity_multiplier, + ) + .map_err(SimulationError::RandomActivityError)?, ), - ); + }); } - Ok(node_capacities) + Ok(generators) } /// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation @@ -636,73 +684,31 @@ impl Simulation { channels } - /// Responsible for spinning up producers for a set of activity descriptions. - async fn dispatch_activity_producers( + /// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present + /// for every source node in the set of executors. + async fn dispatch_producers( &self, + executors: Vec, producer_channels: HashMap>, tasks: &mut JoinSet<()>, - ) { - for description in self.activity.iter() { - let sender_chan = producer_channels.get(&description.source.pubkey).unwrap(); - let generator = DefinedPaymentActivity::new( - description.destination.clone(), - Duration::from_secs(description.interval_secs.into()), - description.amount_msat, - ); - - tasks.spawn(produce_events( - description.source.clone(), - Arc::new(Mutex::new(generator.clone())), - generator, - sender_chan.clone(), - self.shutdown_trigger.clone(), - self.shutdown_listener.clone(), - )); - } - } - - /// Responsible for spinning up producers for a set of activity descriptions. Requires that node capacities are - /// provided for each node represented in producer channels. - async fn dispatch_random_producers( - &self, - node_capacities: HashMap, - consumer_channels: HashMap>, - tasks: &mut JoinSet<()>, ) -> Result<(), SimulationError> { - let network_generator = Arc::new(Mutex::new( - NetworkGraphView::new(node_capacities.values().cloned().collect()) - .map_err(SimulationError::RandomActivityError)?, - )); - - log::info!( - "Created network generator: {}.", - network_generator.lock().await - ); - - for (pk, sender) in consumer_channels.into_iter() { - let (info, source_capacity) = match node_capacities.get(&pk) { - Some((info, capacity)) => (info.clone(), *capacity), + for executor in executors { + let sender = match producer_channels.get(&executor.source_info.pubkey) { + Some(producer) => producer, None => { return Err(SimulationError::RandomActivityError( RandomActivityError::ValueError(format!( - "Random activity generator run for: {} with unknown capacity.", - pk + "Activity producer for: {} not found.", + executor.source_info.pubkey, )), - )); + )) } }; - let node_generator = RandomPaymentActivity::new( - source_capacity, - self.expected_payment_msat, - self.activity_multiplier, - ) - .map_err(SimulationError::RandomActivityError)?; - tasks.spawn(produce_events( - info, - network_generator.clone(), - node_generator, + executor.source_info, + executor.network_generator, + executor.payment_generator, sender.clone(), self.shutdown_trigger.clone(), self.shutdown_listener.clone(), @@ -784,10 +790,10 @@ async fn consume_events( } } -async fn produce_events( +async fn produce_events( source: NodeInfo, network_generator: Arc>, - node_generator: A, + node_generator: Box, sender: Sender, shutdown: Trigger, listener: Listener, diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index d2e29c00..b8cfc64d 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -6,7 +6,9 @@ use bitcoin::secp256k1::PublicKey; use rand_distr::{Distribution, Exp, LogNormal, WeightedIndex}; use std::time::Duration; -use crate::{DestinationGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator}; +use crate::{ + DestinationGenerator, NodeInfo, PayGenDisplay, PaymentGenerationError, PaymentGenerator, +}; const HOURS_PER_MONTH: u64 = 30 * 24; const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60; @@ -260,6 +262,8 @@ impl Display for RandomPaymentActivity { } } +impl PayGenDisplay for RandomPaymentActivity {} + #[cfg(test)] mod tests { mod test_network_graph_view {