Skip to content

Commit

Permalink
refactor(iroh-net)!: Rename MagicEndpoint -> Endpoint (#2287)
Browse files Browse the repository at this point in the history
## Description

This renames the MagicEndpoint (and it's builder) into Endpoint.
Hopefully providing a more consistent API.

## Breaking Changes

- iroh_net::magic_endpoint -> iroh_net::endpoint
- iroh_net::magic_endpoint::MagicEndpoint ->
iroh_net::endpoint::Endpoint
- iroh_net::magic_endpoint::MagicEndpointBuilder ->
iroh_net::endpoint::Builder
- iroh::node::Node::magic_endpoint -> iroh::node::Node::endpoint

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
flub authored May 13, 2024
1 parent 531829d commit f4d6ca1
Show file tree
Hide file tree
Showing 42 changed files with 231 additions and 246 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/examples/fetch-fsm.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An example how to download a single blob or collection from a node and write it to stdout using the `get` finite state machine directly.
//!
//! Since this example does not use `iroh-net::MagicEndpoint`, it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::net::SocketAddr;
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/examples/fetch-stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An example how to download a single blob or collection from a node and write it to stdout, using a helper method to turn the `get` finite state machine into a stream.
//!
//! Since this example does not use `iroh-net::MagicEndpoint`, it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::net::SocketAddr;
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An example that provides a blob or a collection over a Quinn connection.
//!
//! Since this example does not use `iroh-net::MagicEndpoint`, it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//! Since this example does not use [`iroh-net::Endpoint`], it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run this example with
//! cargo run --example provide-bytes blob
Expand Down
8 changes: 4 additions & 4 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::{
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_net::{magic_endpoint, MagicEndpoint, NodeAddr, NodeId};
use iroh_net::{endpoint, Endpoint, NodeAddr, NodeId};
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
Expand Down Expand Up @@ -324,7 +324,7 @@ pub struct Downloader {

impl Downloader {
/// Create a new Downloader with the default [`ConcurrencyLimits`] and [`RetryConfig`].
pub fn new<S>(store: S, endpoint: MagicEndpoint, rt: LocalPoolHandle) -> Self
pub fn new<S>(store: S, endpoint: Endpoint, rt: LocalPoolHandle) -> Self
where
S: Store,
{
Expand All @@ -334,7 +334,7 @@ impl Downloader {
/// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`].
pub fn with_config<S>(
store: S,
endpoint: MagicEndpoint,
endpoint: Endpoint,
rt: LocalPoolHandle,
concurrency_limits: ConcurrencyLimits,
retry_config: RetryConfig,
Expand Down Expand Up @@ -1452,7 +1452,7 @@ impl Queue {
}

impl Dialer for iroh_net::dialer::Dialer {
type Connection = magic_endpoint::Connection;
type Connection = endpoint::Connection;

fn queue_dial(&mut self, node_id: NodeId) {
self.queue_dial(node_id, crate::protocol::ALPN)
Expand Down
8 changes: 4 additions & 4 deletions iroh-blobs/src/downloader/get.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! [`Getter`] implementation that performs requests over [`Connection`]s.
//!
//! [`Connection`]: iroh_net::magic_endpoint::Connection
//! [`Connection`]: iroh_net::endpoint::Connection
use crate::{
get::{db::get_to_db, error::GetError},
Expand All @@ -9,7 +9,7 @@ use crate::{
use futures_lite::FutureExt;
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};
use iroh_net::magic_endpoint;
use iroh_net::endpoint;

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
Expand All @@ -32,13 +32,13 @@ impl From<GetError> for FailureAction {

/// [`Getter`] implementation that performs requests over [`Connection`]s.
///
/// [`Connection`]: iroh_net::magic_endpoint::Connection
/// [`Connection`]: iroh_net::endpoint::Connection
pub(crate) struct IoGetter<S: Store> {
pub store: S,
}

impl<S: Store> Getter for IoGetter<S> {
type Connection = magic_endpoint::Connection;
type Connection = endpoint::Connection;

fn get(
&mut self,
Expand Down
34 changes: 17 additions & 17 deletions iroh-blobs/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::Hash;
use anyhow::Result;
use bao_tree::io::fsm::BaoContentItem;
use bao_tree::ChunkNum;
use iroh_net::magic_endpoint::{self, RecvStream, SendStream};
use iroh_net::endpoint::{self, RecvStream, SendStream};
use serde::{Deserialize, Serialize};
use tracing::{debug, error};

Expand Down Expand Up @@ -72,7 +72,7 @@ pub mod fsm {
};
use derive_more::From;
use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
use iroh_net::magic_endpoint::Connection;
use iroh_net::endpoint::Connection;
use tokio::io::AsyncWriteExt;

type WrappedRecvStream = TrackingReader<TokioStreamReader<RecvStream>>;
Expand Down Expand Up @@ -143,7 +143,7 @@ pub mod fsm {
}

/// Initiate a new bidi stream to use for the get response
pub async fn next(self) -> Result<AtConnected, magic_endpoint::ConnectionError> {
pub async fn next(self) -> Result<AtConnected, endpoint::ConnectionError> {
let start = Instant::now();
let (writer, reader) = self.connection.open_bi().await?;
let reader = TrackingReader::new(TokioStreamReader::new(reader));
Expand Down Expand Up @@ -188,7 +188,7 @@ pub mod fsm {
RequestTooBig,
/// Error when writing the request to the [`SendStream`].
#[error("write: {0}")]
Write(#[from] magic_endpoint::WriteError),
Write(#[from] endpoint::WriteError),
/// A generic io error
#[error("io {0}")]
Io(io::Error),
Expand All @@ -197,7 +197,7 @@ pub mod fsm {
impl ConnectedNextError {
fn from_io(cause: io::Error) -> Self {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<magic_endpoint::WriteError>() {
if let Some(e) = inner.downcast_ref::<endpoint::WriteError>() {
Self::Write(e.clone())
} else {
Self::Io(cause)
Expand Down Expand Up @@ -395,7 +395,7 @@ pub mod fsm {
NotFound,
/// Quinn read error when reading the size header
#[error("read: {0}")]
Read(magic_endpoint::ReadError),
Read(endpoint::ReadError),
/// Generic io error
#[error("io: {0}")]
Io(io::Error),
Expand All @@ -421,7 +421,7 @@ pub mod fsm {
AtBlobHeaderNextError::NotFound
} else if let Some(e) = cause
.get_ref()
.and_then(|x| x.downcast_ref::<magic_endpoint::ReadError>())
.and_then(|x| x.downcast_ref::<endpoint::ReadError>())
{
AtBlobHeaderNextError::Read(e.clone())
} else {
Expand Down Expand Up @@ -544,7 +544,7 @@ pub mod fsm {
/// The [`DecodeError::Io`] variant is just a fallback for any other io error that
/// is not actually a [`ReadError`].
///
/// [`ReadError`]: magic_endpoint::ReadError
/// [`ReadError`]: endpoint::ReadError
#[derive(Debug, thiserror::Error)]
pub enum DecodeError {
/// A chunk was not found or invalid, so the provider stopped sending data
Expand All @@ -564,7 +564,7 @@ pub mod fsm {
LeafHashMismatch(ChunkNum),
/// Error when reading from the stream
#[error("read: {0}")]
Read(magic_endpoint::ReadError),
Read(endpoint::ReadError),
/// A generic io error
#[error("io: {0}")]
Io(#[from] io::Error),
Expand Down Expand Up @@ -605,7 +605,7 @@ pub mod fsm {
bao_tree::io::DecodeError::LeafHashMismatch(chunk) => Self::LeafHashMismatch(chunk),
bao_tree::io::DecodeError::Io(cause) => {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<magic_endpoint::ReadError>() {
if let Some(e) = inner.downcast_ref::<endpoint::ReadError>() {
Self::Read(e.clone())
} else {
Self::Io(cause)
Expand Down Expand Up @@ -847,7 +847,7 @@ pub mod fsm {
}

/// Finish the get response, returning statistics
pub async fn next(self) -> result::Result<Stats, magic_endpoint::ReadError> {
pub async fn next(self) -> result::Result<Stats, endpoint::ReadError> {
// Shut down the stream
let (reader, bytes_read) = self.reader.into_parts();
let mut reader = reader.into_inner();
Expand Down Expand Up @@ -884,13 +884,13 @@ pub mod fsm {
pub enum GetResponseError {
/// Error when opening a stream
#[error("connection: {0}")]
Connection(#[from] magic_endpoint::ConnectionError),
Connection(#[from] endpoint::ConnectionError),
/// Error when writing the handshake or request to the stream
#[error("write: {0}")]
Write(#[from] magic_endpoint::WriteError),
Write(#[from] endpoint::WriteError),
/// Error when reading from the stream
#[error("read: {0}")]
Read(#[from] magic_endpoint::ReadError),
Read(#[from] endpoint::ReadError),
/// Error when decoding, e.g. hash mismatch
#[error("decode: {0}")]
Decode(bao_tree::io::DecodeError),
Expand All @@ -911,13 +911,13 @@ impl From<bao_tree::io::DecodeError> for GetResponseError {
bao_tree::io::DecodeError::Io(cause) => {
// try to downcast to specific quinn errors
if let Some(source) = cause.source() {
if let Some(error) = source.downcast_ref::<magic_endpoint::ConnectionError>() {
if let Some(error) = source.downcast_ref::<endpoint::ConnectionError>() {
return Self::Connection(error.clone());
}
if let Some(error) = source.downcast_ref::<magic_endpoint::ReadError>() {
if let Some(error) = source.downcast_ref::<endpoint::ReadError>() {
return Self::Read(error.clone());
}
if let Some(error) = source.downcast_ref::<magic_endpoint::WriteError>() {
if let Some(error) = source.downcast_ref::<endpoint::WriteError>() {
return Self::Write(error.clone());
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::num::NonZeroU64;
use futures_lite::StreamExt;
use iroh_base::hash::Hash;
use iroh_base::rpc::RpcError;
use iroh_net::magic_endpoint::Connection;
use iroh_net::endpoint::Connection;
use serde::{Deserialize, Serialize};

use crate::hashseq::parse_hash_seq;
Expand Down
20 changes: 10 additions & 10 deletions iroh-blobs/src/get/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Error returned from get operations
use iroh_net::magic_endpoint;
use iroh_net::endpoint;

use crate::util::progress::ProgressSendError;

Expand Down Expand Up @@ -35,10 +35,10 @@ impl From<ProgressSendError> for GetError {
}
}

impl From<magic_endpoint::ConnectionError> for GetError {
fn from(value: magic_endpoint::ConnectionError) -> Self {
impl From<endpoint::ConnectionError> for GetError {
fn from(value: endpoint::ConnectionError) -> Self {
// explicit match just to be sure we are taking everything into account
use magic_endpoint::ConnectionError;
use endpoint::ConnectionError;
match value {
e @ ConnectionError::VersionMismatch => {
// > The peer doesn't implement any supported version
Expand Down Expand Up @@ -77,9 +77,9 @@ impl From<magic_endpoint::ConnectionError> for GetError {
}
}

impl From<magic_endpoint::ReadError> for GetError {
fn from(value: magic_endpoint::ReadError) -> Self {
use magic_endpoint::ReadError;
impl From<endpoint::ReadError> for GetError {
fn from(value: endpoint::ReadError) -> Self {
use endpoint::ReadError;
match value {
e @ ReadError::Reset(_) => GetError::RemoteReset(e.into()),
ReadError::ConnectionLost(conn_error) => conn_error.into(),
Expand All @@ -93,9 +93,9 @@ impl From<magic_endpoint::ReadError> for GetError {
}
}

impl From<magic_endpoint::WriteError> for GetError {
fn from(value: magic_endpoint::WriteError) -> Self {
use magic_endpoint::WriteError;
impl From<endpoint::WriteError> for GetError {
fn from(value: endpoint::WriteError) -> Self {
use endpoint::WriteError;
match value {
e @ WriteError::Stopped(_) => GetError::RemoteReset(e.into()),
WriteError::ConnectionLost(conn_error) => conn_error.into(),
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/get/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
};
use bao_tree::{ChunkNum, ChunkRanges};
use bytes::Bytes;
use iroh_net::magic_endpoint::Connection;
use iroh_net::endpoint::Connection;
use rand::Rng;

use super::{fsm, Stats};
Expand Down
6 changes: 3 additions & 3 deletions iroh-blobs/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@
//! keep a connection open and reuse it for multiple requests.
use bao_tree::{ChunkNum, ChunkRanges};
use derive_more::From;
use iroh_net::magic_endpoint::VarInt;
use iroh_net::endpoint::VarInt;
use serde::{Deserialize, Serialize};
mod range_spec;
pub use range_spec::{NonEmptyRequestRangeSpecIter, RangeSpec, RangeSpecSeq};
Expand Down Expand Up @@ -433,8 +433,8 @@ pub enum Closed {
/// [`RecvStream::stop`]. We don't use this explicitly but this is here as
/// documentation as to what happened to `0`.
///
/// [`RecvStream`]: iroh_net::magic_endpoint::RecvStream
/// [`RecvStream::stop`]: iroh_net::magic_endpoint::RecvStream::stop
/// [`RecvStream`]: iroh_net::endpoint::RecvStream
/// [`RecvStream::stop`]: iroh_net::endpoint::RecvStream::stop
StreamDropped = 0,
/// The provider is terminating.
///
Expand Down
4 changes: 2 additions & 2 deletions iroh-blobs/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use iroh_io::stats::{
SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter,
};
use iroh_io::{AsyncSliceReader, AsyncStreamWriter, TokioStreamWriter};
use iroh_net::magic_endpoint::{self, RecvStream, SendStream};
use iroh_net::endpoint::{self, RecvStream, SendStream};
use serde::{Deserialize, Serialize};
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, debug_span, info, trace, warn};
Expand Down Expand Up @@ -281,7 +281,7 @@ pub trait EventSender: Clone + Sync + Send + 'static {

/// Handle a single connection.
pub async fn handle_connection<D: Map, E: EventSender>(
connection: magic_endpoint::Connection,
connection: endpoint::Connection,
db: D,
events: E,
rt: LocalPoolHandle,
Expand Down
Loading

0 comments on commit f4d6ca1

Please sign in to comment.