Skip to content

Commit

Permalink
refactor: do not use subscribe_all in iroh sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 6, 2024
1 parent f53cef5 commit 9bdad98
Showing 1 changed file with 28 additions and 30 deletions.
58 changes: 28 additions & 30 deletions iroh/src/sync_engine/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::collections::HashSet;

use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use futures_lite::StreamExt;
use iroh_gossip::{
net::{Event, Gossip},
proto::TopicId,
};
use futures_util::FutureExt;
use iroh_gossip::net::{Event, Gossip};
use iroh_net::key::PublicKey;
use iroh_sync::{actor::SyncHandle, ContentStatus, NamespaceId};
use tokio::{
sync::{broadcast::error::RecvError, mpsc},
sync::{broadcast, mpsc},
task::JoinSet,
};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamMap,
};
use tracing::{debug, error, trace};

use super::live::{Op, ToLiveActor};
Expand All @@ -37,7 +39,8 @@ pub struct GossipActor {
to_sync_actor: mpsc::Sender<ToLiveActor>,
joined: HashSet<NamespaceId>,
want_join: HashSet<NamespaceId>,
pending_joins: JoinSet<(NamespaceId, Result<TopicId>)>,
pending_joins: JoinSet<(NamespaceId, Result<broadcast::Receiver<Event>>)>,
gossip_events: StreamMap<NamespaceId, BroadcastStream<Event>>,
}

impl GossipActor {
Expand All @@ -55,16 +58,17 @@ impl GossipActor {
joined: Default::default(),
want_join: Default::default(),
pending_joins: Default::default(),
gossip_events: Default::default(),
}
}
pub async fn run(&mut self) -> anyhow::Result<()> {
let mut gossip_events = self.gossip.clone().subscribe_all();
// let mut gossip_events = self.gossip.clone().subscribe_all();
let mut i = 0;
loop {
i += 1;
trace!(?i, "tick wait");
tokio::select! {
next = gossip_events.next() => {
next = self.gossip_events.next(), if !self.gossip_events.is_empty() => {
trace!(?i, "tick: gossip_event");
if let Err(err) = self.on_gossip_event(next).await {
error!("gossip actor died: {err:?}");
Expand All @@ -82,9 +86,11 @@ impl GossipActor {
trace!(?i, "tick: pending_joins");
let (namespace, res) = res.context("pending_joins closed")?;
match res {
Ok(_topic) => {
Ok(stream) => {
debug!(namespace = %namespace.fmt_short(), "joined gossip");
self.joined.insert(namespace);
let stream = BroadcastStream::new(stream);
self.gossip_events.insert(namespace, stream);
},
Err(err) => {
if self.want_join.contains(&namespace) {
Expand All @@ -111,13 +117,11 @@ impl GossipActor {
let gossip = self.gossip.clone();
// join gossip for the topic to receive and send message
let fut = async move {
let res = gossip.join(namespace.into(), peers).await;
let res = match res {
Ok(fut) => fut.await,
Err(err) => Err(err),
};
(namespace, res)
let stream = gossip.subscribe(namespace.into()).await?;
let _topic = gossip.join(namespace.into(), peers).await?.await?;
Ok(stream)
};
let fut = fut.map(move |res| (namespace, res));
self.want_join.insert(namespace);
self.pending_joins.spawn(fut);
}
Expand All @@ -131,22 +135,16 @@ impl GossipActor {
}
async fn on_gossip_event(
&mut self,
event: Option<Result<(TopicId, Event), RecvError>>,
event: Option<(NamespaceId, Result<Event, BroadcastStreamRecvError>)>,
) -> Result<()> {
let (topic, event) = match event {
Some(Ok(event)) => event,
None => return Err(anyhow!("Gossip event channel closed")),
Some(Err(err)) => match err {
RecvError::Lagged(n) => {
error!("GossipActor too slow (lagged by {n}) - dropping gossip event");
return Ok(());
}
RecvError::Closed => {
return Err(anyhow!("Gossip event channel closed"));
}
},
let (namespace, event) = event.context("Gossip event channel closed")?;
let event = match event {
Ok(event) => event,
Err(BroadcastStreamRecvError::Lagged(n)) => {
error!("GossipActor too slow (lagged by {n}) - dropping gossip event");
return Ok(());
}
};
let namespace: NamespaceId = topic.as_bytes().into();
if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) {
error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic");
return Ok(());
Expand Down

0 comments on commit 9bdad98

Please sign in to comment.