Download pieces in batches for piece cache sync
nazar-pc committed Oct 19, 2024
1 parent 1755b51 commit 52c5731
Showing 2 changed files with 117 additions and 81 deletions.
196 changes: 115 additions & 81 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -19,6 +19,7 @@ use futures::channel::mpsc;
 use futures::future::FusedFuture;
 use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
+use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
 use rayon::prelude::*;
 use std::collections::hash_map::Entry;
@@ -42,7 +43,8 @@ use tokio::task::{block_in_place, yield_now};
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 
 const WORKER_CHANNEL_CAPACITY: usize = 100;
-const CONCURRENT_PIECES_TO_DOWNLOAD: usize = 1_000;
+const SYNC_BATCH_SIZE: usize = 256;
+const SYNC_CONCURRENT_BATCHES: usize = 4;
 /// Make caches available as they are building without waiting for the initialization to finish,
 /// this number defines an interval in pieces after which cache is updated
 const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
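
The two new constants replace the old `CONCURRENT_PIECES_TO_DOWNLOAD: usize = 1_000` limit: instead of keeping up to 1,000 individual piece futures in flight, sync now requests pieces in batches of 256, with 4 batches running concurrently. A minimal sketch of the batching pattern, with simplified stand-in types rather than the real `subspace-farmer` API (the commit itself drains the stream with a `while let` loop because of rust-lang/rust#64650; with the owned types below, `for_each` compiles fine):

```rust
use futures::{stream, StreamExt};

const SYNC_BATCH_SIZE: usize = 256;
const SYNC_CONCURRENT_BATCHES: usize = 4;

// Hypothetical stand-in for `piece_getter.get_pieces(batch)` plus storage.
async fn download_batch(batch: Vec<u64>) {
    for piece_index in batch {
        // ... fetch the piece and write it into the cache ...
        let _ = piece_index;
    }
}

async fn sync_all(piece_indices: Vec<u64>) {
    // Split the work into fixed-size batches, one future per batch.
    let batches = piece_indices
        .chunks(SYNC_BATCH_SIZE)
        .map(|chunk| chunk.to_vec())
        .collect::<Vec<_>>();

    // Only a few batches run at once, so the slow tail of one batch is
    // compensated by progress on the others.
    stream::iter(batches.into_iter().map(download_batch))
        .buffer_unordered(SYNC_CONCURRENT_BATCHES)
        .for_each(|()| async {})
        .await;
}
```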
@@ -524,98 +526,130 @@ where
             "Identified piece indices that should be cached",
         );
 
-        let mut piece_indices_to_store = piece_indices_to_store.into_values().collect::<Vec<_>>();
-        // Sort pieces such that they are in ascending order and have higher chance of download
-        // overlapping with other processes like node's sync from DSN
-        piece_indices_to_store.par_sort_unstable();
-        let mut piece_indices_to_store = piece_indices_to_store.into_iter();
-
-        let download_piece = |piece_index| async move {
-            trace!(%piece_index, "Downloading piece");
-
-            let result = piece_getter.get_piece(piece_index).await;
-
-            match result {
-                Ok(Some(piece)) => {
-                    trace!(%piece_index, "Downloaded piece successfully");
-
-                    Some((piece_index, piece))
-                }
-                Ok(None) => {
-                    debug!(%piece_index, "Couldn't find piece");
-                    None
-                }
-                Err(error) => {
-                    debug!(%error, %piece_index, "Failed to get piece for piece cache");
-                    None
-                }
-            }
-        };
-
-        let pieces_to_download_total = piece_indices_to_store.len();
-        let mut downloading_pieces = piece_indices_to_store
-            .by_ref()
-            .take(CONCURRENT_PIECES_TO_DOWNLOAD)
-            .map(download_piece)
-            .collect::<FuturesUnordered<_>>();
-
-        let mut downloaded_pieces_count = 0;
-        self.handlers.progress.call_simple(&0.0);
-        while let Some(maybe_piece) = downloading_pieces.next().await {
-            // Push another piece to download
-            if let Some(piece_index_to_download) = piece_indices_to_store.next() {
-                downloading_pieces.push(download_piece(piece_index_to_download));
-            }
-
-            let Some((piece_index, piece)) = &maybe_piece else {
-                continue;
-            };
-
-            // Find plot in which there is a place for new piece to be stored
-            let Some(offset) = caches.pop_free_offset() else {
-                error!(
-                    %piece_index,
-                    "Failed to store piece in cache, there was no space"
-                );
-                break;
-            };
-
-            let cache_index = offset.cache_index;
-            let piece_offset = offset.piece_offset;
-            if let Some(backend) = caches.get_backend(cache_index)
-                && let Err(error) = backend.write_piece(piece_offset, *piece_index, piece).await
-            {
-                // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
-                error!(
-                    %error,
-                    %cache_index,
-                    %piece_index,
-                    %piece_offset,
-                    "Failed to write piece into cache"
-                );
-                continue;
-            }
-
-            let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
-            caches.push_stored_piece(key, offset);
-
-            downloaded_pieces_count += 1;
-            // Do not print anything or send progress notification after last piece until piece
-            // cache is written fully below
-            if downloaded_pieces_count != pieces_to_download_total {
-                let progress =
-                    downloaded_pieces_count as f32 / pieces_to_download_total as f32 * 100.0;
-                if downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL == 0 {
-                    self.piece_caches.write().await.clone_from(&caches);
-
-                    info!("Piece cache sync {progress:.2}% complete");
-                }
-
-                self.handlers.progress.call_simple(&progress);
-            }
-        }
-
-        *self.piece_caches.write().await = caches;
+        let pieces_to_download_total = piece_indices_to_store.len();
+        let piece_indices_to_store = piece_indices_to_store
+            .into_values()
+            .collect::<Vec<_>>()
+            // TODO: Allocating chunks here shouldn't be necessary, but otherwise it fails with
+            //  confusing error described in https://github.com/rust-lang/rust/issues/64552 and
+            //  similar upstream issues
+            .chunks(SYNC_BATCH_SIZE)
+            .map(|chunk| chunk.to_vec())
+            .collect::<Vec<_>>();
+
+        let downloaded_pieces_count = AtomicUsize::new(0);
+        let caches = Mutex::new(caches);
+        self.handlers.progress.call_simple(&0.0);
+
+        let downloading_pieces_stream =
+            stream::iter(piece_indices_to_store.into_iter().map(|piece_indices| {
+                let downloaded_pieces_count = &downloaded_pieces_count;
+                let caches = &caches;
+
+                async move {
+                    let mut pieces_stream = match piece_getter.get_pieces(piece_indices).await {
+                        Ok(pieces_stream) => pieces_stream,
+                        Err(error) => {
+                            error!(
+                                %error,
+                                "Failed to get pieces from piece getter"
+                            );
+                            return;
+                        }
+                    };
+
+                    while let Some((piece_index, result)) = pieces_stream.next().await {
+                        let piece = match result {
+                            Ok(Some(piece)) => {
+                                trace!(%piece_index, "Downloaded piece successfully");
+                                piece
+                            }
+                            Ok(None) => {
+                                debug!(%piece_index, "Couldn't find piece");
+                                continue;
+                            }
+                            Err(error) => {
+                                debug!(
+                                    %error,
+                                    %piece_index,
+                                    "Failed to get piece for piece cache"
+                                );
+                                continue;
+                            }
+                        };
+
+                        let (offset, maybe_backend) = {
+                            let mut caches = caches.lock();
+
+                            // Find plot in which there is a place for new piece to be stored
+                            let Some(offset) = caches.pop_free_offset() else {
+                                error!(
+                                    %piece_index,
+                                    "Failed to store piece in cache, there was no space"
+                                );
+                                break;
+                            };
+
+                            (offset, caches.get_backend(offset.cache_index).cloned())
+                        };
+
+                        let cache_index = offset.cache_index;
+                        let piece_offset = offset.piece_offset;
+
+                        if let Some(backend) = maybe_backend
+                            && let Err(error) =
+                                backend.write_piece(piece_offset, piece_index, &piece).await
+                        {
+                            // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
+                            error!(
+                                %error,
+                                %cache_index,
+                                %piece_index,
+                                %piece_offset,
+                                "Failed to write piece into cache"
+                            );
+                            continue;
+                        }
+
+                        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
+                        caches.lock().push_stored_piece(key, offset);
+
+                        let prev_downloaded_pieces_count =
+                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
+                        // Do not print anything or send progress notification after last piece
+                        // until piece cache is written fully below
+                        if prev_downloaded_pieces_count != pieces_to_download_total {
+                            let progress = prev_downloaded_pieces_count as f32
+                                / pieces_to_download_total as f32
+                                * 100.0;
+                            if prev_downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL
+                                == 0
+                            {
+                                let mut piece_caches = self.piece_caches.write().await;
+                                piece_caches.clone_from(&caches.lock());
+
+                                info!("Piece cache sync {progress:.2}% complete");
+                            }
+
+                            self.handlers.progress.call_simple(&progress);
+                        }
+                    }
+                }
+            }));
+        // Download two batches concurrently to make sure slow tail of one is compensated by another
+        let mut downloading_pieces_stream =
+            downloading_pieces_stream.buffer_unordered(SYNC_CONCURRENT_BATCHES);
+        // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
+        // Simply drain everything
+        // .for_each(|()| async {})
+
+        // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
+        while let Some(()) = downloading_pieces_stream.next().await {
+            // Simply drain everything
+        }
+        drop(downloading_pieces_stream);
+
+        *self.piece_caches.write().await = caches.into_inner();
         self.handlers.progress.call_simple(&100.0);
         *last_segment_index_internal = last_segment_index;
 
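With several batch futures now storing pieces concurrently, the cache state moved behind a `parking_lot::Mutex` and the progress counter became an `AtomicUsize`. The new code keeps the lock scope short: it pops a free offset and clones the backend handle under the lock, then releases it before awaiting the actual piece write. A rough sketch of that lock-scoping idea, with assumed simplified types:

```rust
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};

struct CachesState {
    free_offsets: Vec<u32>,
}

// Stand-in for the async `backend.write_piece()` call.
async fn write_piece_slowly(_offset: u32) {}

async fn store_piece(caches: &Mutex<CachesState>, downloaded: &AtomicUsize) {
    // Hold the lock only long enough to reserve an offset; a `parking_lot`
    // guard is not `Send`, so it must not live across an `.await` anyway.
    let Some(offset) = caches.lock().free_offsets.pop() else {
        return;
    };

    // The slow async write happens with the lock released, so other batch
    // workers can keep reserving offsets in the meantime.
    write_piece_slowly(offset).await;

    downloaded.fetch_add(1, Ordering::Relaxed);
}
```
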
@@ -779,7 +813,7 @@ where
         let piece_indices = (*last_segment_index_internal..=last_segment_index)
             .flat_map(|segment_index| segment_index.segment_piece_indexes());
 
-        // TODO: Can probably do concurrency here
+        // TODO: Download pieces concurrently
         for piece_index in piece_indices {
             if !self
                 .piece_caches
@@ -1146,7 +1180,7 @@ where
         piece_indices: PieceIndices,
     ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
     where
-        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
+        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send + 'a> + Send + 'a,
     {
         let mut pieces_to_get_from_plot_cache = Vec::new();
 
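The last hunk in this file widens the bound on the `IntoIter` associated type from `Send` to `Send + 'a`. `PieceIndices: Send + 'a` by itself says nothing about the lifetime of `PieceIndices::IntoIter`, and it is the iterator, not the original collection, that ends up stored inside the returned `'a` stream. A small illustration of the same constraint with simplified types (not the real `farmer_cache` signature; associated type bounds like this need Rust 1.79+):

```rust
use futures::{stream, Stream};

// The iterator produced from `items` is moved into the returned stream, so
// the iterator type itself must be `Send + 'a`; `I: Send + 'a` alone would
// not let this compile.
fn indices_stream<'a, I>(items: I) -> impl Stream<Item = u64> + Send + Unpin + 'a
where
    I: IntoIterator<Item = u64, IntoIter: Send + 'a> + Send + 'a,
{
    stream::iter(items)
}
```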
2 changes: 2 additions & 0 deletions crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
@@ -46,6 +46,8 @@ where
                 Some(free_offset)
             }
             None => {
+                // TODO: Use simple round robin for uniform filling instead of re-sorting all the
+                //  time
                 // Sort piece caches by number of stored pieces to fill those that are less
                 // populated first
                 let mut sorted_backends = self
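The TODO added in this file suggests replacing the per-allocation re-sort of backends with simple round-robin selection. One possible shape of that idea, as a hypothetical helper that is not part of this commit:

```rust
/// Hypothetical round-robin selector over cache backends.
struct RoundRobinCaches {
    next_index: usize,
}

impl RoundRobinCaches {
    /// Try each cache at most once, starting after the one used last time,
    /// so writes spread uniformly without sorting backends by fill level.
    fn pick(&mut self, free_piece_counts: &[usize]) -> Option<usize> {
        let cache_count = free_piece_counts.len();
        for step in 0..cache_count {
            let index = (self.next_index + step) % cache_count;
            if free_piece_counts[index] > 0 {
                self.next_index = (index + 1) % cache_count;
                return Some(index);
            }
        }
        None
    }
}
```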
