Skip to content

Commit

Permalink
feat: require Send in a few places to allow the API to be Send
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Oct 20, 2022
1 parent caba4ea commit 59bc0b5
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 50 deletions.
50 changes: 23 additions & 27 deletions iroh-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::p2p::MockP2p;
use crate::p2p::{ClientP2p, P2p};
use crate::{AddEvent, IpfsPath};
use anyhow::Result;
use futures::future::{BoxFuture, LocalBoxFuture};
use futures::stream::LocalBoxStream;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::FutureExt;
use futures::StreamExt;
use iroh_resolver::unixfs_builder;
Expand All @@ -26,7 +26,7 @@ pub struct Iroh {

pub enum OutType {
Dir,
Reader(Box<dyn AsyncRead + Unpin>),
Reader(Box<dyn AsyncRead + Send + Unpin>),
Symlink(PathBuf),
}

Expand All @@ -35,37 +35,35 @@ pub enum OutType {
// Instead we spell things out explicitly without magic.

#[cfg_attr(feature= "testing", automock(type P = MockP2p;))]
pub trait Api {
pub trait Api: Send + Sync {
type P: P2p;

fn p2p(&self) -> Result<Self::P>;

/// Produces a asynchronous stream of file descriptions
/// Each description is a tuple of a relative path, and either a `Directory` or a `Reader`
/// with the file contents.
fn get_stream(
&self,
ipfs_path: &IpfsPath,
) -> LocalBoxStream<'_, Result<(RelativePathBuf, OutType)>>;
fn get_stream(&self, ipfs_path: &IpfsPath)
-> BoxStream<'_, Result<(RelativePathBuf, OutType)>>;

fn add_file(
&self,
path: &Path,
wrap: bool,
) -> LocalBoxFuture<'_, Result<LocalBoxStream<'static, Result<AddEvent>>>>;
) -> BoxFuture<'_, Result<BoxStream<'static, Result<AddEvent>>>>;
fn add_dir(
&self,
path: &Path,
wrap: bool,
) -> LocalBoxFuture<'_, Result<LocalBoxStream<'static, Result<AddEvent>>>>;
) -> BoxFuture<'_, Result<BoxStream<'static, Result<AddEvent>>>>;
fn add_symlink(
&self,
path: &Path,
wrap: bool,
) -> LocalBoxFuture<'_, Result<LocalBoxStream<'static, Result<AddEvent>>>>;
) -> BoxFuture<'_, Result<BoxStream<'static, Result<AddEvent>>>>;

fn check(&self) -> BoxFuture<'_, StatusTable>;
fn watch(&self) -> LocalBoxFuture<'static, LocalBoxStream<'static, StatusTable>>;
fn watch(&self) -> BoxFuture<'static, BoxStream<'static, StatusTable>>;
}

impl Iroh {
Expand Down Expand Up @@ -108,7 +106,7 @@ impl Api for Iroh {
fn get_stream(
&self,
ipfs_path: &IpfsPath,
) -> LocalBoxStream<'_, Result<(RelativePathBuf, OutType)>> {
) -> BoxStream<'_, Result<(RelativePathBuf, OutType)>> {
tracing::debug!("get {:?}", ipfs_path);
let resolver = iroh_resolver::resolver::Resolver::new(self.client.clone());
let results = resolver.resolve_recursive_with_paths(ipfs_path.clone());
Expand Down Expand Up @@ -140,68 +138,66 @@ impl Api for Iroh {
}
}
}
.boxed_local()
.boxed()
}

fn add_file(
&self,
path: &Path,
wrap: bool,
) -> LocalBoxFuture<'_, Result<LocalBoxStream<'static, Result<AddEvent>>>> {
) -> BoxFuture<'_, Result<BoxStream<'static, Result<AddEvent>>>> {
let providing_client = iroh_resolver::unixfs_builder::StoreAndProvideClient {
client: self.client.clone(),
};
let path = path.to_path_buf();
async move {
unixfs_builder::add_file(Some(providing_client), &path, wrap)
.await
.map(|s| s.boxed_local())
.map(|s| s.boxed())
}
.boxed_local()
.boxed()
}

fn add_dir(
&self,
path: &Path,
wrap: bool,
) -> LocalBoxFuture<'_, Result<LocalBoxStream<'static, Result<AddEvent>>>> {
) -> BoxFuture<'_, Result<BoxStream<'static, Result<AddEvent>>>> {
let providing_client = iroh_resolver::unixfs_builder::StoreAndProvideClient {
client: self.client.clone(),
};
let path = path.to_path_buf();
async move {
unixfs_builder::add_dir(Some(providing_client), &path, wrap)
.await
.map(|s| s.boxed_local())
.map(|s| s.boxed())
}
.boxed_local()
.boxed()
}

fn add_symlink(
&self,
path: &Path,
wrap: bool,
) -> LocalBoxFuture<'_, Result<LocalBoxStream<'static, Result<AddEvent>>>> {
) -> BoxFuture<'_, Result<BoxStream<'static, Result<AddEvent>>>> {
let providing_client = iroh_resolver::unixfs_builder::StoreAndProvideClient {
client: self.client.clone(),
};
let path = path.to_path_buf();
async move {
unixfs_builder::add_symlink(Some(providing_client), &path, wrap)
.await
.map(|s| s.boxed_local())
.map(|s| s.boxed())
}
.boxed_local()
.boxed()
}

fn check(&self) -> BoxFuture<'_, StatusTable> {
async { self.client.check().await }.boxed()
}

fn watch(
&self,
) -> LocalBoxFuture<'static, LocalBoxStream<'static, iroh_rpc_client::StatusTable>> {
fn watch(&self) -> BoxFuture<'static, BoxStream<'static, iroh_rpc_client::StatusTable>> {
let client = self.client.clone();
async { client.watch().await.boxed_local() }.boxed_local()
async { client.watch().await.boxed() }.boxed()
}
}
6 changes: 3 additions & 3 deletions iroh-api/src/api_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::path::{Path, PathBuf};
use crate::{AddEvent, Api, Cid, IpfsPath, OutType};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use futures::stream::LocalBoxStream;
use futures::stream::BoxStream;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use relative_path::RelativePathBuf;

#[async_trait(?Send)]
#[async_trait]
pub trait ApiExt: Api {
/// High level get, equivalent of CLI `iroh get`
async fn get<'a>(
Expand All @@ -36,7 +36,7 @@ pub trait ApiExt: Api {
&self,
path: &Path,
wrap: bool,
) -> Result<LocalBoxStream<'static, Result<AddEvent>>> {
) -> Result<BoxStream<Result<AddEvent>>> {
if path.is_dir() {
self.add_dir(path, wrap).await
} else if path.is_symlink() {
Expand Down
29 changes: 16 additions & 13 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use async_recursion::async_recursion;
use async_trait::async_trait;
use bytes::Bytes;
use cid::Cid;
use futures::{stream::LocalBoxStream, Stream, StreamExt};
use futures::{stream::BoxStream, Stream, StreamExt};
use iroh_rpc_client::Client;
use prost::Message;
use tokio::io::AsyncRead;
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Directory {
current.expect("must not be empty")
}

pub fn encode<'a>(self) -> LocalBoxStream<'a, Result<Block>> {
pub fn encode<'a>(self) -> BoxStream<'a, Result<Block>> {
async_stream::try_stream! {
let mut links = Vec::new();
for entry in self.entries {
Expand Down Expand Up @@ -125,12 +125,12 @@ impl Directory {
let node = UnixfsNode::Directory(Node { outer, inner });
yield node.encode()?;
}
.boxed_local()
.boxed()
}
}

enum Content {
Reader(Pin<Box<dyn AsyncRead>>),
Reader(Pin<Box<dyn AsyncRead + Send>>),
Path(PathBuf),
}

Expand Down Expand Up @@ -264,7 +264,7 @@ impl Symlink {
pub struct FileBuilder {
name: Option<String>,
path: Option<PathBuf>,
reader: Option<Pin<Box<dyn AsyncRead>>>,
reader: Option<Pin<Box<dyn AsyncRead + Send>>>,
chunk_size: Option<usize>,
degree: Option<usize>,
}
Expand Down Expand Up @@ -318,7 +318,10 @@ impl FileBuilder {
self
}

pub fn content_reader<T: tokio::io::AsyncRead + 'static>(&mut self, content: T) -> &mut Self {
pub fn content_reader<T: tokio::io::AsyncRead + Send + 'static>(
&mut self,
content: T,
) -> &mut Self {
self.reader = Some(Box::pin(content));
self
}
Expand Down Expand Up @@ -489,7 +492,7 @@ pub(crate) fn encode_unixfs_pb(
}

#[async_trait]
pub trait Store {
pub trait Store: Send + Sync {
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()>;
}

Expand Down Expand Up @@ -598,10 +601,10 @@ pub enum AddEvent {
Done(Cid),
}

pub async fn add_blocks_to_store<S: Store>(
pub async fn add_blocks_to_store<S: Store + Send>(
store: Option<S>,
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>> + Send>>,
) -> impl Stream<Item = Result<AddEvent>> + Send {
async_stream::try_stream! {

let mut root = None;
Expand All @@ -622,8 +625,8 @@ pub async fn add_blocks_to_store<S: Store>(
}
}

#[async_recursion(?Send)]
async fn make_dir_from_path<P: Into<PathBuf>>(path: P) -> Result<Directory> {
#[async_recursion]
async fn make_dir_from_path<P: Into<PathBuf> + Send>(path: P) -> Result<Directory> {
let path = path.into();
let mut dir = DirectoryBuilder::new();
dir.name(
Expand Down Expand Up @@ -807,7 +810,7 @@ mod tests {
type TestDir = BTreeMap<String, TestDirEntry>;

/// builds an unixfs directory out of a TestDir
#[async_recursion(?Send)]
#[async_recursion]
async fn build_directory(name: &str, dir: &TestDir) -> Result<Directory> {
let mut builder = DirectoryBuilder::new();
builder.name(name);
Expand Down
14 changes: 7 additions & 7 deletions iroh/src/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn fixture_get() -> MockApi {
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
)),
])
.boxed_local()
.boxed()
});
api
}
Expand All @@ -62,7 +62,7 @@ fn fixture_add_file() -> MockApi {
.map_err(|e| e.into());

Box::pin(future::ready(Ok(
futures::stream::iter(vec![add_event]).boxed_local()
futures::stream::iter(vec![add_event]).boxed()
)))
});
api
Expand All @@ -76,7 +76,7 @@ fn fixture_add_directory() -> MockApi {
.map_err(|e| e.into());

Box::pin(future::ready(Ok(
futures::stream::iter(vec![add_event]).boxed_local()
futures::stream::iter(vec![add_event]).boxed()
)))
});
api
Expand All @@ -92,7 +92,7 @@ fn fixture_get_wrapped_file() -> MockApi {
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
)),
])
.boxed_local()
.boxed()
});
api
}
Expand All @@ -104,7 +104,7 @@ fn fixture_get_unwrapped_file() -> MockApi {
RelativePathBuf::from_path("").unwrap(),
OutType::Reader(Box::new(std::io::Cursor::new("hello"))),
))])
.boxed_local()
.boxed()
});
api
}
Expand All @@ -119,7 +119,7 @@ fn fixture_get_wrapped_symlink() -> MockApi {
OutType::Symlink(PathBuf::from("target/path/foo.txt")),
)),
])
.boxed_local()
.boxed()
});
api
}
Expand All @@ -131,7 +131,7 @@ fn fixture_get_unwrapped_symlink() -> MockApi {
RelativePathBuf::from_path("").unwrap(),
OutType::Symlink(PathBuf::from("target/path/foo.txt")),
))])
.boxed_local()
.boxed()
});
api
}
Expand Down

0 comments on commit 59bc0b5

Please sign in to comment.