diff --git a/README.md b/README.md index ebf9a7c6..d8a8b08f 100644 --- a/README.md +++ b/README.md @@ -153,12 +153,15 @@ Optional fields: ``` "start_secs": the time to start sending payments "count": the total number of payments to send +"activity_name": an optional name for each activity description, used as the logging prefix to relate logs to their corresponding activity ``` > If `start_secs` is not provided the payments will begin as soon as the simulation starts (default=0) > If `count` is not provided the payments will continue for as long as the simulation runs (default=None) +> If `activity_name` is not provided the index of each activity description will be used as the logging prefix + The example simulation file below sets up the following simulation: * Connect to `Alice` running LND to generate activity. * Connect to `Bob` running CLN to generate activity. @@ -200,7 +203,8 @@ The example simulation file below sets up the following simulation: "source": "Alice", "destination": "02d804ad31429c8cc29e20ec43b4129553eb97623801e534ab5a66cdcd2149dbed", "interval_secs": 1, - "amount_msat": 2000 + "amount_msat": 2000, + "activity_name": "Daily Contribution" }, { "source": "0230a16a05c5ca120136b3a770a2adfdad88a68d526e63448a9eef88bddd6a30d8", diff --git a/sim-cli/src/main.rs b/sim-cli/src/main.rs index f88aa2a1..215296ab 100644 --- a/sim-cli/src/main.rs +++ b/sim-cli/src/main.rs @@ -1,5 +1,5 @@ use bitcoin::secp256k1::PublicKey; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; @@ -95,6 +95,7 @@ async fn main() -> anyhow::Result<()> { let mut clients: HashMap>> = HashMap::new(); let mut pk_node_map = HashMap::new(); let mut alias_node_map = HashMap::new(); + let mut activity_name_map = HashSet::new(); for connection in nodes { // TODO: Feels like there should be a better way of doing this without having to Arc>> it at this time. @@ -182,6 +183,17 @@ async fn main() -> anyhow::Result<()> { }, }; + if act.activity_name.is_some() { + if activity_name_map.contains(&act.activity_name) { + anyhow::bail!(LightningError::ValidationError(format!( + "Duplicate activity name {:?} is not allowed.", + act.activity_name.unwrap() + ))); + } + + activity_name_map.insert(act.activity_name.clone()); + } + validated_activities.push(ActivityDefinition { source, destination, @@ -189,6 +201,7 @@ async fn main() -> anyhow::Result<()> { count: act.count, interval_secs: act.interval_secs, amount_msat: act.amount_msat, + activity_name: act.activity_name, }); } diff --git a/sim-lib/src/lib.rs b/sim-lib/src/lib.rs index 716682ec..4d6d408c 100644 --- a/sim-lib/src/lib.rs +++ b/sim-lib/src/lib.rs @@ -193,6 +193,8 @@ pub struct ActivityParser { /// The amount of m_sat to used in this payment. #[serde(with = "serializers::serde_value_or_range")] pub amount_msat: Amount, + /// An optional name for the activity. + pub activity_name: Option, } /// Data structure used internally by the simulator. Both source and destination are represented as [PublicKey] here. @@ -211,6 +213,8 @@ pub struct ActivityDefinition { pub interval_secs: Interval, /// The amount of m_sat to used in this payment. pub amount_msat: Amount, + /// An optional name for the activity. + pub activity_name: Option, } #[derive(Debug, Error)] @@ -380,7 +384,7 @@ pub enum PaymentOutcome { } /// Describes a payment from a source node to a destination node. -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Clone, Serialize)] struct Payment { /// Pubkey of the source node dispatching the payment. source: PublicKey, @@ -471,6 +475,7 @@ struct ExecutorKit { /// See [NetworkGraphView] for details. network_generator: Arc>, payment_generator: Box, + activity_name: String, } impl Simulation { @@ -770,7 +775,12 @@ impl Simulation { // Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll // just populate with each type as configured. if !self.activity.is_empty() { - for description in self.activity.iter() { + for (index, description) in self.activity.iter().enumerate() { + let activity_name = match &description.activity_name { + Some(name) => name.clone(), + None => format!("Index {}", index), + }; + let activity_generator = DefinedPaymentActivity::new( description.destination.clone(), Duration::from_secs(description.start_secs.into()), @@ -785,6 +795,7 @@ impl Simulation { // a single struct which we just cheaply clone. network_generator: Arc::new(Mutex::new(activity_generator.clone())), payment_generator: Box::new(activity_generator), + activity_name, }); } } else { @@ -832,10 +843,11 @@ impl Simulation { network_generator.lock().await ); - for (node_info, capacity) in active_nodes.values() { + for (i, (node_info, capacity)) in active_nodes.values().enumerate() { generators.push(ExecutorKit { source_info: node_info.clone(), network_generator: network_generator.clone(), + activity_name: format!("Random index {i}"), payment_generator: Box::new( RandomPaymentActivity::new( *capacity, @@ -919,13 +931,15 @@ impl Simulation { let source = executor.source_info.clone(); log::info!( - "Starting activity producer for {}: {}.", + "{} activity: Starting activity producer for {}: {}.", + executor.activity_name, source, executor.payment_generator ); if let Err(e) = produce_events( executor.source_info, + executor.activity_name, executor.network_generator, executor.payment_generator, pe_sender, @@ -1021,6 +1035,7 @@ async fn consume_events( /// exit if other threads signal that they have errored out. async fn produce_events( source: NodeInfo, + activity_name: String, network_generator: Arc>, node_generator: Box, sender: Sender, @@ -1070,7 +1085,7 @@ async fn produce_events { if amt == 0 { - log::debug!("Skipping zero amount payment for {source} -> {destination}."); + log::debug!("{activity_name} activity : Skipping zero amount payment for {source} -> {destination}."); continue; } amt @@ -1083,7 +1098,7 @@ async fn produce_events {}: {amount} msat.", destination); // Send the payment, exiting if we can no longer send to the consumer. - let event = SimulationEvent::SendPayment(destination.clone(), amount); + let event = SimulationEvent::SendPayment(destination.clone(), amount,); if sender.send(event.clone()).await.is_err() { return Err(SimulationError::MpscChannelError (format!("Stopped activity producer for {amount}: {source} -> {destination}."))); } @@ -1241,14 +1256,14 @@ async fn produce_simulation_results( SimulationOutput::SendPaymentSuccess(payment) => { if let Some(source_node) = nodes.get(&payment.source) { set.spawn(track_payment_result( - source_node.clone(), results.clone(), payment, listener.clone() + source_node.clone(), results.clone(), payment.clone(), listener.clone() )); } else { break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source))); } }, SimulationOutput::SendPaymentFailure(payment, result) => { - if results.send((payment, result.clone())).await.is_err() { + if results.send((payment.clone(), result.clone())).await.is_err() { break Err(SimulationError::MpscChannelError( format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time), )); @@ -1316,7 +1331,7 @@ async fn track_payment_result( _ = listener.clone() => { log::debug!("Track payment result received a shutdown signal."); }, - send_payment_result = results.send((payment, res.clone())) => { + send_payment_result = results.send((payment.clone(), res.clone())) => { if send_payment_result.is_err() { return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}."))) }