diff --git a/iroh-resolver/tests/roundtrip.rs b/iroh-resolver/tests/roundtrip.rs index 5ac621c434..d0fa83cea4 100644 --- a/iroh-resolver/tests/roundtrip.rs +++ b/iroh-resolver/tests/roundtrip.rs @@ -27,9 +27,12 @@ type TestDir = BTreeMap; /// builds an unixfs directory out of a TestDir #[async_recursion(?Send)] -async fn build_directory(name: &str, dir: &TestDir) -> Result { +async fn build_directory(name: &str, dir: &TestDir, hamt: bool) -> Result { let mut builder = DirectoryBuilder::new(); builder.name(name); + if hamt { + builder.hamt(); + } for (name, entry) in dir { match entry { TestDirEntry::File(content) => { @@ -41,7 +44,7 @@ async fn build_directory(name: &str, dir: &TestDir) -> Result { builder.add_file(file); } TestDirEntry::Directory(dir) => { - let dir = build_directory(name, dir).await?; + let dir = build_directory(name, dir, hamt).await?; builder.add_dir(dir)?; } } @@ -111,8 +114,8 @@ async fn build_testdir( } /// a roundtrip test that converts a dir to an unixfs DAG and back -async fn dir_roundtrip_test(dir: TestDir) -> Result { - let directory = build_directory("", &dir).await?; +async fn dir_roundtrip_test(dir: TestDir, hamt: bool) -> Result { + let directory = build_directory("", &dir, hamt).await?; let stream = directory.encode(); let (root, resolver) = stream_to_resolver(stream).await?; let stream = @@ -122,11 +125,11 @@ async fn dir_roundtrip_test(dir: TestDir) -> Result { } /// sync version of dir_roundtrip_test for use in proptest -fn dir_roundtrip_test_sync(dir: TestDir) -> bool { +fn dir_roundtrip_test_sync(dir: TestDir, hamt: bool) -> bool { tokio::runtime::Builder::new_current_thread() .build() .unwrap() - .block_on(dir_roundtrip_test(dir)) + .block_on(dir_roundtrip_test(dir, hamt)) .unwrap() } @@ -184,13 +187,14 @@ fn file_roundtrip_test_sync(data: Bytes, chunk_size: usize, degree: usize) -> bo fn arb_test_dir() -> impl Strategy { // create an arbitrary nested directory structure + // zero size file names are not generated, since they are not allowed and don't work with hamt directories fn arb_dir_entry() -> impl Strategy { let leaf = any::>().prop_map(|x| TestDirEntry::File(Bytes::from(x))); leaf.prop_recursive(3, 64, 10, |inner| { - prop::collection::btree_map(".*", inner, 0..10).prop_map(TestDirEntry::Directory) + prop::collection::btree_map(".+", inner, 0..10).prop_map(TestDirEntry::Directory) }) } - prop::collection::btree_map(".*", arb_dir_entry(), 0..10) + prop::collection::btree_map(".+", arb_dir_entry(), 0..10) } fn arb_degree() -> impl Strategy { @@ -211,8 +215,35 @@ proptest! { #[test] fn test_dir_roundtrip(data in arb_test_dir()) { - assert!(dir_roundtrip_test_sync(data)); + assert!(dir_roundtrip_test_sync(data, false)); } + + #[test] + fn test_dir_roundtrip_hamt(data in arb_test_dir()) { + assert!(dir_roundtrip_test_sync(data, true)); + } +} + +#[test] +fn test_hamt_roundtrip_1() { + let mut dir = TestDir::new(); + dir.insert("foo".to_string(), TestDirEntry::File(Bytes::from("bar"))); + dir.insert("fnord".to_string(), TestDirEntry::File(Bytes::from("baz"))); + assert!(dir_roundtrip_test_sync(dir, true)); +} + +#[test] +fn test_hamt_roundtrip_2() { + let mut dir = TestDir::new(); + dir.insert("foo".to_string(), TestDirEntry::File(Bytes::from("bar"))); + assert!(dir_roundtrip_test_sync(dir, true)); +} + +#[test] +fn test_hamt_roundtrip_3() { + let mut dir = TestDir::new(); + dir.insert("a".to_string(), TestDirEntry::File(Bytes::from("bar"))); + assert!(dir_roundtrip_test_sync(dir, true)); } #[tokio::test] diff --git a/iroh-store/src/rpc.rs b/iroh-store/src/rpc.rs index 4c2e3d6e3f..0e1892bd26 100644 --- a/iroh-store/src/rpc.rs +++ b/iroh-store/src/rpc.rs @@ -1,5 +1,3 @@ -use std::result; - use anyhow::Result; use bytes::BytesMut; use iroh_rpc_client::{create_server, ServerError, ServerSocket, StoreServer}; @@ -8,6 +6,7 @@ use iroh_rpc_types::store::{ HasRequest, HasResponse, PutManyRequest, PutRequest, StoreAddr, StoreRequest, StoreService, VersionRequest, VersionResponse, }; +use std::result; use tracing::info; use crate::store::Store; diff --git a/iroh-unixfs/src/builder.rs b/iroh-unixfs/src/builder.rs index 4d462a0d68..2d95b34917 100644 --- a/iroh-unixfs/src/builder.rs +++ b/iroh-unixfs/src/builder.rs @@ -1,4 +1,5 @@ use std::{ + collections::BTreeMap, fmt::Debug, path::{Path, PathBuf}, pin::Pin, @@ -10,8 +11,11 @@ use async_recursion::async_recursion; use async_trait::async_trait; use bytes::Bytes; use cid::Cid; -use futures::stream::TryStreamExt; use futures::{future, stream::LocalBoxStream, Stream, StreamExt}; +use futures::{ + stream::{self, TryStreamExt}, + TryFutureExt, +}; use iroh_rpc_client::Client; use prost::Message; use tokio::io::AsyncRead; @@ -19,8 +23,9 @@ use tokio::io::AsyncRead; use crate::{ balanced_tree::{TreeBuilder, DEFAULT_DEGREE}, chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT}, + hamt::{bitfield::Bitfield, bits, hash_key}, types::Block, - unixfs::{dag_pb, unixfs_pb, DataType, Node, UnixfsNode}, + unixfs::{dag_pb, dag_pb::PbLink, unixfs_pb, DataType, HamtHashFunction, Node, UnixfsNode}, }; // The maximum number of links we allow in a directory @@ -43,81 +48,37 @@ enum DirectoryType { /// Representation of a constructed Directory. #[derive(Debug, PartialEq)] -pub struct Directory { +pub enum Directory { + Basic(BasicDirectory), + Hamt(HamtDirectory), +} + +#[derive(Debug, PartialEq)] +pub struct BasicDirectory { name: String, entries: Vec, } -impl Directory { - pub fn name(&self) -> &str { - &self.name - } - - /// Wrap an entry in an unnamed directory. Used when adding a unixfs file or top level directory to - /// Iroh in order to preserve the file or directory's name. - pub fn wrap(self) -> Self { - Directory { - name: "".into(), - entries: vec![Entry::Directory(self)], - } - } - - pub async fn encode_root(self) -> Result { - let mut current = None; - let parts = self.encode(); - tokio::pin!(parts); - - while let Some(part) = parts.next().await { - current = Some(part); - } - - current.expect("must not be empty") - } - +impl BasicDirectory { pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { async_stream::try_stream! { let mut links = Vec::new(); for entry in self.entries { - let (name, root) = match entry { - Entry::File(file) => { - let name = file.name().to_string(); - let parts = file.encode().await?; - tokio::pin!(parts); - let mut root = None; - while let Some(part) = parts.next().await { - let block = part?; - root = Some(block.clone()); - yield block; - } - (name, root) - } - Entry::Directory(dir) => { - let name = dir.name.clone(); - let parts = dir.encode(); - tokio::pin!(parts); - let mut root = None; - while let Some(part) = parts.next().await { - let block = part?; - root = Some(block.clone()); - yield block; - } - (name, root) - } - Entry::Symlink(sym) => { - let name = sym.name().to_string(); - let block = sym.encode()?; - let root = Some(block.clone()); - yield block; - (name, root) - } - }; + let name = entry.name().to_string(); + let parts = entry.encode().await?; + tokio::pin!(parts); + let mut root = None; + while let Some(part) = parts.next().await { + let block = part?; + root = Some(block.clone()); + yield block; + } let root_block = root.expect("file must not be empty"); links.push(dag_pb::PbLink { hash: Some(root_block.cid().to_bytes()), name: Some(name), tsize: Some(root_block.data().len() as u64), }); - } // directory itself comes last @@ -134,6 +95,60 @@ impl Directory { } } +#[derive(Debug, PartialEq)] +pub struct HamtDirectory { + name: String, + hamt: Box, +} + +impl HamtDirectory { + pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + self.hamt.encode() + } +} + +impl Directory { + fn single(name: String, entry: Entry) -> Self { + Directory::basic(name, vec![entry]) + } + + fn basic(name: String, entries: Vec) -> Self { + Directory::Basic(BasicDirectory { name, entries }) + } + + pub fn name(&self) -> &str { + match &self { + Directory::Basic(BasicDirectory { name, .. }) => name, + Directory::Hamt(HamtDirectory { name, .. }) => name, + } + } + + /// Wrap an entry in an unnamed directory. Used when adding a unixfs file or top level directory to + /// Iroh in order to preserve the file or directory's name. + pub fn wrap(self) -> Self { + Directory::single("".into(), Entry::Directory(self)) + } + + pub async fn encode_root(self) -> Result { + let mut current = None; + let parts = self.encode(); + tokio::pin!(parts); + + while let Some(part) = parts.next().await { + current = Some(part); + } + + current.expect("must not be empty") + } + + pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + match self { + Directory::Basic(basic) => basic.encode(), + Directory::Hamt(hamt) => hamt.encode(), + } + } +} + enum Content { Reader(Pin>), Path(PathBuf), @@ -184,10 +199,7 @@ impl File { } pub fn wrap(self) -> Directory { - Directory { - name: "".into(), - entries: vec![Entry::File(self)], - } + Directory::single("".into(), Entry::File(self)) } pub async fn encode_root(self) -> Result { @@ -237,10 +249,7 @@ impl Symlink { } pub fn wrap(self) -> Directory { - Directory { - name: "".into(), - entries: vec![Entry::Symlink(self)], - } + Directory::single("".into(), Entry::Symlink(self)) } pub fn name(&self) -> &str { @@ -397,6 +406,24 @@ enum Entry { Symlink(Symlink), } +impl Entry { + pub fn name(&self) -> &str { + match self { + Entry::File(f) => f.name(), + Entry::Directory(d) => d.name(), + Entry::Symlink(s) => s.name(), + } + } + + pub async fn encode(self) -> Result>> { + Ok(match self { + Entry::File(f) => f.encode().await?.boxed_local(), + Entry::Directory(d) => d.encode(), + Entry::Symlink(s) => stream::iter(Some(s.encode())).boxed_local(), + }) + } +} + /// Construct a UnixFS directory. #[derive(Debug)] pub struct DirectoryBuilder { @@ -455,11 +482,119 @@ impl DirectoryBuilder { name, entries, typ, .. } = self; - ensure!(typ == DirectoryType::Basic, "too many links to fit into one chunk, must be encoded as a HAMT. However, HAMT creation has not yet been implemented."); + match typ { + DirectoryType::Basic => { + let name = name.unwrap_or_default(); + Ok(Directory::Basic(BasicDirectory { name, entries })) + } + DirectoryType::Hamt => { + let name = name.unwrap_or_default(); + let hamt = Box::new(HamtNode::new(entries)); + Ok(Directory::Hamt(HamtDirectory { name, hamt })) + } + } + } +} - let name = name.unwrap_or_default(); +/// A leaf when building a hamt directory. +/// +/// Basically just an entry and the hash of its name. +#[derive(Debug, PartialEq)] +pub struct HamtLeaf([u8; 8], Entry); - Ok(Directory { name, entries }) +/// A node when building a hamt directory. +/// +/// Either a branch or a leaf. Root will always be a branch, +/// even if it has only one child. +#[derive(Debug, PartialEq)] +enum HamtNode { + Branch(BTreeMap), + Leaf(HamtLeaf), +} + +impl HamtNode { + pub(super) fn new(entries: Vec) -> HamtNode { + // add the hash + let entries = entries + .into_iter() + .map(|entry| { + let name = entry.name().to_string(); + let hash = hash_key(name.as_bytes()); + HamtLeaf(hash, entry) + }) + .collect::>(); + Self::group(entries, 0, 8) + } + + fn group(leafs: Vec, pos: u32, len: u32) -> HamtNode { + if leafs.len() == 1 && pos > 0 { + HamtNode::Leaf(leafs.into_iter().next().unwrap()) + } else { + let mut res = BTreeMap::>::new(); + for leaf in leafs { + let value = bits(&leaf.0, pos, len); + res.entry(value).or_default().push(leaf); + } + let res = res + .into_iter() + .map(|(key, leafs)| { + let node = Self::group(leafs, pos + len, len); + (key, node) + }) + .collect(); + HamtNode::Branch(res) + } + } + + fn name(&self) -> &str { + match self { + HamtNode::Branch(_) => "", + HamtNode::Leaf(HamtLeaf(_, entry)) => entry.name(), + } + } + + pub fn encode<'a>(self) -> LocalBoxStream<'a, Result> { + match self { + Self::Branch(tree) => { + async_stream::try_stream! { + let mut links = Vec::with_capacity(tree.len()); + let mut bitfield = Bitfield::default(); + for (prefix, node) in tree { + let name = format!("{:02X}{}", prefix, node.name()); + bitfield.set_bit(prefix); + let blocks = node.encode(); + let mut root = None; + tokio::pin!(blocks); + while let Some(block) = blocks.next().await { + let block = block?; + root = Some(*block.cid()); + yield block; + } + links.push(PbLink { + name: Some(name), + hash: root.map(|cid| cid.to_bytes()), + tsize: None, + }); + } + let inner = unixfs_pb::Data { + r#type: DataType::HamtShard as i32, + hash_type: Some(HamtHashFunction::Murmur3 as u64), + fanout: Some(256), + data: Some(bitfield.as_bytes().to_vec().into()), + ..Default::default() + }; + let outer = encode_unixfs_pb(&inner, links).unwrap(); + // it does not really matter what enum variant we choose here as long as + // it is not raw. The type of the node will be HamtShard from above. + let node = UnixfsNode::Directory(crate::unixfs::Node { outer, inner }); + yield node.encode()?; + } + .boxed_local() + } + Self::Leaf(HamtLeaf(_hash, entry)) => async move { entry.encode().await } + .try_flatten_stream() + .boxed_local(), + } } } @@ -1019,9 +1154,6 @@ mod tests { // at directory link limit should be processed as a hamt assert_eq!(DirectoryType::Hamt, builder.typ); - if (builder.build()).is_ok() { - panic!("expected builder to error when attempting to build a hamt directory") - } Ok(()) } @@ -1053,35 +1185,37 @@ mod tests { // create directory manually let nested_file = FileBuilder::new().path(nested_file_path).build().await?; - let nested_dir = Directory { - name: String::from( + let nested_dir = Directory::single( + String::from( nested_dir_path .file_name() .and_then(|s| s.to_str()) .unwrap(), ), - entries: vec![Entry::File(nested_file)], - }; + Entry::File(nested_file), + ); let file = FileBuilder::new().path(file_path).build().await?; - let expected = Directory { - name: String::from(dir.clone().file_name().and_then(|s| s.to_str()).unwrap()), - entries: vec![Entry::File(file), Entry::Directory(nested_dir)], - }; + let expected = Directory::basic( + String::from(dir.clone().file_name().and_then(|s| s.to_str()).unwrap()), + vec![Entry::File(file), Entry::Directory(nested_dir)], + ); - let mut got = make_dir_from_path(dir, Chunker::Fixed(chunker::Fixed::default())).await?; + let got = make_dir_from_path(dir, Chunker::Fixed(chunker::Fixed::default())).await?; + + let e = |dir: Directory| match dir { + Directory::Basic(basic) => basic.entries, + _ => panic!("expected directory"), + }; // Before comparison sort entries to make test deterministic. // The readdir_r function is used in the underlying platform which // gives no guarantee to return in a specific order. // https://stackoverflow.com/questions/40021882/how-to-sort-readdir-iterator - got.entries.sort_by_key(|entry| match entry { - Entry::Directory(dir) => dir.name.clone(), - Entry::File(file) => file.name.clone(), - Entry::Symlink(sym) => sym.name().to_string(), - }); - + let expected = e(expected); + let mut got = e(got); + got.sort_by_key(|entry| entry.name().to_string()); assert_eq!(expected, got); Ok(()) } diff --git a/iroh-unixfs/src/hamt.rs b/iroh-unixfs/src/hamt.rs index 4f38358fb5..c57e96a361 100644 --- a/iroh-unixfs/src/hamt.rs +++ b/iroh-unixfs/src/hamt.rs @@ -13,7 +13,7 @@ use async_recursion::async_recursion; use self::{bitfield::Bitfield, hash_bits::HashBits}; #[allow(dead_code)] -mod bitfield; +pub(crate) mod bitfield; mod hash_bits; const HASH_BIT_LENGTH: usize = 8; @@ -300,7 +300,7 @@ impl Node { /// Hashes with murmur3 x64 and returns the first 64 bits. /// This matches what go-unixfs uses. -fn hash_key(key: &[u8]) -> [u8; HASH_BIT_LENGTH] { +pub(crate) fn hash_key(key: &[u8]) -> [u8; HASH_BIT_LENGTH] { let full = fastmurmur3::hash(key); // [h1, h2] let bytes = full.to_ne_bytes(); @@ -310,7 +310,12 @@ fn hash_key(key: &[u8]) -> [u8; HASH_BIT_LENGTH] { h1.to_be_bytes() } -fn log2(x: u32) -> u32 { +pub(crate) fn bits(hash: &[u8; 8], pos: u32, len: u32) -> u32 { + let mut hash = HashBits::new_at_index(hash, pos); + hash.next(len).unwrap() +} + +pub(crate) fn log2(x: u32) -> u32 { assert!(x > 0); u32::BITS as u32 - x.leading_zeros() - 1 }