From d7e7900796d3757a819679d790f4aa03c3f0e28d Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 19 Jan 2024 09:36:36 -0500 Subject: [PATCH 01/12] sim-lib/refactor: rename NetworkGenerator to DestinationGenerator This trait is used to pick destinations for sending payments, so we rename it to more accurately reflect this. --- sim-lib/src/lib.rs | 4 ++-- sim-lib/src/random_activity.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 51f9e893..5cdc8fcd 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -201,7 +201,7 @@ pub trait LightningNode { async fn list_channels(&mut self) -> Result, LightningError>; } -pub trait NetworkGenerator { +pub trait DestinationGenerator { // sample_node_by_capacity randomly picks a node within the network weighted by its capacity deployed to the // network in channels. It returns the node's public key and its capacity in millisatoshis. fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64); @@ -826,7 +826,7 @@ async fn produce_events( shutdown.trigger(); } -async fn produce_random_events( +async fn produce_random_events( source: NodeInfo, network_generator: Arc>, node_generator: A, diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 63606d72..88173926 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -6,7 +6,7 @@ use bitcoin::secp256k1::PublicKey; use rand_distr::{Distribution, Exp, LogNormal, WeightedIndex}; use std::time::Duration; -use crate::{NetworkGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator}; +use crate::{DestinationGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator}; const HOURS_PER_MONTH: u64 = 30 * 24; const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60; @@ -56,7 +56,7 @@ impl NetworkGraphView { } } -impl NetworkGenerator for NetworkGraphView { +impl DestinationGenerator for NetworkGraphView { /// Randomly samples the network for a node, weighted by capacity. Using a single graph view means that it's /// possible for a source node to select itself. After sufficient retries, this is highly improbable (even with /// very small graphs, or those with one node significantly more capitalized than others). From 7f31747130996bb614fd5da5dcb1db8844b141bb Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 19 Jan 2024 09:38:51 -0500 Subject: [PATCH 02/12] sim-lib/refactor: generalize naming of sample_node_by_capacity In preparation for different implementations of this trait, we rename sample_node_by_capacity to a more general choose_destination to accommodate different ways of picking destinations. --- sim-lib/src/lib.rs | 8 ++++---- sim-lib/src/random_activity.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 5cdc8fcd..b9fbba4d 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -202,9 +202,9 @@ pub trait LightningNode { } pub trait DestinationGenerator { - // sample_node_by_capacity randomly picks a node within the network weighted by its capacity deployed to the - // network in channels. It returns the node's public key and its capacity in millisatoshis. - fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64); + // choose_destination picks a destination node within the network, returning the node's information and its + // capacity. + fn choose_destination(&self, source: PublicKey) -> (NodeInfo, u64); } #[derive(Debug, Error)] @@ -849,7 +849,7 @@ async fn produce_random_events { - let (destination, capacity) = network_generator.lock().await.sample_node_by_capacity(source.pubkey); + let (destination, capacity) = network_generator.lock().await.choose_destination(source.pubkey); // Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get // a payment amount something has gone wrong (because we should have validated that we can always diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 88173926..5467f3b1 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -60,7 +60,7 @@ impl DestinationGenerator for NetworkGraphView { /// Randomly samples the network for a node, weighted by capacity. Using a single graph view means that it's /// possible for a source node to select itself. After sufficient retries, this is highly improbable (even with /// very small graphs, or those with one node significantly more capitalized than others). - fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64) { + fn choose_destination(&self, source: PublicKey) -> (NodeInfo, u64) { let mut rng = rand::thread_rng(); // While it's very unlikely that we can't pick a destination that is not our source, it's possible that there's @@ -333,7 +333,7 @@ mod tests { let view = NetworkGraphView::new(nodes).unwrap(); for _ in 0..10 { - view.sample_node_by_capacity(big_node); + view.choose_destination(big_node); } } } From 734b799c88d764bef6c881bad33d3cfebb491633 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 19 Jan 2024 09:40:59 -0500 Subject: [PATCH 03/12] sim-lib/refactor: rename PaymentActivityGenerator to RandomPaymentActivity In preparation for other types of payment activity generators, rename random implementation to be more specific. --- sim-lib/src/lib.rs | 11 +++++------ sim-lib/src/random_activity.rs | 35 +++++++++++++--------------------- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index b9fbba4d..42736181 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -19,7 +19,7 @@ use tokio::task::JoinSet; use tokio::{select, time, time::Duration}; use triggered::{Listener, Trigger}; -use self::random_activity::{NetworkGraphView, PaymentActivityGenerator}; +use self::random_activity::{NetworkGraphView, RandomPaymentActivity}; pub mod cln; pub mod lnd; @@ -582,10 +582,9 @@ impl Simulation { for (pk, node) in self.nodes.iter() { let chan_capacity = node.lock().await.list_channels().await?.iter().sum::(); - if let Err(e) = PaymentActivityGenerator::validate_capacity( - chan_capacity, - self.expected_payment_msat, - ) { + if let Err(e) = + RandomPaymentActivity::validate_capacity(chan_capacity, self.expected_payment_msat) + { log::warn!("Node: {} not eligible for activity generation: {e}.", *pk); continue; } @@ -684,7 +683,7 @@ impl Simulation { } }; - let node_generator = PaymentActivityGenerator::new( + let node_generator = RandomPaymentActivity::new( source_capacity, self.expected_payment_msat, self.activity_multiplier, diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 5467f3b1..23e38ec1 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -95,14 +95,14 @@ impl Display for NetworkGraphView { /// While the expected amount to be sent in a month and the mean payment amount are set, the generator will introduce /// randomness both in the time between events and the variance of payment amounts sent to mimic more realistic /// payment flows. -pub struct PaymentActivityGenerator { +pub struct RandomPaymentActivity { multiplier: f64, expected_payment_amt: u64, source_capacity: u64, event_dist: Exp, } -impl PaymentActivityGenerator { +impl RandomPaymentActivity { /// Creates a new activity generator for a node, returning an error if the node has insufficient capacity deployed /// for the expected payment amount provided. Capacity is defined as the sum of the channels that the node has /// open in the network divided by two (so that capacity is not double counted with channel counterparties). @@ -129,7 +129,7 @@ impl PaymentActivityGenerator { )); } - PaymentActivityGenerator::validate_capacity(source_capacity_msat, expected_payment_amt)?; + RandomPaymentActivity::validate_capacity(source_capacity_msat, expected_payment_amt)?; // Lamda for the exponential distribution that we'll use to randomly time events is equal to the number of // events that we expect to see within our set period. @@ -140,7 +140,7 @@ impl PaymentActivityGenerator { let event_dist = Exp::new(lamda).map_err(|e| RandomActivityError::ValueError(e.to_string()))?; - Ok(PaymentActivityGenerator { + Ok(RandomPaymentActivity { multiplier, expected_payment_amt, source_capacity: source_capacity_msat, @@ -193,7 +193,7 @@ fn events_per_month(source_capacity_msat: u64, multiplier: f64, expected_payment (source_capacity_msat as f64 * multiplier) / expected_payment_amt as f64 } -impl PaymentGenerator for PaymentActivityGenerator { +impl PaymentGenerator for RandomPaymentActivity { /// Returns the amount of time until the next payment should be scheduled for the node. fn next_payment_wait(&self) -> Duration { let mut rng = rand::thread_rng(); @@ -230,7 +230,7 @@ impl PaymentGenerator for PaymentActivityGenerator { } } -impl Display for PaymentActivityGenerator { +impl Display for RandomPaymentActivity { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let monthly_events = events_per_month( self.source_capacity, @@ -349,33 +349,25 @@ mod tests { // here. Mainly, if the `capacity < expected_payment_amnt / 2`, the generator will fail building let expected_payment = get_random_int(1, 100); assert!( - PaymentActivityGenerator::new(2 * expected_payment, expected_payment, 1.0).is_ok() + RandomPaymentActivity::new(2 * expected_payment, expected_payment, 1.0).is_ok() ); assert!(matches!( - PaymentActivityGenerator::new(2 * expected_payment, expected_payment + 1, 1.0), + RandomPaymentActivity::new(2 * expected_payment, expected_payment + 1, 1.0), Err(RandomActivityError::InsufficientCapacity { .. }) )); // Respecting the internal exponential distribution creation, neither of the parameters can be zero. Otherwise we may try to create an exponential // function with lambda = NaN, which will error out, or with lambda = Inf, which does not make sense for our use-case assert!(matches!( - PaymentActivityGenerator::new( - 0, - get_random_int(1, 10), - get_random_int(1, 10) as f64 - ), + RandomPaymentActivity::new(0, get_random_int(1, 10), get_random_int(1, 10) as f64), Err(RandomActivityError::ValueError { .. }) )); assert!(matches!( - PaymentActivityGenerator::new( - get_random_int(1, 10), - 0, - get_random_int(1, 10) as f64 - ), + RandomPaymentActivity::new(get_random_int(1, 10), 0, get_random_int(1, 10) as f64), Err(RandomActivityError::ValueError { .. }) )); assert!(matches!( - PaymentActivityGenerator::new(get_random_int(1, 10), get_random_int(1, 10), 0.0), + RandomPaymentActivity::new(get_random_int(1, 10), get_random_int(1, 10), 0.0), Err(RandomActivityError::ValueError { .. }) )); } @@ -388,7 +380,7 @@ mod tests { let capacity = get_random_int(0, 100); let payment_amt = get_random_int(0, 100); - let r = PaymentActivityGenerator::validate_capacity(capacity, payment_amt); + let r = RandomPaymentActivity::validate_capacity(capacity, payment_amt); if capacity < 2 * payment_amt { assert!(matches!( r, @@ -408,8 +400,7 @@ mod tests { // All of them will yield a sigma squared smaller than 0, which we have a sanity check for. let expected_payment = get_random_int(1, 100); let source_capacity = 2 * expected_payment; - let pag = - PaymentActivityGenerator::new(source_capacity, expected_payment, 1.0).unwrap(); + let pag = RandomPaymentActivity::new(source_capacity, expected_payment, 1.0).unwrap(); // Wrong cases for i in 0..source_capacity { From db8220e45bd9a6b74c51c2ede40369e5feb7b842 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 17 Nov 2023 15:44:03 -0500 Subject: [PATCH 04/12] sim-lib: allow optional payment amount in DestinationGenerator In preparation for implementing our generation traits for defined activities, we loosen the tie to destination capacity in our trait definition (which is a characteristic of the random activity generation implementation). --- sim-lib/src/lib.rs | 11 +++++++---- sim-lib/src/random_activity.rs | 28 ++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 42736181..8d9f1909 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -203,8 +203,8 @@ pub trait LightningNode { pub trait DestinationGenerator { // choose_destination picks a destination node within the network, returning the node's information and its - // capacity. - fn choose_destination(&self, source: PublicKey) -> (NodeInfo, u64); + // capacity (if available). + fn choose_destination(&self, source: PublicKey) -> (NodeInfo, Option); } #[derive(Debug, Error)] @@ -214,8 +214,11 @@ pub trait PaymentGenerator { // Returns the number of seconds that a node should wait until firing its next payment. fn next_payment_wait(&self) -> time::Duration; - // Returns a payment amount based on the capacity of the sending and receiving node. - fn payment_amount(&self, destination_capacity: u64) -> Result; + // Returns a payment amount based, with a destination capacity optionally provided to inform the amount picked. + fn payment_amount( + &self, + destination_capacity: Option, + ) -> Result; } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 23e38ec1..12a6c4a7 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -60,7 +60,7 @@ impl DestinationGenerator for NetworkGraphView { /// Randomly samples the network for a node, weighted by capacity. Using a single graph view means that it's /// possible for a source node to select itself. After sufficient retries, this is highly improbable (even with /// very small graphs, or those with one node significantly more capitalized than others). - fn choose_destination(&self, source: PublicKey) -> (NodeInfo, u64) { + fn choose_destination(&self, source: PublicKey) -> (NodeInfo, Option) { let mut rng = rand::thread_rng(); // While it's very unlikely that we can't pick a destination that is not our source, it's possible that there's @@ -70,10 +70,10 @@ impl DestinationGenerator for NetworkGraphView { loop { let index = self.node_picker.sample(&mut rng); // Unwrapping is safe given `NetworkGraphView` has the same amount of elements for `nodes` and `node_picker` - let destination = self.nodes.get(index).unwrap(); + let (node_info, capacity) = self.nodes.get(index).unwrap(); - if destination.0.pubkey != source { - return destination.clone(); + if node_info.pubkey != source { + return (node_info.clone(), Some(*capacity)); } if i % 50 == 0 { @@ -207,7 +207,14 @@ impl PaymentGenerator for RandomPaymentActivity { /// capacity. While the expected value of payments remains the same, scaling variance by node capacity means that /// nodes with more deployed capital will see a larger range of payment values than those with smaller total /// channel capacity. - fn payment_amount(&self, destination_capacity: u64) -> Result { + fn payment_amount( + &self, + destination_capacity: Option, + ) -> Result { + let destination_capacity = destination_capacity.ok_or(PaymentGenerationError( + "destination amount required for payment activity generator".to_string(), + ))?; + let payment_limit = std::cmp::min(self.source_capacity, destination_capacity) / 2; let ln_pmt_amt = (self.expected_payment_amt as f64).ln(); @@ -405,20 +412,25 @@ mod tests { // Wrong cases for i in 0..source_capacity { assert!(matches!( - pag.payment_amount(i), + pag.payment_amount(Some(i)), Err(PaymentGenerationError(..)) )) } // All other cases will work. We are not going to exhaustively test for the rest up to u64::MAX, let just pick a bunch for i in source_capacity + 1..100 * source_capacity { - assert!(pag.payment_amount(i).is_ok()) + assert!(pag.payment_amount(Some(i)).is_ok()) } // We can even try really high numbers to make sure they are not troublesome for i in u64::MAX - 10000..u64::MAX { - assert!(pag.payment_amount(i).is_ok()) + assert!(pag.payment_amount(Some(i)).is_ok()) } + + assert!(matches!( + pag.payment_amount(None), + Err(PaymentGenerationError(..)) + )); } } } From 156579320309fc1409648e81062aabacfbec8339 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 17 Nov 2023 16:03:47 -0500 Subject: [PATCH 05/12] sim-lib/refactor: move create nodes function into test utils --- sim-lib/src/random_activity.rs | 19 +------------------ sim-lib/src/test_utils.rs | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 12a6c4a7..09f6d7f8 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -259,27 +259,10 @@ impl Display for RandomPaymentActivity { #[cfg(test)] mod tests { mod test_network_graph_view { - use lightning::ln::features::Features; use ntest::timeout; use super::super::*; - use crate::test_utils::get_random_keypair; - use crate::NodeInfo; - - fn create_nodes(n: usize, node_capacity: u64) -> Vec<(NodeInfo, u64)> { - (1..=n) - .map(|_| { - ( - NodeInfo { - pubkey: get_random_keypair().1, - alias: String::new(), - features: Features::empty(), - }, - node_capacity, - ) - }) - .collect() - } + use crate::test_utils::create_nodes; #[test] fn test_new() { diff --git a/sim-lib/src/test_utils.rs b/sim-lib/src/test_utils.rs index 876b33c8..8d3505bc 100644 --- a/sim-lib/src/test_utils.rs +++ b/sim-lib/src/test_utils.rs @@ -1,8 +1,11 @@ +use lightning::ln::features::Features; use rand::distributions::Uniform; use rand::Rng; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; +use crate::NodeInfo; + /// Utility function to create a vector of pseudo random bytes. /// /// Mainly used for testing purposes. @@ -26,3 +29,19 @@ pub fn get_random_keypair() -> (SecretKey, PublicKey) { } } } + +/// Creates n nodes with the capacity specified. +pub fn create_nodes(n: usize, node_capacity: u64) -> Vec<(NodeInfo, u64)> { + (1..=n) + .map(|_| { + ( + NodeInfo { + pubkey: get_random_keypair().1, + alias: String::new(), + features: Features::empty(), + }, + node_capacity, + ) + }) + .collect() +} From 9eaf42080482eb9ad70b33eb48e56883af786888 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 18 Oct 2023 14:04:39 -0400 Subject: [PATCH 06/12] sim-lib: add generator implementations for defined activities --- sim-lib/src/defined_activity.rs | 85 +++++++++++++++++++++++++++++++++ sim-lib/src/lib.rs | 1 + 2 files changed, 86 insertions(+) create mode 100644 sim-lib/src/defined_activity.rs diff --git a/sim-lib/src/defined_activity.rs b/sim-lib/src/defined_activity.rs new file mode 100644 index 00000000..98bbe07e --- /dev/null +++ b/sim-lib/src/defined_activity.rs @@ -0,0 +1,85 @@ +use crate::{DestinationGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator}; +use std::fmt; +use tokio::time::Duration; + +#[derive(Clone)] +pub struct DefinedPaymentActivity { + destination: NodeInfo, + wait: Duration, + amount: u64, +} + +impl DefinedPaymentActivity { + pub fn new(destination: NodeInfo, wait: Duration, amount: u64) -> Self { + DefinedPaymentActivity { + destination, + wait, + amount, + } + } +} + +impl fmt::Display for DefinedPaymentActivity { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "static payment of {} to {} every {:?}", + self.amount, self.destination, self.wait + ) + } +} + +impl DestinationGenerator for DefinedPaymentActivity { + fn choose_destination(&self, _: bitcoin::secp256k1::PublicKey) -> (NodeInfo, Option) { + (self.destination.clone(), None) + } +} + +impl PaymentGenerator for DefinedPaymentActivity { + fn next_payment_wait(&self) -> Duration { + self.wait + } + + fn payment_amount( + &self, + destination_capacity: Option, + ) -> Result { + if destination_capacity.is_some() { + Err(PaymentGenerationError( + "destination amount must not be set for defined activity generator".to_string(), + )) + } else { + Ok(self.amount) + } + } +} + +#[cfg(test)] +mod tests { + use super::DefinedPaymentActivity; + use crate::test_utils::{create_nodes, get_random_keypair}; + use crate::{DestinationGenerator, PaymentGenerationError, PaymentGenerator}; + use std::time::Duration; + + #[test] + fn test_defined_activity_generator() { + let node = create_nodes(1, 100000); + let node = &node.first().unwrap().0; + + let source = get_random_keypair(); + let payment_amt = 50; + + let generator = + DefinedPaymentActivity::new(node.clone(), Duration::from_secs(60), payment_amt); + + let (dest, dest_capacity) = generator.choose_destination(source.1); + assert_eq!(node.pubkey, dest.pubkey); + assert!(dest_capacity.is_none()); + + assert_eq!(payment_amt, generator.payment_amount(None).unwrap()); + assert!(matches!( + generator.payment_amount(Some(10)), + Err(PaymentGenerationError(..)) + )); + } +} diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 8d9f1909..ab01dfc6 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -22,6 +22,7 @@ use triggered::{Listener, Trigger}; use self::random_activity::{NetworkGraphView, RandomPaymentActivity}; pub mod cln; +mod defined_activity; pub mod lnd; mod random_activity; mod serializers; From ac54d1cb395bea0ac9b4be9890af55e2b10477ca Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Wed, 18 Oct 2023 14:20:20 -0400 Subject: [PATCH 07/12] sim-lib: unify dispatch of events across random and defined activity --- sim-lib/src/lib.rs | 65 ++++++++++------------------------------------ 1 file changed, 13 insertions(+), 52 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index ab01dfc6..56d92495 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -19,6 +19,7 @@ use tokio::task::JoinSet; use tokio::{select, time, time::Duration}; use triggered::{Listener, Trigger}; +use self::defined_activity::DefinedPaymentActivity; use self::random_activity::{NetworkGraphView, RandomPaymentActivity}; pub mod cln; @@ -647,8 +648,16 @@ impl Simulation { ) { for description in self.activity.iter() { let sender_chan = producer_channels.get(&description.source.pubkey).unwrap(); + let generator = DefinedPaymentActivity::new( + description.destination.clone(), + Duration::from_secs(description.interval_secs.into()), + description.amount_msat, + ); + tasks.spawn(produce_events( - description.clone(), + description.source.clone(), + Arc::new(Mutex::new(generator.clone())), + generator, sender_chan.clone(), self.shutdown_trigger.clone(), self.shutdown_listener.clone(), @@ -694,7 +703,7 @@ impl Simulation { ) .map_err(SimulationError::RandomActivityError)?; - tasks.spawn(produce_random_events( + tasks.spawn(produce_events( info, network_generator.clone(), node_generator, @@ -781,55 +790,7 @@ async fn consume_events( // produce events generates events for the activity description provided. It accepts a shutdown listener so it can // exit if other threads signal that they have errored out. -async fn produce_events( - act: ActivityDefinition, - sender: Sender, - shutdown: Trigger, - listener: Listener, -) { - let interval = time::Duration::from_secs(act.interval_secs as u64); - - log::debug!( - "Started producer for {} every {}s: {} -> {}.", - act.amount_msat, - act.interval_secs, - act.source, - act.destination - ); - - loop { - tokio::select! { - biased; - _ = time::sleep(interval) => { - // Consumer was dropped - if sender.send(SimulationEvent::SendPayment(act.destination.clone(), act.amount_msat)).await.is_err() { - log::debug!( - "Stopped producer for {}: {} -> {}. Consumer cannot be reached.", - act.amount_msat, - act.source, - act.destination - ); - break; - } - } - _ = listener.clone() => { - // Shutdown was signaled - log::debug!( - "Stopped producer for {}: {} -> {}. Received shutdown signal.", - act.amount_msat, - act.source, - act.destination - ); - break; - } - } - } - - // On exit call our shutdown trigger to inform other threads that we have exited, and they need to shut down. - shutdown.trigger(); -} - -async fn produce_random_events( +async fn produce_events( source: NodeInfo, network_generator: Arc>, node_generator: A, @@ -837,7 +798,7 @@ async fn produce_random_events Date: Fri, 17 Nov 2023 15:15:50 -0500 Subject: [PATCH 08/12] sim-lib/refactor: fix naming of consumer channels for clarity --- sim-lib/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 56d92495..2c4376f8 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -487,14 +487,14 @@ impl Simulation { random_activity_nodes.keys().cloned().collect() }; - let producer_senders = + let consumer_channels = self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks); // Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that // has been configured (if any), passing in the channel that is used to notify data collection that events have // been generated. Alternatively, we'll generate random activity if there is no activity specified. if !self.activity.is_empty() { - self.dispatch_activity_producers(producer_senders, &mut tasks) + self.dispatch_activity_producers(consumer_channels, &mut tasks) .await; } else { log::info!( @@ -502,7 +502,7 @@ impl Simulation { self.activity_multiplier, self.expected_payment_msat ); - self.dispatch_random_producers(random_activity_nodes, producer_senders, &mut tasks) + self.dispatch_random_producers(random_activity_nodes, consumer_channels, &mut tasks) .await?; } @@ -670,7 +670,7 @@ impl Simulation { async fn dispatch_random_producers( &self, node_capacities: HashMap, - producer_channels: HashMap>, + consumer_channels: HashMap>, tasks: &mut JoinSet<()>, ) -> Result<(), SimulationError> { let network_generator = Arc::new(Mutex::new( @@ -683,7 +683,7 @@ impl Simulation { network_generator.lock().await ); - for (pk, sender) in producer_channels.into_iter() { + for (pk, sender) in consumer_channels.into_iter() { let (info, source_capacity) = match node_capacities.get(&pk) { Some((info, capacity)) => (info.clone(), *capacity), None => { From 01c31ec05ff943cbfb70a5de0f674ff2e04e8b18 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Fri, 17 Nov 2023 10:46:25 -0500 Subject: [PATCH 09/12] sim-lib/refactor: use single dispatch for random and define activities --- sim-lib/src/lib.rs | 213 ++++++++++++++++++++++----------------------- 1 file changed, 106 insertions(+), 107 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 2c4376f8..bf301ca5 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -203,7 +203,7 @@ pub trait LightningNode { async fn list_channels(&mut self) -> Result, LightningError>; } -pub trait DestinationGenerator { +pub trait DestinationGenerator: Send { // choose_destination picks a destination node within the network, returning the node's information and its // capacity (if available). fn choose_destination(&self, source: PublicKey) -> (NodeInfo, Option); @@ -212,7 +212,8 @@ pub trait DestinationGenerator { #[derive(Debug, Error)] #[error("Payment generation error: {0}")] pub struct PaymentGenerationError(String); -pub trait PaymentGenerator { + +pub trait PaymentGenerator: Display + Send { // Returns the number of seconds that a node should wait until firing its next payment. fn next_payment_wait(&self) -> time::Duration; @@ -348,6 +349,16 @@ pub struct WriteResults { /// The number of activity results to batch before printing in CSV. pub batch_size: u32, } +/// +/// ExecutorKit contains the components required to spin up an activity configured by the user, to be used to +/// spin up the appropriate producers and consumers for the activity. +struct ExecutorKit { + source_info: NodeInfo, + /// We use an arc mutex here because some implementations of the trait will be very expensive to clone. + /// See [NetworkGraphView] for details. + network_generator: Arc>, + payment_generator: Box, +} impl Simulation { pub fn new( @@ -472,39 +483,20 @@ impl Simulation { let (event_sender, event_receiver) = channel(1); self.run_data_collection(event_receiver, &mut tasks); - // Create consumers for every source node when dealing with activity descriptions, or only for nodes with - // sufficient capacity if generating random activity. Since we have to query the capacity of every node - // in our network for picking random activity nodes, we cache this value here to be used later when we spin - // up producers. - let mut random_activity_nodes = HashMap::new(); - let collecting_nodes = if !self.activity.is_empty() { - self.activity + // Get an execution kit per activity that we need to generate and spin up consumers for each source node. + let activities = self.activity_executors().await?; + let consumer_channels = self.dispatch_consumers( + activities .iter() - .map(|activity| activity.source.pubkey) - .collect() - } else { - random_activity_nodes.extend(self.random_activity_nodes().await?); - random_activity_nodes.keys().cloned().collect() - }; + .map(|generator| generator.source_info.pubkey) + .collect(), + event_sender.clone(), + &mut tasks, + ); - let consumer_channels = - self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks); - - // Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that - // has been configured (if any), passing in the channel that is used to notify data collection that events have - // been generated. Alternatively, we'll generate random activity if there is no activity specified. - if !self.activity.is_empty() { - self.dispatch_activity_producers(consumer_channels, &mut tasks) - .await; - } else { - log::info!( - "Generating random activity with multiplier: {}, average payment amount: {}.", - self.activity_multiplier, - self.expected_payment_msat - ); - self.dispatch_random_producers(random_activity_nodes, consumer_channels, &mut tasks) - .await?; - } + // Next, we'll spin up our actual producers that will be responsible for triggering the configured activity. + self.dispatch_producers(activities, consumer_channels, &mut tasks) + .await?; if let Some(total_time) = self.total_time { let t = self.shutdown_trigger.clone(); @@ -576,14 +568,45 @@ impl Simulation { log::debug!("Simulator data collection set up."); } + async fn activity_executors(&self) -> Result, SimulationError> { + let mut generators = Vec::new(); + + // Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll + // just populate with each type as configured. + if !self.activity.is_empty() { + for description in self.activity.iter() { + let activity_generator = DefinedPaymentActivity::new( + description.destination.clone(), + Duration::from_secs(description.interval_secs.into()), + description.amount_msat, + ); + + generators.push(ExecutorKit { + source_info: description.source.clone(), + // Defined activities have very simple generators, so the traits required are implemented on + // a single struct which we just cheaply clone. + network_generator: Arc::new(Mutex::new(activity_generator.clone())), + payment_generator: Box::new(activity_generator), + }); + } + } else { + generators = self.random_activity_nodes().await?; + } + + Ok(generators) + } + /// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes /// that have sufficient capacity to generate payments of our expected payment amount. - async fn random_activity_nodes( - &self, - ) -> Result, SimulationError> { + async fn random_activity_nodes(&self) -> Result, SimulationError> { // Collect capacity of each node from its view of its own channels. Total capacity is divided by two to // avoid double counting capacity (as each node has a counterparty in the channel). - let mut node_capacities = HashMap::new(); + let mut generators = Vec::new(); + let mut active_nodes = HashMap::new(); + + // Do a first pass to get the capacity of each node which we need to be able to create a network generator. + // While we're at it, we get the node info and store it with capacity to create activity generators in our + // second pass. for (pk, node) in self.nodes.iter() { let chan_capacity = node.lock().await.list_channels().await?.iter().sum::(); @@ -594,16 +617,39 @@ impl Simulation { continue; } - node_capacities.insert( - *pk, - ( - node.lock().await.get_node_info(pk).await?, - chan_capacity / 2, + // Don't double count channel capacity because each channel reports the total balance between counter + // parities. Track capacity separately to be used for our network generator. + let capacity = chan_capacity / 2; + let node_info = node.lock().await.get_node_info(pk).await?; + active_nodes.insert(node_info.pubkey, (node_info, capacity)); + } + + let network_generator = Arc::new(Mutex::new( + NetworkGraphView::new(active_nodes.values().cloned().collect()) + .map_err(SimulationError::RandomActivityError)?, + )); + + log::info!( + "Created network generator: {}.", + network_generator.lock().await + ); + + for (node_info, capacity) in active_nodes.values() { + generators.push(ExecutorKit { + source_info: node_info.clone(), + network_generator: network_generator.clone(), + payment_generator: Box::new( + RandomPaymentActivity::new( + *capacity, + self.expected_payment_msat, + self.activity_multiplier, + ) + .map_err(SimulationError::RandomActivityError)?, ), - ); + }); } - Ok(node_capacities) + Ok(generators) } /// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation @@ -640,73 +686,26 @@ impl Simulation { channels } - /// Responsible for spinning up producers for a set of activity descriptions. - async fn dispatch_activity_producers( + /// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present + /// for every source node in the set of executors. + async fn dispatch_producers( &self, + executors: Vec, producer_channels: HashMap>, tasks: &mut JoinSet<()>, - ) { - for description in self.activity.iter() { - let sender_chan = producer_channels.get(&description.source.pubkey).unwrap(); - let generator = DefinedPaymentActivity::new( - description.destination.clone(), - Duration::from_secs(description.interval_secs.into()), - description.amount_msat, - ); - - tasks.spawn(produce_events( - description.source.clone(), - Arc::new(Mutex::new(generator.clone())), - generator, - sender_chan.clone(), - self.shutdown_trigger.clone(), - self.shutdown_listener.clone(), - )); - } - } - - /// Responsible for spinning up producers for a set of activity descriptions. Requires that node capacities are - /// provided for each node represented in producer channels. - async fn dispatch_random_producers( - &self, - node_capacities: HashMap, - consumer_channels: HashMap>, - tasks: &mut JoinSet<()>, ) -> Result<(), SimulationError> { - let network_generator = Arc::new(Mutex::new( - NetworkGraphView::new(node_capacities.values().cloned().collect()) - .map_err(SimulationError::RandomActivityError)?, - )); - - log::info!( - "Created network generator: {}.", - network_generator.lock().await - ); - - for (pk, sender) in consumer_channels.into_iter() { - let (info, source_capacity) = match node_capacities.get(&pk) { - Some((info, capacity)) => (info.clone(), *capacity), - None => { - return Err(SimulationError::RandomActivityError( - RandomActivityError::ValueError(format!( - "Random activity generator run for: {} with unknown capacity.", - pk - )), - )); - } - }; - - let node_generator = RandomPaymentActivity::new( - source_capacity, - self.expected_payment_msat, - self.activity_multiplier, - ) - .map_err(SimulationError::RandomActivityError)?; + for executor in executors { + let sender = producer_channels.get(&executor.source_info.pubkey).ok_or( + SimulationError::RandomActivityError(RandomActivityError::ValueError(format!( + "Activity producer for: {} not found.", + executor.source_info.pubkey, + ))), + )?; tasks.spawn(produce_events( - info, - network_generator.clone(), - node_generator, + executor.source_info, + executor.network_generator, + executor.payment_generator, sender.clone(), self.shutdown_trigger.clone(), self.shutdown_listener.clone(), @@ -790,10 +789,10 @@ async fn consume_events( // produce events generates events for the activity description provided. It accepts a shutdown listener so it can // exit if other threads signal that they have errored out. -async fn produce_events( +async fn produce_events( source: NodeInfo, network_generator: Arc>, - node_generator: A, + node_generator: Box, sender: Sender, shutdown: Trigger, listener: Listener, From c4a6fa9bb71b1a0c964e501531c09c85cb231177 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Tue, 30 Jan 2024 09:30:24 -0500 Subject: [PATCH 10/12] multi: include Send in LightningNode trait as supertrait --- sim-cli/src/main.rs | 4 ++-- sim-lib/src/lib.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index ac923ae9..545a5104 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -92,7 +92,7 @@ async fn main() -> anyhow::Result<()> { serde_json::from_str(&std::fs::read_to_string(sim_path)?) .map_err(|e| anyhow!("Could not deserialize node connection data or activity description from simulation file (line {}, col {}).", e.line(), e.column()))?; - let mut clients: HashMap>> = HashMap::new(); + let mut clients: HashMap>> = HashMap::new(); let mut pk_node_map = HashMap::new(); let mut alias_node_map = HashMap::new(); @@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> { // TODO: Feels like there should be a better way of doing this without having to Arc>> it at this time. // Box sort of works, but we won't know the size of the dyn LightningNode at compile time so the compiler will // scream at us when trying to create the Arc> later on while adding the node to the clients map - let node: Arc> = match connection { + let node: Arc> = match connection { NodeConnection::LND(c) => Arc::new(Mutex::new(LndNode::new(c).await?)), NodeConnection::CLN(c) => Arc::new(Mutex::new(ClnNode::new(c).await?)), }; diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index bf301ca5..09efd216 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -179,7 +179,7 @@ impl Display for NodeInfo { /// LightningNode represents the functionality that is required to execute events on a lightning node. #[async_trait] -pub trait LightningNode { +pub trait LightningNode: Send { /// Get information about the node. fn get_info(&self) -> &NodeInfo; /// Get the network this node is running at @@ -325,7 +325,7 @@ enum SimulationOutput { #[derive(Clone)] pub struct Simulation { // The lightning node that is being simulated. - nodes: HashMap>>, + nodes: HashMap>>, // The activity that are to be executed on the node. activity: Vec, // High level triggers used to manage simulation tasks and shutdown. @@ -362,7 +362,7 @@ struct ExecutorKit { impl Simulation { pub fn new( - nodes: HashMap>>, + nodes: HashMap>>, activity: Vec, total_time: Option, expected_payment_msat: u64, @@ -722,7 +722,7 @@ impl Simulation { // expect the senders corresponding to our receiver to be dropped, which will cause the receiver to error out and // exit. async fn consume_events( - node: Arc>, + node: Arc>, mut receiver: Receiver, sender: Sender, shutdown: Trigger, @@ -986,7 +986,7 @@ async fn run_results_logger( /// out. In the multiple-producer case, a single producer shutting down does not drop *all* sending channels so the /// consumer will not exit and a trigger is required. async fn produce_simulation_results( - nodes: HashMap>>, + nodes: HashMap>>, mut output_receiver: Receiver, results: Sender<(Payment, PaymentResult)>, shutdown: Listener, @@ -1034,7 +1034,7 @@ async fn produce_simulation_results( } async fn track_payment_result( - node: Arc>, + node: Arc>, results: Sender<(Payment, PaymentResult)>, payment: Payment, shutdown: Listener, From 594484773b949fbeed1c432a0e09350da3755afb Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Tue, 30 Jan 2024 09:30:45 -0500 Subject: [PATCH 11/12] multi: once off documentation comment sweep --- sim-lib/src/lib.rs | 53 +++++++++++++++++----------------- sim-lib/src/random_activity.rs | 6 ++-- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 09efd216..fd739db3 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -95,15 +95,15 @@ pub struct SimParams { /// [NodeId], which enables the use of public keys and aliases in the simulation description. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ActivityParser { - // The source of the payment. + /// The source of the payment. #[serde(with = "serializers::serde_node_id")] pub source: NodeId, - // The destination of the payment. + /// The destination of the payment. #[serde(with = "serializers::serde_node_id")] pub destination: NodeId, - // The interval of the event, as in every how many seconds the payment is performed. + /// The interval of the event, as in every how many seconds the payment is performed. pub interval_secs: u16, - // The amount of m_sat to used in this payment. + /// The amount of m_sat to used in this payment. pub amount_msat: u64, } @@ -111,13 +111,13 @@ pub struct ActivityParser { /// This is constructed during activity validation and passed along to the [Simulation]. #[derive(Debug, Clone)] pub struct ActivityDefinition { - // The source of the payment. + /// The source of the payment. pub source: NodeInfo, - // The destination of the payment. + /// The destination of the payment. pub destination: NodeInfo, - // The interval of the event, as in every how many seconds the payment is performed. + /// The interval of the event, as in every how many seconds the payment is performed. pub interval_secs: u16, - // The amount of m_sat to used in this payment. + /// The amount of m_sat to used in this payment. pub amount_msat: u64, } @@ -135,7 +135,6 @@ pub enum SimulationError { RandomActivityError(RandomActivityError), } -// Phase 2: Event Queue #[derive(Debug, Error)] pub enum LightningError { #[error("Node connection error: {0}")] @@ -204,8 +203,8 @@ pub trait LightningNode: Send { } pub trait DestinationGenerator: Send { - // choose_destination picks a destination node within the network, returning the node's information and its - // capacity (if available). + /// choose_destination picks a destination node within the network, returning the node's information and its + /// capacity (if available). fn choose_destination(&self, source: PublicKey) -> (NodeInfo, Option); } @@ -214,10 +213,10 @@ pub trait DestinationGenerator: Send { pub struct PaymentGenerationError(String); pub trait PaymentGenerator: Display + Send { - // Returns the number of seconds that a node should wait until firing its next payment. + /// Returns the number of seconds that a node should wait until firing its next payment. fn next_payment_wait(&self) -> time::Duration; - // Returns a payment amount based, with a destination capacity optionally provided to inform the amount picked. + /// Returns a payment amount based, with a destination capacity optionally provided to inform the amount picked. fn payment_amount( &self, destination_capacity: Option, @@ -324,14 +323,14 @@ enum SimulationOutput { #[derive(Clone)] pub struct Simulation { - // The lightning node that is being simulated. + /// The lightning node that is being simulated. nodes: HashMap>>, - // The activity that are to be executed on the node. + /// The activity that are to be executed on the node. activity: Vec, - // High level triggers used to manage simulation tasks and shutdown. + /// High level triggers used to manage simulation tasks and shutdown. shutdown_trigger: Trigger, shutdown_listener: Listener, - // Total simulation time. The simulation will run forever if undefined. + /// Total simulation time. The simulation will run forever if undefined. total_time: Option, /// The expected payment size for the network. expected_payment_msat: u64, @@ -425,7 +424,7 @@ impl Simulation { Ok(()) } - // validates that the nodes are all on the same network and ensures that we're not running on mainnet. + /// validates that the nodes are all on the same network and ensures that we're not running on mainnet. async fn validate_node_network(&self) -> Result<(), LightningError> { if self.nodes.is_empty() { return Err(LightningError::ValidationError( @@ -531,8 +530,8 @@ impl Simulation { self.shutdown_trigger.trigger() } - // run_data_collection starts the tasks required for the simulation to report of the results of the activity that - // it generates. The simulation should report outputs via the receiver that is passed in. + /// run_data_collection starts the tasks required for the simulation to report of the results of the activity that + /// it generates. The simulation should report outputs via the receiver that is passed in. fn run_data_collection( &self, output_receiver: Receiver, @@ -716,11 +715,11 @@ impl Simulation { } } -// consume_events processes events that are crated for a lightning node that we can execute events on. Any output -// that is generated from the event being executed is piped into a channel to handle the result of the event. If it -// exits, it will use the trigger provided to trigger shutdown in other threads. If an error occurs elsewhere, we -// expect the senders corresponding to our receiver to be dropped, which will cause the receiver to error out and -// exit. +/// events that are crated for a lightning node that we can execute events on. Any output that is generated from the +/// event being executed is piped into a channel to handle the result of the event. If it exits, it will use the +/// trigger provided to trigger shutdown in other threads. If an error occurs elsewhere, we expect the senders +/// corresponding to our receiver to be dropped, which will cause the receiver to error out and +/// exit. async fn consume_events( node: Arc>, mut receiver: Receiver, @@ -787,8 +786,8 @@ async fn consume_events( } } -// produce events generates events for the activity description provided. It accepts a shutdown listener so it can -// exit if other threads signal that they have errored out. +/// produce events generates events for the activity description provided. It accepts a shutdown listener so it can +/// exit if other threads signal that they have errored out. async fn produce_events( source: NodeInfo, network_generator: Arc>, diff --git a/sim-lib/src/random_activity.rs b/sim-lib/src/random_activity.rs index 09f6d7f8..f6917d4d 100644 --- a/sim-lib/src/random_activity.rs +++ b/sim-lib/src/random_activity.rs @@ -29,9 +29,9 @@ pub struct NetworkGraphView { } impl NetworkGraphView { - // Creates a network view for the map of node public keys to capacity (in millisatoshis) provided. Returns an error - // if any node's capacity is zero (the node cannot receive), or there are not at least two nodes (one node can't - // send to itself). + /// Creates a network view for the map of node public keys to capacity (in millisatoshis) provided. Returns an error + /// if any node's capacity is zero (the node cannot receive), or there are not at least two nodes (one node can't + /// send to itself). pub fn new(nodes: Vec<(NodeInfo, u64)>) -> Result { if nodes.len() < 2 { return Err(RandomActivityError::ValueError( From 75ac4377aea642c4cd6e41f122545763a87cd4f0 Mon Sep 17 00:00:00 2001 From: Carla Kirk-Cohen Date: Tue, 30 Jan 2024 09:06:06 -0500 Subject: [PATCH 12/12] gitingore: ignore any json file to accommodate different sim file names --- .gitignore | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 5f974d65..a12a87b6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,6 @@ /target node_modules -config.json -sim.json -package-lock.json +*.json activity-generator/releases/* .DS_Store /results \ No newline at end of file