Skip to content

Commit

Permalink
sim-lib: add dispatch of random activity generation
Browse files Browse the repository at this point in the history
  • Loading branch information
carlaKC committed Oct 2, 2023
1 parent 0493680 commit 9214ae9
Showing 1 changed file with 211 additions and 11 deletions.
222 changes: 211 additions & 11 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@ use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio::time;
use tokio::time::Duration;
use tokio::{select, time};
use triggered::{Listener, Trigger};

use self::random_activity::{NetworkGraphView, PaymentActivityGenerator};

pub mod cln;
pub mod lnd;
mod random_activity;
mod serializers;

/// The default expected payment amount for the simulation, around ~$10 at the time of writing.
pub const EXPECTED_PAYMENT_AMOUNT: u64 = 3_800_000;

/// The number of times over each node in the network sends its total deployed capacity in a calendar month.
pub const ACTIVITY_MULTIPLIER: f64 = 2.0;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum NodeConnection {
#[serde(alias = "lnd", alias = "Lnd")]
Expand Down Expand Up @@ -239,6 +247,11 @@ pub struct Simulation {
total_time: Option<time::Duration>,
/// The number of activity results to batch before printing in CSV.
print_batch_size: u32,
/// The expected payment size for the network.
expected_payment_msat: u64,
/// The number of times that the network sends its total capacity in a month of operation when generating random
/// activity.
activity_multiplier: f64,
}

const DEFAULT_PRINT_BATCH_SIZE: u32 = 500;
Expand All @@ -258,6 +271,8 @@ impl Simulation {
shutdown_listener,
total_time: total_time.map(|x| Duration::from_secs(x as u64)),
print_batch_size: print_batch_size.unwrap_or(DEFAULT_PRINT_BATCH_SIZE),
expected_payment_msat: EXPECTED_PAYMENT_AMOUNT,
activity_multiplier: ACTIVITY_MULTIPLIER,
}
}

Expand Down Expand Up @@ -326,21 +341,49 @@ impl Simulation {
let (event_sender, event_receiver) = channel(1);
self.run_data_collection(event_receiver, &mut tasks);

// Create consumers for every source node that is listed in our activity.
let collecting_nodes = self
.activity
.iter()
.map(|activity| activity.source)
.collect();
// Create consumers for every source node when dealing with activity descriptions, or only for nodes with
// sufficient capacity if generating random activity. Since we have to query the capacity of every node
// in our network for picking random activity nodes, we cache this value here to be used later when we spin
// up producers.
let mut random_activity_nodes = None;
let collecting_nodes = if !self.activity.is_empty() {
self.activity
.iter()
.map(|activity| activity.source)
.collect()
} else {
random_activity_nodes = Some(self.random_activity_nodes().await?);
random_activity_nodes
.as_ref()
.unwrap()
.keys()
.cloned()
.collect()
};

let producer_senders =
self.dispatch_consumers(collecting_nodes, event_sender.clone(), &mut tasks);

// Next, we'll spin up our actual activity generator that will be responsible for triggering the activity that
// has been configured, passing in the channel that is used to notify data collection that events have been
// generated.
self.dispatch_activity_producers(producer_senders, &mut tasks)
.await;
// has been configured (if any), passing in the channel that is used to notify data collection that events have
// been generated. Alternatively, we'll generate random activity if there is no activity specified.
if !self.activity.is_empty() {
self.dispatch_activity_producers(producer_senders, &mut tasks)
.await;
} else {
log::info!(
"Generating random activity with multiplier: {}, average payment amount: {}.",
self.activity_multiplier,
self.expected_payment_msat
);
self.dispatch_random_producers(
// Safe to unwrap because we set Some when selecting our nodes to run consumers for.
random_activity_nodes.unwrap(),
producer_senders,
&mut tasks,
)
.await?;
}

if let Some(total_time) = self.total_time {
let t = self.shutdown_trigger.clone();
Expand Down Expand Up @@ -404,6 +447,29 @@ impl Simulation {
log::debug!("Simulator data collection set up.");
}

/// Returns the list of nodes that are eligible for generating random acitvity on. This is the subset of nodes
/// that have sufficient capacity to generate payments of our expected payment amount.
async fn random_activity_nodes(&self) -> Result<HashMap<PublicKey, u64>, SimulationError> {
// Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
// avoid double counting capacity (as each node has a counterparty in the channel).
let mut node_capacities = HashMap::new();
for (pk, node) in self.nodes.iter() {
let chan_capacity = node.lock().await.list_channels().await?.iter().sum::<u64>();

if let Err(e) = PaymentActivityGenerator::validate_capacity(
chan_capacity,
self.expected_payment_msat,
) {
log::warn!("Node: {} not eligible for activity generation: {e}.", *pk);
continue;
}

node_capacities.insert(*pk, chan_capacity / 2);
}

Ok(node_capacities)
}

/// Responsible for spinning up consumer tasks for each node specified in consuming_nodes. Assumes that validation
/// has already ensured that we have execution on every nodes listed in consuming_nodes.
fn dispatch_consumers(
Expand Down Expand Up @@ -456,6 +522,67 @@ impl Simulation {
));
}
}

/// Responsible for spinning up producers for a set of activity descriptions. Requires that node capacities are
/// provided for each node represented in producer channels.
async fn dispatch_random_producers(
&self,
node_capacities: HashMap<PublicKey, u64>,
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
tasks: &mut JoinSet<()>,
) -> Result<(), SimulationError> {
// Validate that we have at least two nodes to generate activities on (we need each to feasibly have a node
// that is not itself to send to).
if node_capacities.len() < 2 {
return Err(SimulationError::RandomActivityError(
"at least two nodes with sufficient capacity required for random activity"
.to_string(),
));
}

let network_generator =
Arc::new(Mutex::new(NetworkGraphView::new(node_capacities.clone())?));

log::info!(
"Created network generator: {}.",
network_generator.lock().await
);

for (pk, sender) in producer_channels.iter() {
let source_capacity = match node_capacities.get(pk) {
Some(capacity) => *capacity,
None => {
return Err(SimulationError::RandomActivityError(format!(
"Random activity generator run for: {} with unknown capacity.",
*pk
)));
}
};

let node_generator = match PaymentActivityGenerator::new(
source_capacity,
self.expected_payment_msat,
self.activity_multiplier,
) {
Ok(generator) => generator,
Err(e) => {
log::warn!("Could not create activity generator for: {}: {e}.", *pk);
continue;
}
};

tasks.spawn(produce_random_events(
*pk,
network_generator.clone(),
node_generator,
sender.clone(),
self.shutdown_trigger.clone(),
self.shutdown_listener.clone(),
));
}

Ok(())
}
}

// consume_events processes events that are crated for a lightning node that we can execute events on. Any output
Expand Down Expand Up @@ -569,6 +696,79 @@ async fn produce_events(
shutdown.trigger();
}

async fn produce_random_events<N: NetworkGenerator, A: PaymentGenerator + Display>(
source: PublicKey,
network_generator: Arc<Mutex<N>>,
node_generator: A,
sender: Sender<SimulationEvent>,
shutdown: Trigger,
listener: Listener,
) {
log::info!("Started random activity producer for {source}: {node_generator}.");

loop {
let wait = node_generator.next_payment_wait();
log::debug!("Next payment for {source} in {:?} seconds.", wait);

select! {
biased;
_ = listener.clone() => {
log::debug!("Random activity generator for {source} received signal to shut down.");
break;
},

// Wait until our time to next payment has elapsed then execute a random amount payment to a random
// destination.
_ = time::sleep(wait) => {
let generator = network_generator.lock().await;

let destination = match generator.sample_node_by_capacity(source) {
Ok(dest) => dest,
Err(e) => {
// There's a small chance that we can't get a destination that is not ourselves in small
// networks (see trait docs for details). We don't treat this as a terminal error because we
// assume that our validation has checked that there are other options available.
log::debug!("Skipping payment {source}: {e}.");
continue;
}
};

// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
// a payment amount something has gone wrong (because we should have validated that we can always
// generate amounts), so we exit.
let amount = match node_generator.payment_amount(destination.1) {
Ok(amt) => {
if amt == 0 {
log::debug!("Skipping zero amount payment for {source} -> {}.", destination.0);
continue;
}

amt
},
Err(e) => {
log::error!("Could not get amount for {source} -> {}: {e}.", destination.0);
break;
},
};

log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination.0);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.0, amount);
if let Err(e) = sender.send(event).await {
log::debug!(
"Stopped random producer for {amount}: {source} -> {}. Consumer error: {e}.", destination.0,
);
break;
}
},
}
}

log::debug!("Stopped random activity producer {source}.");
shutdown.trigger();
}

async fn consume_simulation_results(
receiver: Receiver<(DispatchedPayment, PaymentResult)>,
listener: Listener,
Expand Down

0 comments on commit 9214ae9

Please sign in to comment.