Skip to content

Commit

Permalink
feat: add optional name for activity descriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Ifeanyichukwu committed May 14, 2024
1 parent 5838d4a commit 5307a4b
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 26 deletions.
15 changes: 14 additions & 1 deletion sim-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bitcoin::secp256k1::PublicKey;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -95,6 +95,7 @@ async fn main() -> anyhow::Result<()> {
let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode>>> = HashMap::new();
let mut pk_node_map = HashMap::new();
let mut alias_node_map = HashMap::new();
let mut activity_name_map = HashSet::new();

for connection in nodes {
// TODO: Feels like there should be a better way of doing this without having to Arc<Mutex<T>>> it at this time.
Expand Down Expand Up @@ -182,13 +183,25 @@ async fn main() -> anyhow::Result<()> {
},
};

if act.activity_name.is_some() {
if activity_name_map.contains(&act.activity_name) {
anyhow::bail!(LightningError::ValidationError(format!(
"Duplicate activity name {:?} is not allowed.",
act.activity_name.unwrap()
)));
}

activity_name_map.insert(act.activity_name.clone());
}

validated_activities.push(ActivityDefinition {
source,
destination,
start_secs: act.start_secs,
count: act.count,
interval_secs: act.interval_secs,
amount_msat: act.amount_msat,
activity_name: act.activity_name,
});
}

Expand Down
94 changes: 69 additions & 25 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ pub struct ActivityParser {
pub interval_secs: u16,
/// The amount of m_sat to used in this payment.
pub amount_msat: u64,
/// An optional name for the activity.
pub activity_name: Option<String>,
}

/// Data structure used internally by the simulator. Both source and destination are represented as [PublicKey] here.
Expand All @@ -167,6 +169,8 @@ pub struct ActivityDefinition {
pub interval_secs: u16,
/// The amount of m_sat to used in this payment.
pub amount_msat: u64,
/// An optional name for the activity.
pub activity_name: Option<String>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -336,14 +340,16 @@ pub enum PaymentOutcome {
}

/// Describes a payment from a source node to a destination node.
#[derive(Debug, Clone, Copy, Serialize)]
#[derive(Debug, Clone, Serialize)]
struct Payment {
/// Pubkey of the source node dispatching the payment.
source: PublicKey,
/// Pubkey of the destination node receiving the payment.
destination: PublicKey,
/// Amount of the payment in msat.
amount_msat: u64,
/// Name of the activity.
activity_name: String,
/// Hash of the payment if it has been successfully dispatched.
#[serde(with = "serializers::serde_option_payment_hash")]
hash: Option<PaymentHash>,
Expand All @@ -352,6 +358,16 @@ struct Payment {
dispatch_time: SystemTime,
}

impl Payment {
fn activity_prefix(&self) -> String {
if !self.activity_name.is_empty() {
format!("{} activity:", self.activity_name)
} else {
"".to_string()
}
}
}

impl Display for Payment {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let dispatch_time = self
Expand All @@ -361,7 +377,8 @@ impl Display for Payment {

write!(
f,
"Payment {} dispatched at {:?} sending {} msat from {} -> {}.",
"{}Payment {} dispatched at {:?} sending {} msat from {} -> {}.",
self.activity_prefix(),
self.hash.map(|h| hex::encode(h.0)).unwrap_or_default(),
dispatch_time,
self.amount_msat,
Expand All @@ -377,7 +394,7 @@ impl Display for Payment {
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(NodeInfo, u64),
SendPayment(NodeInfo, u64, String),
}

/// SimulationOutput provides the output of a simulation event.
Expand Down Expand Up @@ -427,6 +444,7 @@ struct ExecutorKit {
/// See [NetworkGraphView] for details.
network_generator: Arc<Mutex<dyn DestinationGenerator>>,
payment_generator: Box<dyn PaymentGenerator>,
activity_name: String,
}

impl Simulation {
Expand Down Expand Up @@ -726,7 +744,12 @@ impl Simulation {
// Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll
// just populate with each type as configured.
if !self.activity.is_empty() {
for description in self.activity.iter() {
for (index, description) in self.activity.iter().enumerate() {
let activity_name = match &description.activity_name {
Some(name) => name.clone(),
None => format!("Index {}", index),
};

let activity_generator = DefinedPaymentActivity::new(
description.destination.clone(),
Duration::from_secs(description.start_secs.into()),
Expand All @@ -741,6 +764,7 @@ impl Simulation {
// a single struct which we just cheaply clone.
network_generator: Arc::new(Mutex::new(activity_generator.clone())),
payment_generator: Box::new(activity_generator),
activity_name,
});
}
} else {
Expand Down Expand Up @@ -788,10 +812,11 @@ impl Simulation {
network_generator.lock().await
);

for (node_info, capacity) in active_nodes.values() {
for (index, (node_info, capacity)) in active_nodes.values().enumerate() {
generators.push(ExecutorKit {
source_info: node_info.clone(),
network_generator: network_generator.clone(),
activity_name: format!("Index {index} random"),
payment_generator: Box::new(
RandomPaymentActivity::new(
*capacity,
Expand Down Expand Up @@ -875,13 +900,15 @@ impl Simulation {
let source = executor.source_info.clone();

log::info!(
"Starting activity producer for {}: {}.",
"{} activity: Starting activity producer for {}: {}.",
executor.activity_name,
source,
executor.payment_generator
);

if let Err(e) = produce_events(
executor.source_info,
executor.activity_name,
executor.network_generator,
executor.payment_generator,
pe_sender,
Expand Down Expand Up @@ -918,7 +945,7 @@ async fn consume_events(
simulation_event = receiver.recv() => {
if let Some(event) = simulation_event {
match event {
SimulationEvent::SendPayment(dest, amt_msat) => {
SimulationEvent::SendPayment(dest, amt_msat, activity_name) => {
let mut node = node.lock().await;

let mut payment = Payment {
Expand All @@ -927,6 +954,7 @@ async fn consume_events(
amount_msat: amt_msat,
destination: dest.pubkey,
dispatch_time: SystemTime::now(),
activity_name,
};

let outcome = match node.send_payment(dest.pubkey, amt_msat).await {
Expand Down Expand Up @@ -977,6 +1005,7 @@ async fn consume_events(
/// exit if other threads signal that they have errored out.
async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator + ?Sized>(
source: NodeInfo,
activity_name: String,
network_generator: Arc<Mutex<N>>,
node_generator: Box<A>,
sender: Sender<SimulationEvent>,
Expand Down Expand Up @@ -1026,7 +1055,7 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
let amount = match node_generator.payment_amount(capacity) {
Ok(amt) => {
if amt == 0 {
log::debug!("Skipping zero amount payment for {source} -> {destination}.");
log::debug!("{activity_name} activity : Skipping zero amount payment for {source} -> {destination}.");
continue;
}
amt
Expand All @@ -1039,7 +1068,7 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.clone(), amount);
let event = SimulationEvent::SendPayment(destination.clone(), amount, activity_name.clone());
if sender.send(event.clone()).await.is_err() {
return Err(SimulationError::MpscChannelError (format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
}
Expand Down Expand Up @@ -1106,8 +1135,13 @@ async fn consume_simulation_results(
}

/// PaymentResultLogger is an aggregate logger that will report on a summary of the payments that have been reported.
#[derive(Default)]
#[derive(Default, Clone)]
struct PaymentResultLogger {
activity_results: HashMap<String, PaymentActivityResult>,
}

#[derive(Default, Clone)]
struct PaymentActivityResult {
success_payment: u64,
failed_payment: u64,
total_sent: u64,
Expand All @@ -1116,30 +1150,40 @@ struct PaymentResultLogger {
impl PaymentResultLogger {
fn new() -> Self {
PaymentResultLogger {
..Default::default()
activity_results: HashMap::new(),
}
}

fn report_result(&mut self, details: &Payment, result: &PaymentResult) {
let activity_name = details.activity_prefix();
let activity_result = self
.activity_results
.entry(activity_name.clone())
.or_default();

match result.payment_outcome {
PaymentOutcome::Success => self.success_payment += 1,
_ => self.failed_payment += 1,
PaymentOutcome::Success => activity_result.success_payment += 1,
_ => activity_result.failed_payment += 1,
}

self.total_sent += details.amount_msat;
activity_result.total_sent += details.amount_msat;
}
}

impl Display for PaymentResultLogger {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let total_payments = self.success_payment + self.failed_payment;
write!(
f,
"Processed {} payments sending {} msat total with {:.2}% success rate.",
total_payments,
self.total_sent,
(self.success_payment as f64 / total_payments as f64) * 100.0
)
for (activity_name, activity_result) in &self.activity_results {
let total_payments = activity_result.success_payment + activity_result.failed_payment;
writeln!(
f,
"{} Processed {} payments sending {} msat total with {:.2}% success rate.",
activity_name,
total_payments,
activity_result.total_sent,
(activity_result.success_payment as f64 / total_payments as f64) * 100.0
)?;
}
Ok(())
}
}

Expand Down Expand Up @@ -1197,14 +1241,14 @@ async fn produce_simulation_results(
SimulationOutput::SendPaymentSuccess(payment) => {
if let Some(source_node) = nodes.get(&payment.source) {
set.spawn(track_payment_result(
source_node.clone(), results.clone(), payment, listener.clone()
source_node.clone(), results.clone(), payment.clone(), listener.clone()
));
} else {
break Err(SimulationError::MissingNodeError(format!("Source node with public key: {} unavailable.", payment.source)));
}
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.send((payment, result.clone())).await.is_err() {
if results.send((payment.clone(), result.clone())).await.is_err() {
break Err(SimulationError::MpscChannelError(
format!("Failed to send payment result: {result} for payment {:?} dispatched at {:?}.", payment.hash, payment.dispatch_time),
));
Expand Down Expand Up @@ -1272,7 +1316,7 @@ async fn track_payment_result(
_ = listener.clone() => {
log::debug!("Track payment result received a shutdown signal.");
},
send_payment_result = results.send((payment, res.clone())) => {
send_payment_result = results.send((payment.clone(), res.clone())) => {
if send_payment_result.is_err() {
return Err(SimulationError::MpscChannelError(format!("Failed to send payment result {res} for payment {payment}.")))
}
Expand Down

0 comments on commit 5307a4b

Please sign in to comment.