Skip to content

Commit

Permalink
perf: impl put_many and reuse column families
Browse files Browse the repository at this point in the history
This requires getting rid of a lot of async in the store. Async only in the public interface
  • Loading branch information
rklaehn committed Oct 25, 2022
1 parent 96c6148 commit a47d4ba
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 114 deletions.
80 changes: 72 additions & 8 deletions iroh-resolver/src/unixfs_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use async_recursion::async_recursion;
use async_trait::async_trait;
use bytes::Bytes;
use cid::Cid;
use futures::{stream::LocalBoxStream, Stream, StreamExt};
use futures::stream::TryStreamExt;
use futures::{future, stream::LocalBoxStream, Stream, StreamExt};
use iroh_rpc_client::Client;
use prost::Message;
use tokio::io::AsyncRead;
Expand All @@ -31,7 +32,7 @@ use crate::{
const DIRECTORY_LINK_LIMIT: usize = 6000;

/// How many chunks to buffer up when adding content.
const ADD_PAR: usize = 24;
const _ADD_PAR: usize = 24;

#[derive(Debug, PartialEq)]
enum DirectoryType {
Expand Down Expand Up @@ -496,6 +497,7 @@ pub(crate) fn encode_unixfs_pb(
pub trait Store: 'static + Send + Sync + Clone {
async fn has(&self, &cid: Cid) -> Result<bool>;
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()>;
async fn put_many(&self, blocks: Vec<Block>) -> Result<()>;
}

#[async_trait]
Expand All @@ -507,6 +509,12 @@ impl Store for Client {
async fn put(&self, cid: Cid, blob: Bytes, links: Vec<Cid>) -> Result<()> {
self.try_store()?.put(cid, blob, links).await
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
self.try_store()?
.put_many(blocks.into_iter().map(|x| x.into_parts()).collect())
.await
}
}

#[derive(Debug, Clone)]
Expand All @@ -525,6 +533,13 @@ impl Store for StoreAndProvideClient {
// we provide after insertion is finished
// self.client.try_p2p()?.start_providing(&cid).await
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
self.client
.try_store()?
.put_many(blocks.into_iter().map(|x| x.into_parts()).collect())
.await
}
}

#[async_trait]
Expand All @@ -536,6 +551,14 @@ impl Store for Arc<tokio::sync::Mutex<std::collections::HashMap<Cid, Bytes>>> {
self.lock().await.insert(cid, blob);
Ok(())
}

async fn put_many(&self, blocks: Vec<Block>) -> Result<()> {
let mut this = self.lock().await;
for block in blocks {
this.insert(*block.cid(), block.data().clone());
}
Ok(())
}
}

/// Adds a single file.
Expand Down Expand Up @@ -617,20 +640,54 @@ pub enum AddEvent {
},
}

pub async fn add_blocks_to_store<S: Store>(
use async_stream::stream;

fn add_blocks_to_store_chunked<S: Store>(
store: S,
mut blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
let mut chunk = Vec::new();
let mut chunk_size = 0u64;
const MAX_CHUNK_SIZE: u64 = 1024 * 1024 * 16;
let t = stream! {
while let Some(block) = blocks.next().await {
let block = block?;
let block_size = block.data().len() as u64;
let cid = *block.cid();
if chunk_size + block_size > MAX_CHUNK_SIZE {
tracing::info!("adding chunk of {} bytes", chunk_size);
store.put_many(chunk.clone()).await?;
chunk.clear();
chunk_size = 0;
} else {
chunk_size += block_size;
}
yield Ok(AddEvent::ProgressDelta {
cid,
size: block.raw_data_size(),
});
}
// make sure to also send the last chunk!
store.put_many(chunk).await?;
};
t
}

fn _add_blocks_to_store_single<S: Store>(
store: Option<S>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
blocks
.map(move |block| {
.and_then(|x| future::ok(vec![x]))
.map(move |blocks| {
let store = store.clone();
async move {
let block = block?;
let block = blocks?[0].clone();
let raw_data_size = block.raw_data_size();
let (cid, bytes, links) = block.into_parts();
let cid = *block.cid();
if let Some(store) = store {
if !store.has(cid).await? {
store.put(cid, bytes, links).await?;
store.put_many(vec![block]).await?;
}
}

Expand All @@ -640,7 +697,14 @@ pub async fn add_blocks_to_store<S: Store>(
})
}
})
.buffered(ADD_PAR)
.buffered(_ADD_PAR)
}

pub async fn add_blocks_to_store<S: Store>(
store: Option<S>,
blocks: Pin<Box<dyn Stream<Item = Result<Block>>>>,
) -> impl Stream<Item = Result<AddEvent>> {
add_blocks_to_store_chunked(store.unwrap(), blocks)
}

#[async_recursion(?Send)]
Expand Down
18 changes: 16 additions & 2 deletions iroh-rpc-client/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::Stream;
#[cfg(feature = "grpc")]
use iroh_rpc_types::store::store_client::StoreClient as GrpcStoreClient;
use iroh_rpc_types::store::{
GetLinksRequest, GetRequest, GetSizeRequest, HasRequest, PutRequest, Store, StoreClientAddr,
StoreClientBackend,
GetLinksRequest, GetRequest, GetSizeRequest, HasRequest, PutManyRequest, PutRequest, Store,
StoreClientAddr, StoreClientBackend,
};
use iroh_rpc_types::Addr;
#[cfg(feature = "grpc")]
Expand Down Expand Up @@ -40,6 +40,20 @@ impl StoreClient {
Ok(())
}

#[tracing::instrument(skip(self, blocks))]
pub async fn put_many(&self, blocks: Vec<(Cid, Bytes, Vec<Cid>)>) -> Result<()> {
let blocks = blocks
.into_iter()
.map(|(cid, blob, links)| PutRequest {
cid: cid.to_bytes(),
blob,
links: links.iter().map(|l| l.to_bytes()).collect(),
})
.collect();
self.backend.put_many(PutManyRequest { blocks }).await?;
Ok(())
}

#[tracing::instrument(skip(self))]
pub async fn get(&self, cid: Cid) -> Result<Option<Bytes>> {
let req = GetRequest {
Expand Down
7 changes: 7 additions & 0 deletions iroh-rpc-types/proto/store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "google/protobuf/empty.proto";
service Store {
rpc Version(google.protobuf.Empty) returns (VersionResponse) {}
rpc Put(PutRequest) returns (google.protobuf.Empty) {}
rpc PutMany(PutManyRequest) returns (google.protobuf.Empty) {}
rpc Get(GetRequest) returns (GetResponse) {}
rpc Has(HasRequest) returns (HasResponse) {}
rpc GetLinks(GetLinksRequest) returns(GetLinksResponse) {}
Expand All @@ -17,6 +18,12 @@ message VersionResponse {
string version = 1;
}

// this should really be a stream
// but that would require a rewrite of the proxy! macro
message PutManyRequest {
repeated PutRequest blocks = 1;
}

message PutRequest {
// Serialized CID of the given block.
bytes cid = 1;
Expand Down
2 changes: 2 additions & 0 deletions iroh-rpc-types/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ include_proto!("store");
proxy!(
Store,
version: () => VersionResponse => VersionResponse,

put: PutRequest => () => (),
put_many: PutManyRequest => () => (),
get: GetRequest => GetResponse => GetResponse,
has: HasRequest => HasResponse => HasResponse,
get_links: GetLinksRequest => GetLinksResponse => GetLinksResponse,
Expand Down
17 changes: 16 additions & 1 deletion iroh-store/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use bytes::BytesMut;
use cid::Cid;
use iroh_rpc_types::store::{
GetLinksRequest, GetLinksResponse, GetRequest, GetResponse, GetSizeRequest, GetSizeResponse,
HasRequest, HasResponse, PutRequest, Store as RpcStore, StoreServerAddr, VersionResponse,
HasRequest, HasResponse, PutManyRequest, PutRequest, Store as RpcStore, StoreServerAddr,
VersionResponse,
};
use tracing::info;

Expand Down Expand Up @@ -35,6 +36,20 @@ impl RpcStore for Store {
Ok(res)
}

#[tracing::instrument(skip(self, req))]
async fn put_many(&self, req: PutManyRequest) -> Result<()> {
let req = req
.blocks
.into_iter()
.map(|req| {
let cid = cid_from_bytes(req.cid)?;
let links = links_from_bytes(req.links)?;
Ok((cid, req.blob, links))
})
.collect::<Result<Vec<_>>>()?;
self.put_many(req)
}

#[tracing::instrument(skip(self))]
async fn get(&self, req: GetRequest) -> Result<GetResponse> {
let cid = cid_from_bytes(req.cid)?;
Expand Down
Loading

0 comments on commit a47d4ba

Please sign in to comment.