Skip to content

Commit

Permalink
refactor: simplify generics in client
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 6, 2024
1 parent bde91d4 commit e39a486
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 22 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ missing_debug_implementations = "warn"

[workspace.lints.clippy]
unused-async = "warn"

[patch.crates-io]
quic-rpc = { git = "https://github.com/n0-computer/quic-rpc.git", branch = "refactor/service-conn" }
9 changes: 4 additions & 5 deletions iroh/src/client/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
use quic_rpc::transport::flume::FlumeConnection;

use crate::rpc_protocol::{ProviderRequest, ProviderResponse, RpcService};
use crate::rpc_protocol::RpcService;

/// RPC client to an iroh node running in the same process.
pub type RpcClient =
quic_rpc::RpcClient<RpcService, FlumeConnection<ProviderResponse, ProviderRequest>>;
pub type RpcClient = quic_rpc::RpcClient<RpcService, FlumeConnection<RpcService>>;

/// In-memory client to an iroh node running in the same process.
///
/// This is obtained from [`crate::node::Node::client`].
pub type Iroh = super::Iroh<FlumeConnection<ProviderResponse, ProviderRequest>>;
pub type Iroh = super::Iroh<FlumeConnection<RpcService>>;

/// In-memory document client to an iroh node running in the same process.
pub type Doc = super::docs::Doc<FlumeConnection<ProviderResponse, ProviderRequest>>;
pub type Doc = super::docs::Doc<FlumeConnection<RpcService>>;
11 changes: 5 additions & 6 deletions iroh/src/client/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,23 @@ use quic_rpc::transport::quinn::QuinnConnection;

use crate::{
node::RpcStatus,
rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, RpcService},
rpc_protocol::{NodeStatusRequest, RpcService},
};

/// ALPN used by irohs RPC mechanism.
// TODO: Change to "/iroh-rpc/1"
pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";

/// RPC client to an iroh node running in a separate process.
pub type RpcClient =
quic_rpc::RpcClient<RpcService, QuinnConnection<ProviderResponse, ProviderRequest>>;
pub type RpcClient = quic_rpc::RpcClient<RpcService, QuinnConnection<RpcService>>;

/// Client to an iroh node running in a separate process.
///
/// This is obtained from [`Iroh::connect`].
pub type Iroh = super::Iroh<QuinnConnection<ProviderResponse, ProviderRequest>>;
pub type Iroh = super::Iroh<QuinnConnection<RpcService>>;

/// RPC document client to an iroh node running in a separate process.
pub type Doc = super::docs::Doc<QuinnConnection<ProviderResponse, ProviderRequest>>;
pub type Doc = super::docs::Doc<QuinnConnection<RpcService>>;

impl Iroh {
/// Connect to an iroh node running on the same computer, but in a different process.
Expand All @@ -51,7 +50,7 @@ pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result<RpcClient> {
let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?;
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port);
let server_name = "localhost".to_string();
let connection = QuinnConnection::new(endpoint, addr, server_name);
let connection = QuinnConnection::<RpcService>::new(endpoint, addr, server_name);
let client = RpcClient::new(connection);
// Do a status request to check if the server is running.
let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(NodeStatusRequest))
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::debug;

use crate::rpc_protocol::{ProviderRequest, ProviderResponse};
use crate::client::RpcService;
use crate::sync_engine::SyncEngine;

mod builder;
Expand Down Expand Up @@ -97,7 +97,7 @@ struct NodeInner<D> {
endpoint: MagicEndpoint,
secret_key: SecretKey,
cancel_token: CancellationToken,
controller: FlumeConnection<ProviderResponse, ProviderRequest>,
controller: FlumeConnection<RpcService>,
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
callbacks: Callbacks,
Expand Down
11 changes: 4 additions & 7 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
client::RPC_ALPN,
node::{Event, NodeInner},
rpc_protocol::{ProviderRequest, ProviderResponse, RpcService},
rpc_protocol::RpcService,
sync_engine::SyncEngine,
util::{fs::load_secret_key, path::IrohPaths},
};
Expand Down Expand Up @@ -247,9 +247,7 @@ where
}

/// Configure the default iroh rpc endpoint.
pub async fn enable_rpc(
self,
) -> Result<Builder<D, QuinnServerEndpoint<ProviderRequest, ProviderResponse>>> {
pub async fn enable_rpc(self) -> Result<Builder<D, QuinnServerEndpoint<RpcService>>> {
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?;
if let StorageConfig::Persistent(ref root) = self.storage {
// store rpc endpoint
Expand Down Expand Up @@ -739,7 +737,7 @@ const MAX_RPC_STREAMS: u32 = 1024;
fn make_rpc_endpoint(
secret_key: &SecretKey,
rpc_port: u16,
) -> Result<(QuinnServerEndpoint<ProviderRequest, ProviderResponse>, u16)> {
) -> Result<(QuinnServerEndpoint<RpcService>, u16)> {
let rpc_addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_port);
let mut transport_config = quinn::TransportConfig::default();
transport_config
Expand Down Expand Up @@ -774,8 +772,7 @@ fn make_rpc_endpoint(
};

let actual_rpc_port = rpc_quinn_endpoint.local_addr()?.port();
let rpc_endpoint =
QuinnServerEndpoint::<ProviderRequest, ProviderResponse>::new(rpc_quinn_endpoint)?;
let rpc_endpoint = QuinnServerEndpoint::<RpcService>::new(rpc_quinn_endpoint)?;

Ok((rpc_endpoint, actual_rpc_port))
}

0 comments on commit e39a486

Please sign in to comment.