Skip to content

Commit

Permalink
sim-lib/refactor: redefine payment to imply dispatch state
Browse files Browse the repository at this point in the history
- Instead of `DispatchedPayment`, we define a stateful `Payment` type that includes a `hash` if successfully dispatched
  • Loading branch information
okjodom committed Sep 21, 2023
1 parent 0b0c670 commit 0615f4b
Showing 1 changed file with 70 additions and 50 deletions.
120 changes: 70 additions & 50 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,6 @@ pub trait LightningNode {
async fn get_node_features(&mut self, node: PublicKey) -> Result<NodeFeatures, LightningError>;
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
// Dispatch a payment of the specified amount to the public key provided.
SendPayment(PublicKey, u64),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentResult {
pub htlc_count: usize,
Expand Down Expand Up @@ -180,18 +172,24 @@ pub enum PaymentOutcome {
Unknown,
}

/// Describes a payment from a source node to a destination node.
#[derive(Debug, Clone, Copy, Serialize)]
struct DispatchedPayment {
struct Payment {
/// Pubkey of the source node dispatching the payment.
source: PublicKey,
/// Pubkey of the destination node receiving the payment.
destination: PublicKey,
/// Amount of the payment in msat.
amount_msat: u64,
/// Hash of the payment if it has been successfully dispatched.
#[serde(with = "serializers::serde_option_payment_hash")]
hash: Option<PaymentHash>,
amount_msat: u64,
/// Time at which the payment was dispatched.
#[serde(with = "serde_millis")]
dispatch_time: SystemTime,
}

impl Display for DispatchedPayment {
impl Display for Payment {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let hash = match self.hash {
Some(hash) => hex::encode(hash.0),
Expand All @@ -210,10 +208,23 @@ impl Display for DispatchedPayment {
}
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(PublicKey, u64),
}

/// SimulationOutput provides the output of a simulation event.
enum SimulationOutput {
// The payment hash that results from a SendPayment SimulationEvent being triggered.
PaymentSent(DispatchedPayment),
/// Intermediate output for when simulator has successfully dispatched a payment.
/// We need to track the result of the payment to report on it.
SendPaymentSuccess(Payment),
/// Final output for when simulator has failed to dispatch a payment.
/// Report this as the final result of simulation event.
SendPaymentFailure(Payment, PaymentResult),
}

#[derive(Clone)]
Expand Down Expand Up @@ -445,23 +456,26 @@ async fn consume_events(
match event {
SimulationEvent::SendPayment(dest, amt_msat) => {
let mut node = node.lock().await;
let mut payment = DispatchedPayment {

let mut payment = Payment {
source: node.get_info().pubkey,
hash: None,
amount_msat: amt_msat,
destination: dest,
dispatch_time: SystemTime::now(),
};

match node.send_payment(dest, amt_msat).await {
let outcome = match node.send_payment(dest, amt_msat).await {
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node_id,
dest,
hex::encode(payment_hash.0)
);
// We need to track the payment outcome using the payment hash that we have received.
payment.hash = Some(payment_hash);
SimulationOutput::SendPaymentSuccess(payment)
}
Err(e) => {
log::error!(
Expand All @@ -470,10 +484,10 @@ async fn consume_events(
dest,
e
);
SimulationOutput::SendPaymentFailure(payment, PaymentResult::default())
}
};

let outcome = SimulationOutput::PaymentSent(payment);
match sender.send(outcome).await {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -538,7 +552,7 @@ async fn produce_events(
}

async fn consume_simulation_results(
receiver: Receiver<(DispatchedPayment, PaymentResult)>,
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) {
Expand All @@ -552,7 +566,7 @@ async fn consume_simulation_results(
}

async fn write_payment_results(
mut receiver: Receiver<(DispatchedPayment, PaymentResult)>,
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
) -> Result<(), SimulationError> {
Expand Down Expand Up @@ -622,7 +636,7 @@ impl PaymentResultLogger {
}
}

fn report_result(&mut self, details: &DispatchedPayment, result: &PaymentResult) {
fn report_result(&mut self, details: &Payment, result: &PaymentResult) {
match result.payment_outcome {
PaymentOutcome::Success => self.success_payment += 1,
_ => self.failed_payment += 1,
Expand Down Expand Up @@ -655,7 +669,7 @@ impl PaymentResultLogger {
async fn produce_simulation_results(
nodes: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>>,
mut output_receiver: Receiver<SimulationOutput>,
results: Sender<(DispatchedPayment, PaymentResult)>,
results: Sender<(Payment, PaymentResult)>,
shutdown: Listener,
) {
log::debug!("Simulation results producer started.");
Expand All @@ -670,12 +684,17 @@ async fn produce_simulation_results(
match output{
Some(simulation_output) => {
match simulation_output{
SimulationOutput::PaymentSent(dispatched_payment) => {
SimulationOutput::SendPaymentSuccess(dispatched_payment) => {
let source_node = nodes.get(&dispatched_payment.source).unwrap().clone();
set.spawn(track_payment_result(
source_node,results.clone(),simulation_output, shutdown.clone(),
source_node,results.clone(),dispatched_payment, shutdown.clone(),
));
},
SimulationOutput::SendPaymentFailure(payment, result) => {
if results.clone().send((payment, result)).await.is_err() {
log::debug!("Could not send payment result");
}
}
};

},
Expand All @@ -697,43 +716,44 @@ async fn produce_simulation_results(

async fn track_payment_result(
node: Arc<Mutex<dyn LightningNode + Send>>,
results: Sender<(DispatchedPayment, PaymentResult)>,
output: SimulationOutput,
results: Sender<(Payment, PaymentResult)>,
dispatched_payment: Payment,
shutdown: Listener,
) {
log::trace!("Payment result tracker starting.");

let mut node = node.lock().await;

match output {
SimulationOutput::PaymentSent(payment) => {
let res = match payment.hash {
Some(hash) => {
log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
let track_payment = node.track_payment(hash, shutdown.clone());

match track_payment.await {
Ok(res) => {
log::debug!(
"Track payment {} result: {:?}.",
hex::encode(hash.0),
res.payment_outcome
);
res
}
Err(e) => {
log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0));
PaymentResult::default()
}
}
let res = match dispatched_payment.hash {
Some(hash) => {
log::debug!("Tracking payment outcome for: {}.", hex::encode(hash.0));
let track_payment = node.track_payment(hash, shutdown.clone());

match track_payment.await {
Ok(res) => {
log::debug!(
"Track payment {} result: {:?}.",
hex::encode(hash.0),
res.payment_outcome
);
res
}
Err(e) => {
log::error!("Track payment failed for {}: {e}.", hex::encode(hash.0));
PaymentResult::default()
}
None => PaymentResult::default(),
};

if results.clone().send((payment, res)).await.is_err() {
log::debug!("Could not send payment result for");
}
}
None => PaymentResult::default(),
};

if results
.clone()
.send((dispatched_payment, res))
.await
.is_err()
{
log::debug!("Could not send payment result");
}

log::trace!("Payment result tracker exiting.");
Expand Down

0 comments on commit 0615f4b

Please sign in to comment.