From fc735026c772c1aa4f00b9af0ffcc0654497d9a3 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Fri, 17 May 2024 23:53:27 +0200 Subject: [PATCH] feat: emit PendingContentReady event (#2302) ## Description This adds a new `LiveEvent` variant: `PendingContentReady`. It is emitted once, after sync was started, when the list of pending content blobs becomes empty. This is the case if either all blobs from this session have been downloaded, or failed. I think this could help with simple examples like https://gist.github.com/dignifiedquire/efbd1a7a1da729494adb088d72f1ceaa#file-main-sync-rs-L84 It should not be used as a guarantee that all content is now available locally, and it should usually be used with a timeout, because it may legitimately never be emitted (e.g. if the remote does not have all blobs themselves, or closes the connections, etc). ## Breaking Changes * `LiveEvent` now has a new variant `PendingContentReady`. This event is emitted after a sync completed and `Self::SyncFinished` was emitted. It signals that all currently pending downloads have been completed. Receiving this event does not guarantee that all content in the document is available. If blobs failed to download, this event will still be emitted after all operations completed. ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [ ] Tests if relevant. - [x] All breaking changes documented. 
--- iroh-cli/src/commands/doc.rs | 3 ++ iroh/src/client/docs.rs | 11 +++++ iroh/src/docs_engine.rs | 11 +++++ iroh/src/docs_engine/live.rs | 81 ++++++++++++++++++++++++++++++++-- iroh/src/docs_engine/state.rs | 27 ++++++++++++ iroh/tests/sync.rs | 83 ++++++++++++++++++++++++++++------- 6 files changed, 195 insertions(+), 21 deletions(-) diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index 7a424735a9..a461685f7b 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -609,6 +609,9 @@ impl DocCommands { LiveEvent::NeighborDown(peer) => { println!("neighbor peer down: {peer:?}"); } + LiveEvent::PendingContentReady => { + println!("all pending content is now ready") + } } } } diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index f1da48dfcf..2b22c39aa5 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -547,6 +547,16 @@ pub enum LiveEvent { NeighborDown(PublicKey), /// A set-reconciliation sync finished. SyncFinished(SyncEvent), + /// All pending content is now ready. + /// + /// This event signals that all queued content downloads from the last sync run have either + /// completed or failed. + /// + /// It will only be emitted after a [`Self::SyncFinished`] event, never before. + /// + /// Receiving this event does not guarantee that all content in the document is available. If + /// blobs failed to download, this event will still be emitted after all operations completed. 
+ PendingContentReady, } impl From for LiveEvent { @@ -568,6 +578,7 @@ impl From for LiveEvent { crate::docs_engine::LiveEvent::NeighborUp(node) => Self::NeighborUp(node), crate::docs_engine::LiveEvent::NeighborDown(node) => Self::NeighborDown(node), crate::docs_engine::LiveEvent::SyncFinished(details) => Self::SyncFinished(details), + crate::docs_engine::LiveEvent::PendingContentReady => Self::PendingContentReady, } } } diff --git a/iroh/src/docs_engine.rs b/iroh/src/docs_engine.rs index ce29668307..018d894f16 100644 --- a/iroh/src/docs_engine.rs +++ b/iroh/src/docs_engine.rs @@ -226,6 +226,16 @@ pub enum LiveEvent { /// The content hash of the newly available entry content hash: Hash, }, + /// All pending content is now ready. + /// + /// This event signals that all queued content downloads from the last sync run have either + /// completed or failed. + /// + /// It will only be emitted after a [`Self::SyncFinished`] event, never before. + /// + /// Receiving this event does not guarantee that all content in the document is available. If + /// blobs failed to download, this event will still be emitted after all operations completed. + PendingContentReady, /// We have a new neighbor in the swarm. NeighborUp(PublicKey), /// We lost a neighbor in the swarm. @@ -241,6 +251,7 @@ impl From for LiveEvent { live::Event::NeighborUp(peer) => Self::NeighborUp(peer), live::Event::NeighborDown(peer) => Self::NeighborDown(peer), live::Event::SyncFinished(ev) => Self::SyncFinished(ev), + live::Event::PendingContentReady => Self::PendingContentReady, } } } diff --git a/iroh/src/docs_engine/live.rs b/iroh/src/docs_engine/live.rs index 13e16961be..8dd3d5843a 100644 --- a/iroh/src/docs_engine/live.rs +++ b/iroh/src/docs_engine/live.rs @@ -117,6 +117,14 @@ pub enum Event { NeighborDown(PublicKey), /// A set-reconciliation sync finished. SyncFinished(SyncEvent), + /// All pending content is now ready. 
+ /// + /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at + /// least once. It signals that all currently pending downloads have been completed. + /// + /// Receiving this event does not guarantee that all content in the document is available. If + /// blobs failed to download, this event will still be emitted after all operations completed. + PendingContentReady, } type SyncConnectRes = ( @@ -155,7 +163,7 @@ pub struct LiveActor { /// Content hashes which are wanted but not yet queued because no provider was found. missing_hashes: HashSet, /// Content hashes queued in downloader. - queued_hashes: HashSet, + queued_hashes: QueuedHashes, /// Subscribers to actor events subscribers: SubscribersMap, @@ -567,6 +575,20 @@ impl LiveActor { .send(&namespace, Event::SyncFinished(ev)) .await; + // Check if there are queued pending content hashes for this namespace. + // If hashes are pending, mark this namespace to be eligible for a PendingContentReady event once all + // pending hashes have completed downloading. + // If no hashes are pending, emit the PendingContentReady event right away. The next + // PendingContentReady event may then only be emitted after the next sync completes. 
+ if self.queued_hashes.contains_namespace(&namespace) { + self.state.set_may_emit_ready(&namespace, true); + } else { + self.subscribers + .send(&namespace, Event::PendingContentReady) + .await; + self.state.set_may_emit_ready(&namespace, false); + } + if resync { self.sync_with_peer(namespace, peer, SyncReason::Resync); } @@ -605,7 +627,8 @@ impl LiveActor { hash: Hash, res: Result, ) { - self.queued_hashes.remove(&hash); + let completed_namespaces = self.queued_hashes.remove_hash(&hash); + debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready"); if res.is_ok() { self.subscribers .send(&namespace, Event::ContentReady { hash }) @@ -616,6 +639,13 @@ impl LiveActor { } else { self.missing_hashes.insert(hash); } + for namespace in completed_namespaces.iter() { + if let Some(true) = self.state.may_emit_ready(namespace) { + self.subscribers + .send(namespace, Event::PendingContentReady) + .await; + } + } } async fn on_neighbor_content_ready( @@ -657,6 +687,7 @@ impl LiveActor { async fn on_replica_event(&mut self, event: iroh_docs::Event) -> Result<()> { match event { iroh_docs::Event::LocalInsert { namespace, entry } => { + debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert"); let topic = TopicId::from_bytes(*namespace.as_bytes()); // A new entry was inserted locally. Broadcast a gossip message. if self.state.is_syncing(&namespace) { @@ -672,6 +703,7 @@ impl LiveActor { should_download, remote_content_status, } => { + debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert"); // A new entry was inserted from initial sync or gossip. Queue downloading the // content. 
if should_download { @@ -701,13 +733,14 @@ impl LiveActor { self.missing_hashes.remove(&hash); return; } - if self.queued_hashes.contains(&hash) { + if self.queued_hashes.contains_hash(&hash) { + self.queued_hashes.insert(hash, namespace); self.downloader.nodes_have(hash, vec![node]).await; } else if !only_if_missing || self.missing_hashes.contains(&hash) { let req = DownloadRequest::untagged(HashAndFormat::raw(hash), vec![node]); let handle = self.downloader.queue(req).await; - self.queued_hashes.insert(hash); + self.queued_hashes.insert(hash, namespace); self.missing_hashes.remove(&hash); self.download_tasks .spawn(async move { (namespace, hash, handle.await) }); @@ -800,6 +833,7 @@ impl SubscribersMap { } async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool { + debug!(namespace=%namespace.fmt_short(), %event, "emit event"); let Some(subscribers) = self.0.get_mut(namespace) else { return false; }; @@ -819,6 +853,45 @@ impl SubscribersMap { } } +#[derive(Debug, Default)] +struct QueuedHashes { + by_hash: HashMap>, + by_namespace: HashMap>, +} + +impl QueuedHashes { + fn insert(&mut self, hash: Hash, namespace: NamespaceId) { + self.by_hash.entry(hash).or_default().insert(namespace); + self.by_namespace.entry(namespace).or_default().insert(hash); + } + + /// Remove a hash from the set of queued hashes. + /// + /// Returns a list of namespaces that are now complete (have no queued hashes anymore). 
+ fn remove_hash(&mut self, hash: &Hash) -> Vec { + let namespaces = self.by_hash.remove(hash).unwrap_or_default(); + let mut removed_namespaces = vec![]; + for namespace in namespaces { + if let Some(hashes) = self.by_namespace.get_mut(&namespace) { + hashes.remove(hash); + if hashes.is_empty() { + self.by_namespace.remove(&namespace); + removed_namespaces.push(namespace); + } + } + } + removed_namespaces + } + + fn contains_hash(&self, hash: &Hash) -> bool { + self.by_hash.contains_key(hash) + } + + fn contains_namespace(&self, namespace: &NamespaceId) -> bool { + self.by_namespace.contains_key(namespace) + } +} + #[derive(Debug, Default)] struct Subscribers(Vec>); diff --git a/iroh/src/docs_engine/state.rs b/iroh/src/docs_engine/state.rs index 7bab017e05..91e28a721e 100644 --- a/iroh/src/docs_engine/state.rs +++ b/iroh/src/docs_engine/state.rs @@ -52,6 +52,7 @@ pub struct NamespaceStates(BTreeMap); #[derive(Default)] struct NamespaceState { nodes: BTreeMap, + may_emit_ready: bool, } impl NamespaceStates { @@ -116,6 +117,32 @@ impl NamespaceStates { state.finish(origin, result) } + /// Set whether a [`super::live::Event::PendingContentReady`] may be emitted once the pending queue + /// becomes empty. + /// + /// This should be set to `true` if there are pending content hashes after a sync finished, and + /// to `false` whenever a `PendingContentReady` was emitted. + pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> { + let state = self.0.get_mut(namespace)?; + state.may_emit_ready = value; + Some(()) + } + /// Returns whether a [`super::live::Event::PendingContentReady`] event may be emitted once the + /// pending queue becomes empty. + /// + /// If this returns `false`, an event should not be emitted even if the queue becomes empty, + /// because a currently running sync did not yet terminate. Once it terminates, the event will + /// be emitted from the handler for finished syncs. 
+ pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option { + let state = self.0.get_mut(namespace)?; + if state.may_emit_ready { + state.may_emit_ready = false; + Some(true) + } else { + Some(false) + } + } + /// Remove a namespace from the set of syncing namespaces. pub fn remove(&mut self, namespace: &NamespaceId) -> bool { self.0.remove(namespace).is_some() diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 74f3a8880a..556f5829a7 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -110,13 +110,14 @@ async fn sync_simple() -> Result<()> { Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, .. } if *from == peer0 )), Box::new(move |e| match_sync_finished(e, peer0)), Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)), + match_event!(LiveEvent::PendingContentReady), ], ) .await; assert_latest(&doc1, b"k1", b"v1").await; info!("node0: assert 2 events"); - assert_next_unordered( + assert_next( &mut events0, TIMEOUT, vec![ @@ -286,21 +287,23 @@ async fn sync_full_basic() -> Result<()> { &mut events1, TIMEOUT, vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)), - Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, .. } if *from == peer0 )), + match_event!(LiveEvent::NeighborUp(peer) if *peer == peer0), + match_event!(LiveEvent::InsertRemote { from, .. 
} if *from == peer0 ), Box::new(move |e| match_sync_finished(e, peer0)), - Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)), + match_event!(LiveEvent::ContentReady { hash } if *hash == hash0), + match_event!(LiveEvent::PendingContentReady), ], ) .await; info!("peer0: wait for 2 events (join & accept sync finished from peer1)"); - assert_next_unordered( + assert_next( &mut events0, TIMEOUT, vec![ - Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)), + match_event!(LiveEvent::NeighborUp(peer) if *peer == peer1), Box::new(move |e| match_sync_finished(e, peer1)), + match_event!(LiveEvent::PendingContentReady), ], ) .await; @@ -312,16 +315,17 @@ async fn sync_full_basic() -> Result<()> { .set_bytes(author1, key1.to_vec(), value1.to_vec()) .await?; assert_latest(&doc1, key1, value1).await; - info!("peer1: wait for 1 event (local insert)"); - let e = next(&mut events1).await; - assert!( - matches!(&e, LiveEvent::InsertLocal { entry } if entry.content_hash() == hash1), - "expected LiveEvent::InsertLocal but got {e:?}", - ); + info!("peer1: wait for 1 event (local insert, and pendingcontentready)"); + assert_next( + &mut events1, + TIMEOUT, + vec![match_event!(LiveEvent::InsertLocal { entry} if entry.content_hash() == hash1)], + ) + .await; // peer0: assert events for entry received via gossip info!("peer0: wait for 2 events (gossip'ed entry from peer1)"); - assert_next_unordered( + assert_next( &mut events0, TIMEOUT, vec![ @@ -345,7 +349,7 @@ async fn sync_full_basic() -> Result<()> { let peer2 = nodes[2].node_id(); let mut events2 = doc2.subscribe().await?; - info!("peer2: wait for 8 events (from sync with peers)"); + info!("peer2: wait for 9 events (from sync with peers)"); assert_next_unordered_with_optionals( &mut events2, TIMEOUT, @@ -367,6 +371,8 @@ async fn sync_full_basic() -> Result<()> { // 2 ContentReady events Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)), 
Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash1)), + // at least 1 PendingContentReady + match_event!(LiveEvent::PendingContentReady), ], // optional events // it may happen that we run sync two times against our two peers: @@ -376,29 +382,33 @@ async fn sync_full_basic() -> Result<()> { // 2 SyncFinished events Box::new(move |e| match_sync_finished(e, peer0)), Box::new(move |e| match_sync_finished(e, peer1)), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::PendingContentReady), ] ).await; assert_latest(&doc2, b"k1", b"v1").await; assert_latest(&doc2, b"k2", b"v2").await; info!("peer0: wait for 2 events (join & accept sync finished from peer2)"); - assert_next_unordered( + assert_next( &mut events0, TIMEOUT, vec![ Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)), Box::new(move |e| match_sync_finished(e, peer2)), + match_event!(LiveEvent::PendingContentReady), ], ) .await; info!("peer1: wait for 2 events (join & accept sync finished from peer2)"); - assert_next_unordered( + assert_next( &mut events1, TIMEOUT, vec![ Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer2)), Box::new(move |e| match_sync_finished(e, peer2)), + match_event!(LiveEvent::PendingContentReady), ], ) .await; @@ -522,6 +532,7 @@ async fn test_sync_via_relay() -> Result<()> { Box::new( move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == inserted_hash), ), + match_event!(LiveEvent::PendingContentReady), ], vec![Box::new(move |e| match_sync_finished(e, node1_id))], ).await; @@ -548,7 +559,10 @@ async fn test_sync_via_relay() -> Result<()> { move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == updated_hash), ), ], - vec![Box::new(move |e| match_sync_finished(e, node1_id))], + vec![ + Box::new(move |e| match_sync_finished(e, node1_id)), + Box::new(move |e| matches!(e, LiveEvent::PendingContentReady)), + ], ).await; let actual = doc2 .get_exact(author1, b"foo", 
false) @@ -616,9 +630,11 @@ async fn sync_restart_node() -> Result<()> { match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), match_event!(LiveEvent::InsertRemote { from, content_status: ContentStatus::Missing, .. } if *from == id2), match_event!(LiveEvent::ContentReady { hash } if *hash == hash_a), + match_event!(LiveEvent::PendingContentReady), ], vec![ match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), + match_event!(LiveEvent::PendingContentReady), ], ) .await; @@ -663,6 +679,7 @@ async fn sync_restart_node() -> Result<()> { ], vec![ match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), + match_event!(LiveEvent::PendingContentReady), ] ).await; assert_latest(&doc1, b"n2/b", b"b").await; @@ -679,6 +696,9 @@ async fn sync_restart_node() -> Result<()> { ], vec![ match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), + match_event!(LiveEvent::PendingContentReady), + match_event!(LiveEvent::SyncFinished(e) if e.peer == id2 && e.result.is_ok()), + match_event!(LiveEvent::PendingContentReady), ] ).await; @@ -1191,6 +1211,35 @@ fn apply_matchers(item: &T, matchers: &mut Vec bool + Send> false } +/// Receive the next `matchers.len()` elements from a stream and matches them against the functions +/// in `matchers`, in order. +/// +/// Returns all received events. 
+#[allow(clippy::type_complexity)] +async fn assert_next( + mut stream: impl Stream> + Unpin + Send, + timeout: Duration, + matchers: Vec bool + Send>>, +) -> Vec { + let fut = async { + let mut items = vec![]; + for (i, f) in matchers.iter().enumerate() { + let item = stream + .next() + .await + .expect("event stream ended prematurely") + .expect("event stream errored"); + if !(f)(&item) { + panic!("assertion failed for event {i} {item:?}"); + } + items.push(item); + } + items + }; + let res = tokio::time::timeout(timeout, fut).await; + res.expect("timeout reached") +} + /// Receive `matchers.len()` elements from a stream and assert that each element matches one of the /// functions in `matchers`. ///