Skip to content

Commit

Permalink
sim-lib/refactor: use single dispatch for random and define activities
Browse files Browse the repository at this point in the history
  • Loading branch information
carlaKC committed Jan 19, 2024
1 parent 33c456d commit 3aaaf6b
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 101 deletions.
6 changes: 5 additions & 1 deletion sim-lib/src/defined_activity.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{DestinationGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator};
use crate::{
DestinationGenerator, NodeInfo, PayGenDisplay, PaymentGenerationError, PaymentGenerator,
};
use core::fmt;
use std::fmt::Display;
use tokio::time::Duration;
Expand All @@ -20,6 +22,8 @@ impl DefinedPaymentActivity {
}
}

impl PayGenDisplay for DefinedPaymentActivity {}

impl Display for DefinedPaymentActivity {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
Expand Down
204 changes: 105 additions & 99 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,19 @@ pub struct Simulation {
no_results: bool,
}

/// PayGenDisplay is a marker trait to allow creation of trait objects for printable payment generators.
pub trait PayGenDisplay: PaymentGenerator + Display {}

/// ExecutorKit contains the components required to spin up an activity configured by the user, to be used to
/// spin up the appropriate producers and consumers for the activity.
struct ExecutorKit {
source_info: NodeInfo,
/// We use an arc mutex here because some implementations of the trait will be very expensive to clone.
/// See [NetworkGraphView] for details.
network_generator: Arc<Mutex<dyn DestinationGenerator + Send>>,
payment_generator: Box<dyn PayGenDisplay + Send>,
}

impl Simulation {
pub fn new(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
Expand Down Expand Up @@ -467,39 +480,20 @@ impl Simulation {
let (event_sender, event_receiver) = channel(1);
self.run_data_collection(event_receiver, &mut tasks);

// Create consumers for every source node when dealing with activity descriptions, or only for nodes with
// sufficient capacity if generating random activity. Since we have to query the capacity of every node
// in our network for picking random activity nodes, we cache this value here to be used later when we spin
// up producers.
let mut random_activity_nodes = HashMap::new();
let collecting_nodes = if !self.activity.is_empty() {
self.activity
// Get an execution kit per activity that we need to generate and spin up consumers for each source node.
let activities = self.activity_executors().await?;
let consumer_channels = self.dispatch_consumers(
activities
.iter()
.map(|activity| activity.source.pubkey)
.collect()
} else {
random_activity_nodes.extend(self.random_activity_nodes().await?);
random_activity_nodes.keys().cloned().collect()
};

let consumer_channels =
self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks);
.map(|generator| generator.source_info.pubkey)
.collect(),
event_sender.clone(),
&mut tasks,
);

// Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that
// has been configured (if any), passing in the channel that is used to notify data collection that events have
// been generated. Alternatively, we'll generate random activity if there is no activity specified.
if !self.activity.is_empty() {
self.dispatch_activity_producers(consumer_channels, &mut tasks)
.await;
} else {
log::info!(
"Generating random activity with multiplier: {}, average payment amount: {}.",
self.activity_multiplier,
self.expected_payment_msat
);
self.dispatch_random_producers(random_activity_nodes, consumer_channels, &mut tasks)
.await?;
}
// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
self.dispatch_producers(activities, consumer_channels, &mut tasks)
.await?;

if let Some(total_time) = self.total_time {
let t = self.shutdown_trigger.clone();
Expand Down Expand Up @@ -572,14 +566,45 @@ impl Simulation {
log::debug!("Simulator data collection set up.");
}

async fn activity_executors(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
let mut generators = Vec::new();

// Note: when we allow configuring both defined and random activity, this will no longer be an if/else, we'll
// just populate with each type as configured.
if !self.activity.is_empty() {
for description in self.activity.iter() {
let activity_generator = DefinedPaymentActivity::new(
description.destination.clone(),
Duration::from_secs(description.interval_secs.into()),
description.amount_msat,
);

generators.push(ExecutorKit {
source_info: description.source.clone(),
// Defined activities have very simple generators, so the traits required are implemented on
// a single struct which we just cheaply clone.
network_generator: Arc::new(Mutex::new(activity_generator.clone())),
payment_generator: Box::new(activity_generator),
});
}
} else {
generators = self.random_activity_nodes().await?;
}

Ok(generators)
}

/// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes
/// that have sufficient capacity to generate payments of our expected payment amount.
async fn random_activity_nodes(
&self,
) -> Result<HashMap<PublicKey, (NodeInfo, u64)>, SimulationError> {
async fn random_activity_nodes(&self) -> Result<Vec<ExecutorKit>, SimulationError> {
// Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
// avoid double counting capacity (as each node has a counterparty in the channel).
let mut node_capacities = HashMap::new();
let mut generators = Vec::new();
let mut active_nodes = HashMap::new();

// Do a first pass to get the capacity of each node which we need to be able to create a network generator.
// While we're at it, we get the node info and store it with capacity to create activity generators in our
// second pass.
for (pk, node) in self.nodes.iter() {
let chan_capacity = node.lock().await.list_channels().await?.iter().sum::<u64>();

Expand All @@ -590,16 +615,39 @@ impl Simulation {
continue;
}

node_capacities.insert(
*pk,
(
node.lock().await.get_node_info(pk).await?,
chan_capacity / 2,
// Don't double count channel capacity because each channel reports the total balance between counter
// parities. Track capacity separately to be used for our network generator.
let capacity = chan_capacity / 2;
let node_info = node.lock().await.get_node_info(pk).await?;
active_nodes.insert(node_info.pubkey, (node_info, capacity));
}

let network_generator = Arc::new(Mutex::new(
NetworkGraphView::new(active_nodes.values().cloned().collect())
.map_err(SimulationError::RandomActivityError)?,
));

log::info!(
"Created network generator: {}.",
network_generator.lock().await
);

for (node_info, capacity) in active_nodes.values() {
generators.push(ExecutorKit {
source_info: node_info.clone(),
network_generator: network_generator.clone(),
payment_generator: Box::new(
RandomPaymentActivity::new(
*capacity,
self.expected_payment_msat,
self.activity_multiplier,
)
.map_err(SimulationError::RandomActivityError)?,
),
);
});
}

Ok(node_capacities)
Ok(generators)
}

/// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation
Expand Down Expand Up @@ -636,73 +684,31 @@ impl Simulation {
channels
}

/// Responsible for spinning up producers for a set of activity descriptions.
async fn dispatch_activity_producers(
/// Responsible for spinning up producers for a set of activities. Requires that a consumer channel is present
/// for every source node in the set of executors.
async fn dispatch_producers(
&self,
executors: Vec<ExecutorKit>,
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
tasks: &mut JoinSet<()>,
) {
for description in self.activity.iter() {
let sender_chan = producer_channels.get(&description.source.pubkey).unwrap();
let generator = DefinedPaymentActivity::new(
description.destination.clone(),
Duration::from_secs(description.interval_secs.into()),
description.amount_msat,
);

tasks.spawn(produce_events(
description.source.clone(),
Arc::new(Mutex::new(generator.clone())),
generator,
sender_chan.clone(),
self.shutdown_trigger.clone(),
self.shutdown_listener.clone(),
));
}
}

/// Responsible for spinning up producers for a set of activity descriptions. Requires that node capacities are
/// provided for each node represented in producer channels.
async fn dispatch_random_producers(
&self,
node_capacities: HashMap<PublicKey, (NodeInfo, u64)>,
consumer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
tasks: &mut JoinSet<()>,
) -> Result<(), SimulationError> {
let network_generator = Arc::new(Mutex::new(
NetworkGraphView::new(node_capacities.values().cloned().collect())
.map_err(SimulationError::RandomActivityError)?,
));

log::info!(
"Created network generator: {}.",
network_generator.lock().await
);

for (pk, sender) in consumer_channels.into_iter() {
let (info, source_capacity) = match node_capacities.get(&pk) {
Some((info, capacity)) => (info.clone(), *capacity),
for executor in executors {
let sender = match producer_channels.get(&executor.source_info.pubkey) {
Some(producer) => producer,
None => {
return Err(SimulationError::RandomActivityError(
RandomActivityError::ValueError(format!(
"Random activity generator run for: {} with unknown capacity.",
pk
"Activity producer for: {} not found.",
executor.source_info.pubkey,
)),
));
))
}
};

let node_generator = RandomPaymentActivity::new(
source_capacity,
self.expected_payment_msat,
self.activity_multiplier,
)
.map_err(SimulationError::RandomActivityError)?;

tasks.spawn(produce_events(
info,
network_generator.clone(),
node_generator,
executor.source_info,
executor.network_generator,
executor.payment_generator,
sender.clone(),
self.shutdown_trigger.clone(),
self.shutdown_listener.clone(),
Expand Down Expand Up @@ -784,10 +790,10 @@ async fn consume_events(
}
}

async fn produce_events<N: DestinationGenerator, A: PaymentGenerator + Display>(
async fn produce_events<N: DestinationGenerator + ?Sized, A: PayGenDisplay + ?Sized>(
source: NodeInfo,
network_generator: Arc<Mutex<N>>,
node_generator: A,
node_generator: Box<A>,
sender: Sender<SimulationEvent>,
shutdown: Trigger,
listener: Listener,
Expand Down
6 changes: 5 additions & 1 deletion sim-lib/src/random_activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use bitcoin::secp256k1::PublicKey;
use rand_distr::{Distribution, Exp, LogNormal, WeightedIndex};
use std::time::Duration;

use crate::{DestinationGenerator, NodeInfo, PaymentGenerationError, PaymentGenerator};
use crate::{
DestinationGenerator, NodeInfo, PayGenDisplay, PaymentGenerationError, PaymentGenerator,
};

const HOURS_PER_MONTH: u64 = 30 * 24;
const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60;
Expand Down Expand Up @@ -260,6 +262,8 @@ impl Display for RandomPaymentActivity {
}
}

impl PayGenDisplay for RandomPaymentActivity {}

#[cfg(test)]
mod tests {
mod test_network_graph_view {
Expand Down

0 comments on commit 3aaaf6b

Please sign in to comment.