Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh)!: remove node events #2274

Merged
merged 8 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 2 additions & 80 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,21 @@
//!
//! A node is a server that serves various protocols.
//!
//! You can monitor what is happening in the node using [`Node::subscribe`].
//!
//! To shut down the node, call [`Node::shutdown`].
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_net::util::AbortingJoinHandle;
use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::RpcClient;
use tokio::sync::{mpsc, RwLock};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
Expand All @@ -35,38 +32,6 @@ mod rpc_status;
pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;

/// Boxed async callback invoked for each [`Event`] emitted by the node.
type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;

/// Shared, clonable list of event callbacks.
///
/// Cloning is cheap (an `Arc` bump) and all clones observe the same underlying
/// list; the `RwLock` allows many concurrent readers while `push` takes the
/// write lock. The `#[debug("..")]` attribute elides the unprintable callbacks
/// from the `Debug` output.
#[derive(Default, derive_more::Debug, Clone)]
struct Callbacks(#[debug("..")] Arc<RwLock<Vec<EventCallback>>>);

impl Callbacks {
    /// Append a new callback to the shared list.
    async fn push(&self, cb: EventCallback) {
        let mut guard = self.0.write().await;
        guard.push(cb);
    }

    /// Invoke every registered callback with `event`, awaiting each one in
    /// registration order. Callbacks therefore run sequentially, not
    /// concurrently.
    #[allow(dead_code)]
    async fn send(&self, event: Event) {
        let guard = self.0.read().await;
        for callback in guard.iter() {
            callback(event.clone()).await;
        }
    }
}

impl iroh_blobs::provider::EventSender for Callbacks {
    /// Fan a blob-provider event out to every subscriber, wrapped in
    /// [`Event::ByteProvide`]. Each callback is awaited in turn.
    fn send(&self, event: iroh_blobs::provider::Event) -> BoxFuture<()> {
        let callbacks = self.clone();
        Box::pin(async move {
            let guard = callbacks.0.read().await;
            for callback in guard.iter() {
                callback(Event::ByteProvide(event.clone())).await;
            }
        })
    }
}

/// A server which implements the iroh node.
///
/// Clients can connect to this server and request hashes from it.
Expand All @@ -91,9 +56,6 @@ struct NodeInner<D> {
secret_key: SecretKey,
cancel_token: CancellationToken,
controller: FlumeConnection<Response, Request>,
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
callbacks: Callbacks,
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
#[debug("rt")]
Expand All @@ -102,15 +64,6 @@ struct NodeInner<D> {
downloader: Downloader,
}

/// Events emitted by the [`Node`] informing about the current status.
#[derive(Debug, Clone)]
pub enum Event {
    /// Events from the iroh-blobs transfer protocol.
    ByteProvide(iroh_blobs::provider::Event),
    /// Events from database
    /// (e.g. garbage-collection start/completion notifications).
    Db(iroh_blobs::store::Event),
}

/// In memory node.
pub type MemNode = Node<iroh_blobs::store::mem::Store>;

Expand Down Expand Up @@ -177,18 +130,6 @@ impl<D: BaoStore> Node<D> {
self.inner.secret_key.public()
}

/// Subscribe to [`Event`]s emitted from the node, informing about connections and
/// progress.
///
/// Warning: The callback must complete quickly, as otherwise it will block ongoing work.
pub async fn subscribe<F: Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>(
    &self,
    cb: F,
) -> Result<()> {
    // Box the callback so it can travel through the channel to the node's
    // event loop; `send` errors only if the receiving side has shut down.
    let callback: Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static> = Box::new(cb);
    self.inner.cb_sender.send(callback).await?;
    Ok(())
}

/// Returns a handle that can be used to do RPC calls to the node internally.
pub fn controller(&self) -> crate::client::MemRpcClient {
RpcClient::new(self.inner.controller.clone())
Expand Down Expand Up @@ -319,23 +260,7 @@ mod tests {

let _drop_guard = node.cancel_token().drop_guard();

let (r, mut s) = mpsc::channel(1);
node.subscribe(move |event| {
let r = r.clone();
async move {
if let Event::ByteProvide(iroh_blobs::provider::Event::TaggedBlobAdded {
hash,
..
}) = event
{
r.send(hash).await.ok();
}
}
.boxed()
})
.await?;

let got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
let _got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
let mut stream = node
.controller()
.server_streaming(BlobAddPathRequest {
Expand Down Expand Up @@ -364,9 +289,6 @@ mod tests {
.context("timeout")?
.context("get failed")?;

let event_hash = s.recv().await.expect("missing add tagged blob event");
assert_eq!(got_hash, event_hash);

Ok(())
}

Expand Down
66 changes: 38 additions & 28 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@ use quic_rpc::{
RpcServer, ServiceEndpoint,
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};

use crate::{
client::RPC_ALPN,
docs_engine::Engine,
node::{Event, NodeInner},
node::NodeInner,
rpc_protocol::{Request, Response, RpcService},
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node};
use super::{rpc, rpc_status::RpcStatus, Node};

pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];

Expand Down Expand Up @@ -69,7 +68,7 @@ const MAX_STREAMS: u64 = 10;
///
/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated
/// using [`Node::shutdown`].
#[derive(Debug)]
#[derive(derive_more::Debug)]
pub struct Builder<D, E = DummyServerEndpoint>
where
D: Map,
Expand All @@ -88,6 +87,9 @@ where
docs_store: iroh_docs::store::fs::Store,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: bool,
/// Callback to register when a gc loop is done
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Configuration for storage.
Expand Down Expand Up @@ -135,6 +137,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: None,
}
}
}
Expand All @@ -160,6 +163,7 @@ impl<D: Map> Builder<D> {
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: None,
}
}
}
Expand Down Expand Up @@ -222,6 +226,7 @@ where
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
gc_done_callback: self.gc_done_callback,
})
}

Expand All @@ -242,6 +247,7 @@ where
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
gc_done_callback: self.gc_done_callback,
}
}

Expand All @@ -267,6 +273,7 @@ where
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
gc_done_callback: self.gc_done_callback,
})
}

Expand Down Expand Up @@ -337,6 +344,13 @@ where
self
}

/// Register a callback for when GC is done.
///
/// Only available in tests or with the `test-utils` feature. Registering a
/// new callback replaces any previously registered one.
#[cfg(any(test, feature = "test-utils"))]
pub fn register_gc_done_cb(mut self, cb: Box<dyn Fn() + Send>) -> Self {
    self.gc_done_callback = Some(cb);
    self
}

/// Whether to log the SSL pre-master key.
///
/// If `true` and the `SSLKEYLOGFILE` environment variable is the path to a file this
Expand All @@ -352,7 +366,7 @@ where
/// This will create the underlying network server and spawn a tokio task accepting
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(self) -> Result<Node<D>> {
pub async fn spawn(mut self) -> Result<Node<D>> {
trace!("spawning node");
let lp = LocalPoolHandle::new(num_cpus::get());

Expand Down Expand Up @@ -406,7 +420,6 @@ where
let endpoint = endpoint.bind(bind_port).await?;
trace!("created quinn endpoint");

let (cb_sender, cb_receiver) = mpsc::channel(8);
let cancel_token = CancellationToken::new();

debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr());
Expand All @@ -427,12 +440,13 @@ where
);
let sync_db = sync.sync.clone();

let callbacks = Callbacks::default();
let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
tracing::info!("Starting GC task with interval {:?}", gc_period);
let db = self.blobs_store.clone();
let callbacks = callbacks.clone();
let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, callbacks));
let gc_done_callback = self.gc_done_callback.take();

let task =
lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, gc_done_callback));
Some(task.into())
} else {
None
Expand All @@ -446,8 +460,6 @@ where
secret_key: self.secret_key,
controller,
cancel_token,
callbacks: callbacks.clone(),
cb_sender,
gc_task,
rt: lp.clone(),
sync,
Expand All @@ -464,8 +476,6 @@ where
async move {
Self::run(
ep,
callbacks,
cb_receiver,
handler,
self.rpc_endpoint,
internal_rpc,
Expand Down Expand Up @@ -508,8 +518,6 @@ where
#[allow(clippy::too_many_arguments)]
async fn run(
server: Endpoint,
callbacks: Callbacks,
mut cb_receiver: mpsc::Receiver<EventCallback>,
handler: rpc::Handler<D>,
rpc: E,
internal_rpc: impl ServiceEndpoint<RpcService>,
Expand Down Expand Up @@ -586,10 +594,6 @@ where
}
});
},
// Handle new callbacks
Some(cb) = cb_receiver.recv() => {
callbacks.push(cb).await;
}
else => break,
}
}
Expand All @@ -609,7 +613,7 @@ where
db: D,
ds: iroh_docs::actor::SyncHandle,
gc_period: Duration,
callbacks: Callbacks,
done_cb: Option<Box<dyn Fn() + Send>>,
) {
let mut live = BTreeSet::new();
tracing::debug!("GC loop starting {:?}", gc_period);
Expand All @@ -623,14 +627,11 @@ where
// do delay before the two phases of GC
tokio::time::sleep(gc_period).await;
tracing::debug!("Starting GC");
callbacks
.send(Event::Db(iroh_blobs::store::Event::GcStarted))
.await;
live.clear();
let doc_hashes = match ds.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::error!("Error getting doc hashes: {}", err);
tracing::warn!("Error getting doc hashes: {}", err);
continue 'outer;
}
};
Expand Down Expand Up @@ -680,9 +681,9 @@ where
}
}
}
callbacks
.send(Event::Db(iroh_blobs::store::Event::GcCompleted))
.await;
if let Some(ref cb) = done_cb {
cb();
}
}
}
}
Expand Down Expand Up @@ -719,7 +720,7 @@ async fn handle_connection<D: BaoStore>(
iroh_blobs::provider::handle_connection(
connection,
node.db.clone(),
node.callbacks.clone(),
MockEventSender,
node.rt.clone(),
)
.await
Expand Down Expand Up @@ -776,3 +777,12 @@ fn make_rpc_endpoint(

Ok((rpc_endpoint, actual_rpc_port))
}

// Event sender that silently discards all blob-provider events; used where
// `handle_connection` requires an `EventSender` but nothing consumes events.
#[derive(Debug, Clone)]
struct MockEventSender;

impl iroh_blobs::provider::EventSender for MockEventSender {
    /// Drop the event and hand back an already-completed boxed future.
    fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
        Box::pin(async {})
    }
}
9 changes: 1 addition & 8 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use crate::rpc_protocol::{
NodeWatchResponse, Request, RpcService, SetTagOption,
};

use super::{Event, NodeInner};
use super::NodeInner;

const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
/// Chunk size for getting blobs over RPC
Expand Down Expand Up @@ -761,13 +761,6 @@ impl<D: BaoStore> Handler<D> {
tag: tag.clone(),
})
.await?;
self.inner
.callbacks
.send(Event::ByteProvide(
iroh_blobs::provider::Event::TaggedBlobAdded { hash, format, tag },
))
.await;

Ok(())
}

Expand Down
Loading
Loading