From 9bdad984699ea507888b15cc1800c94708d8d347 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 3 May 2024 23:56:32 +0200 Subject: [PATCH] refactor: do not use subscribe_all in iroh sync --- iroh/src/sync_engine/gossip.rs | 58 ++++++++++++++++------------------ 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/iroh/src/sync_engine/gossip.rs b/iroh/src/sync_engine/gossip.rs index 3dfd5b08fd6..8fdedf5c6c6 100644 --- a/iroh/src/sync_engine/gossip.rs +++ b/iroh/src/sync_engine/gossip.rs @@ -1,17 +1,19 @@ use std::collections::HashSet; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use futures_lite::StreamExt; -use iroh_gossip::{ - net::{Event, Gossip}, - proto::TopicId, -}; +use futures_util::FutureExt; +use iroh_gossip::net::{Event, Gossip}; use iroh_net::key::PublicKey; use iroh_sync::{actor::SyncHandle, ContentStatus, NamespaceId}; use tokio::{ - sync::{broadcast::error::RecvError, mpsc}, + sync::{broadcast, mpsc}, task::JoinSet, }; +use tokio_stream::{ + wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, + StreamMap, +}; use tracing::{debug, error, trace}; use super::live::{Op, ToLiveActor}; @@ -37,7 +39,8 @@ pub struct GossipActor { to_sync_actor: mpsc::Sender, joined: HashSet, want_join: HashSet, - pending_joins: JoinSet<(NamespaceId, Result)>, + pending_joins: JoinSet<(NamespaceId, Result>)>, + gossip_events: StreamMap>, } impl GossipActor { @@ -55,16 +58,17 @@ impl GossipActor { joined: Default::default(), want_join: Default::default(), pending_joins: Default::default(), + gossip_events: Default::default(), } } pub async fn run(&mut self) -> anyhow::Result<()> { - let mut gossip_events = self.gossip.clone().subscribe_all(); + // let mut gossip_events = self.gossip.clone().subscribe_all(); let mut i = 0; loop { i += 1; trace!(?i, "tick wait"); tokio::select! { - next = gossip_events.next() => { + next = self.gossip_events.next(), if !self.gossip_events.is_empty() => { trace!(?i, "tick: gossip_event"); if let Err(err) = self.on_gossip_event(next).await { error!("gossip actor died: {err:?}"); @@ -82,9 +86,11 @@ impl GossipActor { trace!(?i, "tick: pending_joins"); let (namespace, res) = res.context("pending_joins closed")?; match res { - Ok(_topic) => { + Ok(stream) => { debug!(namespace = %namespace.fmt_short(), "joined gossip"); self.joined.insert(namespace); + let stream = BroadcastStream::new(stream); + self.gossip_events.insert(namespace, stream); }, Err(err) => { if self.want_join.contains(&namespace) { @@ -111,13 +117,11 @@ impl GossipActor { let gossip = self.gossip.clone(); // join gossip for the topic to receive and send message let fut = async move { - let res = gossip.join(namespace.into(), peers).await; - let res = match res { - Ok(fut) => fut.await, - Err(err) => Err(err), - }; - (namespace, res) + let stream = gossip.subscribe(namespace.into()).await?; + let _topic = gossip.join(namespace.into(), peers).await?.await?; + Ok(stream) }; + let fut = fut.map(move |res| (namespace, res)); self.want_join.insert(namespace); self.pending_joins.spawn(fut); } @@ -131,22 +135,16 @@ impl GossipActor { } async fn on_gossip_event( &mut self, - event: Option>, + event: Option<(NamespaceId, Result)>, ) -> Result<()> { - let (topic, event) = match event { - Some(Ok(event)) => event, - None => return Err(anyhow!("Gossip event channel closed")), - Some(Err(err)) => match err { - RecvError::Lagged(n) => { - error!("GossipActor too slow (lagged by {n}) - dropping gossip event"); - return Ok(()); - } - RecvError::Closed => { - return Err(anyhow!("Gossip event channel closed")); - } - }, + let (namespace, event) = event.context("Gossip event channel closed")?; + let event = match event { + Ok(event) => event, + Err(BroadcastStreamRecvError::Lagged(n)) => { + error!("GossipActor too slow (lagged by {n}) - dropping gossip event"); + return Ok(()); + } }; - let namespace: NamespaceId = topic.as_bytes().into(); if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) { error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic"); return Ok(());