Skip to content

Commit

Permalink
refactor: simplify generics in client
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 21, 2024
1 parent 370075c commit 68b341b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 25 deletions.
54 changes: 44 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ missing_debug_implementations = "warn"

[workspace.lints.clippy]
unused-async = "warn"

[patch.crates-io]
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc.git", branch = "main" }
8 changes: 4 additions & 4 deletions iroh/src/client/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
use quic_rpc::transport::flume::FlumeConnection;

use crate::rpc_protocol::{Request, Response, RpcService};
use crate::rpc_protocol::RpcService;

/// RPC client to an iroh node running in the same process.
pub type RpcClient = quic_rpc::RpcClient<RpcService, FlumeConnection<Response, Request>>;
pub type RpcClient = quic_rpc::RpcClient<RpcService, FlumeConnection<RpcService>>;

/// In-memory client to an iroh node running in the same process.
///
/// This is obtained from [`crate::node::Node::client`].
pub type Iroh = super::Iroh<FlumeConnection<Response, Request>>;
pub type Iroh = super::Iroh<FlumeConnection<RpcService>>;

/// In-memory document client to an iroh node running in the same process.
pub type Doc = super::docs::Doc<FlumeConnection<Response, Request>>;
pub type Doc = super::docs::Doc<FlumeConnection<RpcService>>;
10 changes: 5 additions & 5 deletions iroh/src/client/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ use quic_rpc::transport::quinn::QuinnConnection;

use crate::{
node::RpcStatus,
rpc_protocol::{NodeStatusRequest, Request, Response, RpcService},
rpc_protocol::{NodeStatusRequest, RpcService},
};

/// ALPN used by irohs RPC mechanism.
// TODO: Change to "/iroh-rpc/1"
pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";

/// RPC client to an iroh node running in a separate process.
pub type RpcClient = quic_rpc::RpcClient<RpcService, QuinnConnection<Response, Request>>;
pub type RpcClient = quic_rpc::RpcClient<RpcService, QuinnConnection<RpcService>>;

/// Client to an iroh node running in a separate process.
///
/// This is obtained from [`Iroh::connect`].
pub type Iroh = super::Iroh<QuinnConnection<Response, Request>>;
pub type Iroh = super::Iroh<QuinnConnection<RpcService>>;

/// RPC document client to an iroh node running in a separate process.
pub type Doc = super::docs::Doc<QuinnConnection<Response, Request>>;
pub type Doc = super::docs::Doc<QuinnConnection<RpcService>>;

impl Iroh {
/// Connect to an iroh node running on the same computer, but in a different process.
Expand All @@ -50,7 +50,7 @@ pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result<RpcClient> {
let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?;
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port);
let server_name = "localhost".to_string();
let connection = QuinnConnection::new(endpoint, addr, server_name);
let connection = QuinnConnection::<RpcService>::new(endpoint, addr, server_name);
let client = RpcClient::new(connection);
// Do a status request to check if the server is running.
let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(NodeStatusRequest))
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::debug;

use crate::client::RpcService;
use crate::docs_engine::Engine;
use crate::rpc_protocol::{Request, Response};

mod builder;
mod rpc;
Expand Down Expand Up @@ -90,7 +90,7 @@ struct NodeInner<D> {
endpoint: Endpoint,
secret_key: SecretKey,
cancel_token: CancellationToken,
controller: FlumeConnection<Response, Request>,
controller: FlumeConnection<RpcService>,
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
callbacks: Callbacks,
Expand Down
8 changes: 4 additions & 4 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
client::RPC_ALPN,
docs_engine::Engine,
node::{Event, NodeInner},
rpc_protocol::{Request, Response, RpcService},
rpc_protocol::RpcService,
util::{fs::load_secret_key, path::IrohPaths},
};

Expand Down Expand Up @@ -246,7 +246,7 @@ where
}

/// Configure the default iroh rpc endpoint.
pub async fn enable_rpc(self) -> Result<Builder<D, QuinnServerEndpoint<Request, Response>>> {
pub async fn enable_rpc(self) -> Result<Builder<D, QuinnServerEndpoint<RpcService>>> {
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?;
if let StorageConfig::Persistent(ref root) = self.storage {
// store rpc endpoint
Expand Down Expand Up @@ -737,7 +737,7 @@ const MAX_RPC_STREAMS: u32 = 1024;
fn make_rpc_endpoint(
secret_key: &SecretKey,
rpc_port: u16,
) -> Result<(QuinnServerEndpoint<Request, Response>, u16)> {
) -> Result<(QuinnServerEndpoint<RpcService>, u16)> {
let rpc_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_port);
let mut transport_config = quinn::TransportConfig::default();
transport_config
Expand Down Expand Up @@ -772,7 +772,7 @@ fn make_rpc_endpoint(
};

let actual_rpc_port = rpc_quinn_endpoint.local_addr()?.port();
let rpc_endpoint = QuinnServerEndpoint::<Request, Response>::new(rpc_quinn_endpoint)?;
let rpc_endpoint = QuinnServerEndpoint::<RpcService>::new(rpc_quinn_endpoint)?;

Ok((rpc_endpoint, actual_rpc_port))
}

0 comments on commit 68b341b

Please sign in to comment.