feat: emit PendingContentReady event (#2302)
## Description

This adds a new `LiveEvent` variant: `PendingContentReady`.

It is emitted once, after a sync has started, when the list of pending content
blobs becomes empty. This is the case when all blobs from this
session have either been downloaded or failed to download.

I think this could help with simple examples like
https://gist.github.com/dignifiedquire/efbd1a7a1da729494adb088d72f1ceaa#file-main-sync-rs-L84

It should not be taken as a guarantee that all content is now available
locally, and it should usually be combined with a timeout, because it may
legitimately never be emitted (e.g. if the remote does not have all
blobs itself, or closes the connection, etc.).

## Breaking Changes

* `LiveEvent` now has a new variant, `PendingContentReady`. This event is
emitted after a sync has completed and `LiveEvent::SyncFinished` was emitted. It
signals that all downloads that were pending at that point have completed.
Receiving this event does not guarantee that all content in the document
is available: if blobs failed to download, the event is still
emitted once all download operations have finished (see the sketch below).
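
For downstream code that matches exhaustively on `LiveEvent`, the breaking change amounts to one new match arm. A hypothetical handler (the surrounding function and log messages are illustrative only):

```rust
use iroh::client::docs::LiveEvent; // assumed import path

fn handle_event(event: LiveEvent) {
    match event {
        LiveEvent::SyncFinished(details) => {
            println!("sync finished: {details:?}");
        }
        // New arm required by this release:
        LiveEvent::PendingContentReady => {
            println!("all downloads pending at the end of the last sync completed or failed");
        }
        // Remaining variants elided for brevity.
        other => {
            println!("other event: {other:?}");
        }
    }
}
```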


## Notes & open questions


## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
- [x] All breaking changes documented.
Frando authored May 17, 2024
1 parent ab18eae commit fc73502
Showing 6 changed files with 195 additions and 21 deletions.
3 changes: 3 additions & 0 deletions iroh-cli/src/commands/doc.rs
@@ -609,6 +609,9 @@ impl DocCommands {
LiveEvent::NeighborDown(peer) => {
println!("neighbor peer down: {peer:?}");
}
LiveEvent::PendingContentReady => {
println!("all pending content is now ready")
}
}
}
}
11 changes: 11 additions & 0 deletions iroh/src/client/docs.rs
@@ -547,6 +547,16 @@ pub enum LiveEvent {
NeighborDown(PublicKey),
/// A set-reconciliation sync finished.
SyncFinished(SyncEvent),
/// All pending content is now ready.
///
/// This event signals that all queued content downloads from the last sync run have either
/// completed or failed.
///
/// It will only be emitted after a [`Self::SyncFinished`] event, never before.
///
/// Receiving this event does not guarantee that all content in the document is available. If
/// blobs failed to download, this event will still be emitted after all operations completed.
PendingContentReady,
}

impl From<crate::docs_engine::LiveEvent> for LiveEvent {
@@ -568,6 +578,7 @@ impl From<crate::docs_engine::LiveEvent> for LiveEvent {
crate::docs_engine::LiveEvent::NeighborUp(node) => Self::NeighborUp(node),
crate::docs_engine::LiveEvent::NeighborDown(node) => Self::NeighborDown(node),
crate::docs_engine::LiveEvent::SyncFinished(details) => Self::SyncFinished(details),
crate::docs_engine::LiveEvent::PendingContentReady => Self::PendingContentReady,
}
}
}
11 changes: 11 additions & 0 deletions iroh/src/docs_engine.rs
@@ -226,6 +226,16 @@ pub enum LiveEvent {
/// The content hash of the newly available entry content
hash: Hash,
},
/// All pending content is now ready.
///
/// This event signals that all queued content downloads from the last sync run have either
/// completed or failed.
///
/// It will only be emitted after a [`Self::SyncFinished`] event, never before.
///
/// Receiving this event does not guarantee that all content in the document is available. If
/// blobs failed to download, this event will still be emitted after all operations completed.
PendingContentReady,
/// We have a new neighbor in the swarm.
NeighborUp(PublicKey),
/// We lost a neighbor in the swarm.
@@ -241,6 +251,7 @@ impl From<live::Event> for LiveEvent {
live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
live::Event::PendingContentReady => Self::PendingContentReady,
}
}
}
81 changes: 77 additions & 4 deletions iroh/src/docs_engine/live.rs
@@ -117,6 +117,14 @@ pub enum Event {
NeighborDown(PublicKey),
/// A set-reconciliation sync finished.
SyncFinished(SyncEvent),
/// All pending content is now ready.
///
/// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
/// least once. It signals that all currently pending downloads have been completed.
///
/// Receiving this event does not guarantee that all content in the document is available. If
/// blobs failed to download, this event will still be emitted after all operations completed.
PendingContentReady,
}

type SyncConnectRes = (
@@ -155,7 +163,7 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
/// Content hashes which are wanted but not yet queued because no provider was found.
missing_hashes: HashSet<Hash>,
/// Content hashes queued in downloader.
queued_hashes: HashSet<Hash>,
queued_hashes: QueuedHashes,

/// Subscribers to actor events
subscribers: SubscribersMap,
@@ -567,6 +575,20 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
.send(&namespace, Event::SyncFinished(ev))
.await;

// Check if there are queued pending content hashes for this namespace.
// If hashes are pending, mark this namespace to be eligible for a PendingContentReady event once all
// pending hashes have completed downloading.
// If no hashes are pending, emit the PendingContentReady event right away. The next
// PendingContentReady event may then only be emitted after the next sync completes.
if self.queued_hashes.contains_namespace(&namespace) {
self.state.set_may_emit_ready(&namespace, true);
} else {
self.subscribers
.send(&namespace, Event::PendingContentReady)
.await;
self.state.set_may_emit_ready(&namespace, false);
}

if resync {
self.sync_with_peer(namespace, peer, SyncReason::Resync);
}
@@ -605,7 +627,8 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
hash: Hash,
res: Result<Stats, DownloadError>,
) {
self.queued_hashes.remove(&hash);
let completed_namespaces = self.queued_hashes.remove_hash(&hash);
debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
if res.is_ok() {
self.subscribers
.send(&namespace, Event::ContentReady { hash })
@@ -616,6 +639,13 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
} else {
self.missing_hashes.insert(hash);
}
for namespace in completed_namespaces.iter() {
if let Some(true) = self.state.may_emit_ready(namespace) {
self.subscribers
.send(namespace, Event::PendingContentReady)
.await;
}
}
}

async fn on_neighbor_content_ready(
@@ -657,6 +687,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
async fn on_replica_event(&mut self, event: iroh_docs::Event) -> Result<()> {
match event {
iroh_docs::Event::LocalInsert { namespace, entry } => {
debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
let topic = TopicId::from_bytes(*namespace.as_bytes());
// A new entry was inserted locally. Broadcast a gossip message.
if self.state.is_syncing(&namespace) {
@@ -672,6 +703,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
should_download,
remote_content_status,
} => {
debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
// A new entry was inserted from initial sync or gossip. Queue downloading the
// content.
if should_download {
@@ -701,13 +733,14 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
self.missing_hashes.remove(&hash);
return;
}
if self.queued_hashes.contains(&hash) {
if self.queued_hashes.contains_hash(&hash) {
self.queued_hashes.insert(hash, namespace);
self.downloader.nodes_have(hash, vec![node]).await;
} else if !only_if_missing || self.missing_hashes.contains(&hash) {
let req = DownloadRequest::untagged(HashAndFormat::raw(hash), vec![node]);
let handle = self.downloader.queue(req).await;

self.queued_hashes.insert(hash);
self.queued_hashes.insert(hash, namespace);
self.missing_hashes.remove(&hash);
self.download_tasks
.spawn(async move { (namespace, hash, handle.await) });
@@ -800,6 +833,7 @@ impl SubscribersMap {
}

async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
debug!(namespace=%namespace.fmt_short(), %event, "emit event");
let Some(subscribers) = self.0.get_mut(namespace) else {
return false;
};
@@ -819,6 +853,45 @@ impl SubscribersMap {
}
}

#[derive(Debug, Default)]
struct QueuedHashes {
by_hash: HashMap<Hash, HashSet<NamespaceId>>,
by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

impl QueuedHashes {
fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
self.by_hash.entry(hash).or_default().insert(namespace);
self.by_namespace.entry(namespace).or_default().insert(hash);
}

/// Remove a hash from the set of queued hashes.
///
/// Returns a list of namespaces that are now complete (have no queued hashes anymore).
fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
let namespaces = self.by_hash.remove(hash).unwrap_or_default();
let mut removed_namespaces = vec![];
for namespace in namespaces {
if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
hashes.remove(hash);
if hashes.is_empty() {
self.by_namespace.remove(&namespace);
removed_namespaces.push(namespace);
}
}
}
removed_namespaces
}

fn contains_hash(&self, hash: &Hash) -> bool {
self.by_hash.contains_key(hash)
}

fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
self.by_namespace.contains_key(namespace)
}
}

#[derive(Debug, Default)]
struct Subscribers(Vec<flume::Sender<Event>>);

27 changes: 27 additions & 0 deletions iroh/src/docs_engine/state.rs
@@ -52,6 +52,7 @@ pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
#[derive(Default)]
struct NamespaceState {
nodes: BTreeMap<NodeId, PeerState>,
may_emit_ready: bool,
}

impl NamespaceStates {
@@ -116,6 +117,32 @@ impl NamespaceStates {
state.finish(origin, result)
}

/// Set whether a [`super::live::Event::PendingContentReady`] may be emitted once the pending queue
/// becomes empty.
///
/// This should be set to `true` if there are pending content hashes after a sync finished, and
/// to `false` whenever a `PendingContentReady` was emitted.
pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
let state = self.0.get_mut(namespace)?;
state.may_emit_ready = value;
Some(())
}
/// Returns whether a [`super::live::Event::PendingContentReady`] event may be emitted once the
/// pending queue becomes empty.
///
/// If this returns `false`, an event should not be emitted even if the queue becomes empty,
/// because a currently running sync did not yet terminate. Once it terminates, the event will
/// be emitted from the handler for finished syncs.
pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
let state = self.0.get_mut(namespace)?;
if state.may_emit_ready {
state.may_emit_ready = false;
Some(true)
} else {
Some(false)
}
}

/// Remove a namespace from the set of syncing namespaces.
pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
self.0.remove(namespace).is_some()
