Skip to content

Commit

Permalink
Merge pull request #173 from bjohnson5/168-start-and-count
Browse files Browse the repository at this point in the history
Issue #168: Adding optional start and count values to sim.json file
  • Loading branch information
carlaKC authored Apr 4, 2024
2 parents 0763aea + 1f0163b commit dc544fa
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 14 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,31 @@ to execute. Note that `source` nodes *must* be contained in `nodes`,
but destination nodes can be any public node in the network (though
this may result in liquidity draining over time).

Required fields:
```
"source": the payer
"destination": the payee
"interval_secs": how often the payments should be sent
"amount_msat": the amount of each payment
```

Optional fields:
```
"start_secs": the time to start sending payments
"count": the total number of payments to send
```

> If `start_secs` is not provided the payments will begin as soon as the simulation starts (default=0)
> If `count` is not provided the payments will continue for as long as the simulation runs (default=None)
The example simulation file below sets up the following simulation:
* Connect to `Alice` running LND to generate activity.
* Connect to `Bob` running CLN to generate activity.
* Dispatch 2000 msat payments from `Alice` to `Carol` every 1 seconds.
* Dispatch 140000 msat payments from `Bob` to `Alice` every 50 seconds.
* Dispatch 1000 msat payments from `Bob` to `Dave` every 2 seconds.
* Dispatch 10 payments (5000 msat each) from `Erin` to `Frank` at 2 second intervals, starting 20 seconds into the sim.
```
{
"nodes": [
Expand All @@ -162,6 +181,18 @@ The example simulation file below sets up the following simulation:
"ca_cert": "/path/ca.pem",
"client_cert": "/path/client.pem",
"client_key": "/path/client-key.pem"
},
{
"id": "Erin",
"address": "https://localhost:10012",
"macaroon": "/path/admin.macaroon",
"cert": "/path/tls.cert"
},
{
"id": "Frank",
"address": "https://localhost:10014",
"macaroon": "/path/admin.macaroon",
"cert": "/path/tls.cert"
}
],
"activity": [
Expand All @@ -182,6 +213,14 @@ The example simulation file below sets up the following simulation:
"destination": "03232e245059a2e7f6e32d6c4bca97fc4cda935c553ea3693adb3265a19050c3bf",
"interval_secs": 2,
"amount_msat": 1000
},
{
"source": "Erin",
"destination": "Frank",
"start_secs": 20,
"count": 10,
"interval_secs": 2,
"amount_msat": 5000
}
]
}
Expand Down
2 changes: 2 additions & 0 deletions sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ async fn main() -> anyhow::Result<()> {
validated_activities.push(ActivityDefinition {
source,
destination,
start_secs: act.start_secs,
count: act.count,
interval_secs: act.interval_secs,
amount_msat: act.amount_msat,
});
Expand Down
29 changes: 26 additions & 3 deletions sim-lib/src/defined_activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,24 @@ use tokio::time::Duration;
#[derive(Clone)]
pub struct DefinedPaymentActivity {
destination: NodeInfo,
start: Duration,
count: Option<u64>,
wait: Duration,
amount: u64,
}

impl DefinedPaymentActivity {
pub fn new(destination: NodeInfo, wait: Duration, amount: u64) -> Self {
pub fn new(
destination: NodeInfo,
start: Duration,
count: Option<u64>,
wait: Duration,
amount: u64,
) -> Self {
DefinedPaymentActivity {
destination,
start,
count,
wait,
amount,
}
Expand All @@ -36,6 +46,14 @@ impl DestinationGenerator for DefinedPaymentActivity {
}

impl PaymentGenerator for DefinedPaymentActivity {
fn payment_start(&self) -> Duration {
self.start
}

fn payment_count(&self) -> Option<u64> {
self.count
}

fn next_payment_wait(&self) -> Duration {
self.wait
}
Expand Down Expand Up @@ -69,8 +87,13 @@ mod tests {
let source = get_random_keypair();
let payment_amt = 50;

let generator =
DefinedPaymentActivity::new(node.clone(), Duration::from_secs(60), payment_amt);
let generator = DefinedPaymentActivity::new(
node.clone(),
Duration::from_secs(0),
None,
Duration::from_secs(60),
payment_amt,
);

let (dest, dest_capacity) = generator.choose_destination(source.1);
assert_eq!(node.pubkey, dest.pubkey);
Expand Down
81 changes: 70 additions & 11 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl NodeId {
crate::NodeId::PublicKey(pk) => {
if pk != node_id {
return Err(LightningError::ValidationError(format!(
"the provided node id does not match the one returned by the backend ({} != {}).",
"The provided node id does not match the one returned by the backend ({} != {}).",
pk, node_id
)));
}
Expand Down Expand Up @@ -139,6 +139,12 @@ pub struct ActivityParser {
/// The destination of the payment.
#[serde(with = "serializers::serde_node_id")]
pub destination: NodeId,
/// The time in the simulation to start the payment.
#[serde(default)]
pub start_secs: u16,
/// The number of payments to send over the course of the simulation.
#[serde(default)]
pub count: Option<u64>,
/// The interval of the event, as in every how many seconds the payment is performed.
pub interval_secs: u16,
/// The amount of m_sat to used in this payment.
Expand All @@ -153,6 +159,10 @@ pub struct ActivityDefinition {
pub source: NodeInfo,
/// The destination of the payment.
pub destination: NodeInfo,
/// The time in the simulation to start the payment.
pub start_secs: u16,
/// The number of payments to send over the course of the simulation.
pub count: Option<u64>,
/// The interval of the event, as in every how many seconds the payment is performed.
pub interval_secs: u16,
/// The amount of m_sat to used in this payment.
Expand Down Expand Up @@ -261,6 +271,12 @@ pub trait DestinationGenerator: Send {
pub struct PaymentGenerationError(String);

pub trait PaymentGenerator: Display + Send {
/// Returns the time that the payments should start
fn payment_start(&self) -> Duration;

/// Returns the number of payments that should be made
fn payment_count(&self) -> Option<u64>;

/// Returns the number of seconds that a node should wait until firing its next payment.
fn next_payment_wait(&self) -> time::Duration;

Expand Down Expand Up @@ -554,9 +570,25 @@ impl Simulation {
);

// Next, we'll spin up our actual producers that will be responsible for triggering the configured activity.
self.dispatch_producers(activities, consumer_channels, &mut tasks)
// The producers will use their own JoinSet so that the simulation can be shutdown if they all finish.
let mut producer_tasks = JoinSet::new();
self.dispatch_producers(activities, consumer_channels, &mut producer_tasks)
.await?;

// Start a task that waits for the producers to finish.
// If all producers finish, then there is nothing left to do and the simulation can be shutdown.
let producer_trigger = self.shutdown_trigger.clone();
tasks.spawn(async move {
while let Some(res) = producer_tasks.join_next().await {
if let Err(e) = res {
log::error!("Producer exited with error: {e}.");
}
}
log::info!("All producers finished. Shutting down.");
producer_trigger.trigger()
});

// Start a task that will shutdown the simulation if the total_time is met.
if let Some(total_time) = self.total_time {
let t = self.shutdown_trigger.clone();
let l = self.shutdown_listener.clone();
Expand Down Expand Up @@ -639,7 +671,7 @@ impl Simulation {
// csr: consume simulation results
let csr_write_results = self.write_results.clone();
tasks.spawn(async move {
log::debug!("Staring simulation results consumer.");
log::debug!("Starting simulation results consumer.");
if let Err(e) = consume_simulation_results(
result_logger,
results_receiver,
Expand Down Expand Up @@ -667,6 +699,8 @@ impl Simulation {
for description in self.activity.iter() {
let activity_generator = DefinedPaymentActivity::new(
description.destination.clone(),
Duration::from_secs(description.start_secs.into()),
description.count,
Duration::from_secs(description.interval_secs.into()),
description.amount_msat,
);
Expand Down Expand Up @@ -777,9 +811,9 @@ impl Simulation {
consume_events(ce_node, receiver, ce_output_sender, ce_listener).await
{
ce_shutdown.trigger();
log::error!("Event consumer exited with error: {e:?}.");
log::error!("Event consumer for node {node_info} exited with error: {e:?}.");
} else {
log::debug!("Event consumer for node {node_info} received shutdown signal.");
log::debug!("Event consumer for node {node_info} completed successfully.");
}
});
}
Expand Down Expand Up @@ -826,9 +860,9 @@ impl Simulation {
.await
{
pe_shutdown.trigger();
log::debug!("Event producer exited with error {e}.");
log::debug!("Activity producer for {source} exited with error {e}.");
} else {
log::debug!("Random activity generator for {source} received shutdown signal.");
log::debug!("Activity producer for {source} completed successfully.");
}
});
}
Expand Down Expand Up @@ -918,9 +952,33 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
sender: Sender<SimulationEvent>,
listener: Listener,
) -> Result<(), SimulationError> {
let mut current_count = 0;
loop {
let wait = node_generator.next_payment_wait();
log::debug!("Next payment for {source} in {:?} seconds.", wait);
if let Some(c) = node_generator.payment_count() {
if c == current_count {
log::info!(
"Payment count has been met for {source}: {c} payments. Stopping the activity."
);
return Ok(());
}
}

let wait: Duration = if current_count == 0 {
let start = node_generator.payment_start();
if start != Duration::from_secs(0) {
log::debug!(
"Using a start delay. The first payment for {source} will be at {:?}.",
start
);
}
start
} else {
log::debug!(
"Next payment for {source} in {:?}.",
node_generator.next_payment_wait()
);
node_generator.next_payment_wait()
};

select! {
biased;
Expand Down Expand Up @@ -948,14 +1006,15 @@ async fn produce_events<N: DestinationGenerator + ?Sized, A: PaymentGenerator +
},
};

log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination);
log::debug!("Generated payment: {source} -> {}: {amount} msat.", destination);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.clone(), amount);
if sender.send(event.clone()).await.is_err() {
return Err(SimulationError::MpscChannelError (format!("Stopped random producer for {amount}: {source} -> {destination}.")));
return Err(SimulationError::MpscChannelError (format!("Stopped activity producer for {amount}: {source} -> {destination}.")));
}

current_count += 1;
},
}
}
Expand Down
10 changes: 10 additions & 0 deletions sim-lib/src/random_activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ fn events_per_month(source_capacity_msat: u64, multiplier: f64, expected_payment
}

impl PaymentGenerator for RandomPaymentActivity {
/// Returns the time that the payments should start. This will always be 0 for the RandomPaymentActivity type.
fn payment_start(&self) -> Duration {
Duration::from_secs(0)
}

/// Returns the number of payments that should be made. This will always be None for the RandomPaymentActivity type.
fn payment_count(&self) -> Option<u64> {
None
}

/// Returns the amount of time until the next payment should be scheduled for the node.
fn next_payment_wait(&self) -> Duration {
let mut rng = rand::thread_rng();
Expand Down

0 comments on commit dc544fa

Please sign in to comment.