Skip to content

Commit

Permalink
more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed May 1, 2024
1 parent 48a2993 commit 4882412
Showing 1 changed file with 53 additions and 11 deletions.
64 changes: 53 additions & 11 deletions iroh/src/gossip_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,51 @@ struct State {
/// This is to prevent a single slow subscriber from blocking the dispatch loop.
/// If a subscriber is lagging, it should be closed and re-opened.
const SUBSCRIPTION_CAPACITY: usize = 128;
/// Type alias for a stream of gossip updates, so we don't have to repeat all the bounds.
type UpdateStream = Box<dyn Stream<Item = GossipSubscribeUpdate> + Send + Sync + Unpin + 'static>;
/// Type alias for a sink of gossip responses.
type ResponseSink = flume::Sender<RpcResult<GossipSubscribeResponse>>;

#[derive(derive_more::Debug)]
enum TopicState {
/// The topic is currently joining.
/// Making new subscriptions is allowed, but they will have to wait for the join to finish.
Joining {
/// Stream/sink pairs that are waiting for the topic to become live.
#[debug(skip)]
waiting: Vec<(UpdateStream, ResponseSink)>,
peers: BTreeSet<NodeId>,
/// Set of bootstrap nodes we are using.
bootstrap: BTreeSet<NodeId>,
/// The task that is driving the join future.
#[allow(dead_code)]
join_task: AbortingJoinHandle<()>,
},
/// The topic is currently live.
/// New subscriptions can be immediately added.
Live {
/// Task/sink pairs that are currently live.
/// The task is the task that is sending broadcast messages to the topic.
live: Vec<(AbortingJoinHandle<()>, ResponseSink)>,
},
/// The topic is currently quitting.
/// We can't make new subscriptions without waiting for the quit to finish.
Quitting {
/// Stream/sink pairs that are waiting for the topic to quit so
/// it can be joined again.
#[debug(skip)]
waiting: Vec<(UpdateStream, ResponseSink)>,
peers: BTreeSet<NodeId>,
/// Set of bootstrap nodes we are using.
///
/// This is used to re-join the topic after quitting.
bootstrap: BTreeSet<NodeId>,
/// The task that is driving the quit future.
#[allow(dead_code)]
quit_task: AbortingJoinHandle<()>,
},
}

impl TopicState {
/// Extract all senders from the state.
fn into_senders(self) -> Vec<ResponseSink> {
match self {
TopicState::Joining { waiting, .. } | TopicState::Quitting { waiting, .. } => {
Expand All @@ -82,7 +96,7 @@ impl TopicState {
}

impl GossipDispatcher {
/// Create a new gossip engine with the given gossip instance.
/// Create a new gossip dispatcher with the given gossip instance.
pub fn spawn(gossip: Gossip) -> Self {
let inner = Arc::new(Mutex::new(State {
current_subscriptions: BTreeMap::new(),
Expand All @@ -94,24 +108,31 @@ impl GossipDispatcher {
res
}

/// Quit a gossip topic and handle the result of the quitting.
///
/// On quit success, will try to join the topic again with the bootstrap nodes we have accumulated while waiting for quit to finish.
/// On quit failure, all waiting streams will be notified with the error and removed.
async fn quit_task(self, topic: TopicId) {
let res = self.gossip.quit(topic).await;
let mut inner = self.inner.lock().unwrap();
if let Some(TopicState::Quitting { waiting, peers, .. }) =
inner.current_subscriptions.remove(&topic)
if let Some(TopicState::Quitting {
waiting,
bootstrap: peers,
..
}) = inner.current_subscriptions.remove(&topic)
{
match res {
Ok(()) => {
if waiting.is_empty() {
return;
}
let bootstrap = peers.iter().copied().collect();
let bootstrap = peers.clone();
let join_task = spawn_owned(self.clone().join_task(topic, bootstrap));
inner.current_subscriptions.insert(
topic,
TopicState::Joining {
waiting,
peers,
bootstrap: peers,
join_task,
},
);
Expand All @@ -127,20 +148,28 @@ impl GossipDispatcher {
}
}

/// Try to send an event to a sink.
///
/// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full.
fn try_send(entry: &(AbortingJoinHandle<()>, ResponseSink), event: &Event) -> bool {
let (task, send) = entry;
// This means that we stop sending to the stream when the update side is finished.
if task.is_finished() {
return false;
}
// If the stream is disconnected, we don't need to send to it.
if send.is_disconnected() {
return false;
}
// Check if the send buffer is almost full, and send a lagged response if it is.
if let Some(cap) = send.capacity() {
if send.len() >= cap - 1 {
send.try_send(Ok(GossipSubscribeResponse::Lagged)).ok();
return false;
}
}
// Send the event to the stream.
// We are the owner of the stream, so we can be sure that there is still room.
send.try_send(Ok(GossipSubscribeResponse::Event(event.clone().into())))
.is_ok()
}
Expand All @@ -163,7 +192,7 @@ impl GossipDispatcher {
topic,
TopicState::Quitting {
waiting: vec![],
peers: BTreeSet::new(),
bootstrap: BTreeSet::new(),
quit_task: quit_task.into(),
},
);
Expand Down Expand Up @@ -225,6 +254,8 @@ impl GossipDispatcher {
}

/// Call join, then await the result.
///
/// Basically just flattens the two stages of joining into one.
async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec<NodeId>) -> anyhow::Result<()> {
gossip.join(topic, bootstrap).await?.await?;
Ok(())
Expand Down Expand Up @@ -280,20 +311,30 @@ impl GossipDispatcher {
let join_task = spawn_owned(this.clone().join_task(topic, msg.bootstrap.clone()));
entry.insert(TopicState::Joining {
waiting,
peers: msg.bootstrap,
bootstrap: msg.bootstrap,
join_task,
});
}
Entry::Occupied(mut entry) => {
// There is already a subscription
let state = entry.get_mut();
match state {
TopicState::Joining { waiting, peers, .. } => {
TopicState::Joining {
waiting,
bootstrap: peers,
..
} => {
// We are joining, so we need to wait with creating the update task.
//
// TODO: should we merge the bootstrap nodes and try to join with all of them?
peers.extend(msg.bootstrap.into_iter());
waiting.push((updates, send));
}
TopicState::Quitting { waiting, peers, .. } => {
TopicState::Quitting {
waiting,
bootstrap: peers,
..
} => {
// We are quitting, so we need to wait with creating the update task.
peers.extend(msg.bootstrap.into_iter());
waiting.push((updates, send));
Expand All @@ -310,6 +351,7 @@ impl GossipDispatcher {
}
}

/// tokio::spawn but returns an `AbortingJoinHandle` that owns the task.
fn spawn_owned<F, T>(f: F) -> AbortingJoinHandle<T>
where
F: std::future::Future<Output = T> + Send + 'static,
Expand Down

0 comments on commit 4882412

Please sign in to comment.