diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index 6ec7a1a139..a215176817 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -19,6 +19,7 @@ use futures::channel::mpsc;
 use futures::future::FusedFuture;
 use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{select, stream, FutureExt, SinkExt, Stream, StreamExt};
+use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
 use rayon::prelude::*;
 use std::collections::hash_map::Entry;
@@ -42,7 +43,8 @@ use tokio::task::{block_in_place, yield_now};
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 
 const WORKER_CHANNEL_CAPACITY: usize = 100;
-const CONCURRENT_PIECES_TO_DOWNLOAD: usize = 1_000;
+const SYNC_BATCH_SIZE: usize = 256;
+const SYNC_CONCURRENT_BATCHES: usize = 4;
 /// Make caches available as they are building without waiting for the initialization to finish,
 /// this number defines an interval in pieces after which cache is updated
 const INTERMEDIATE_CACHE_UPDATE_INTERVAL: usize = 100;
@@ -524,98 +526,130 @@ where
             "Identified piece indices that should be cached",
         );
 
-        let mut piece_indices_to_store = piece_indices_to_store.into_values().collect::<Vec<_>>();
-        // Sort pieces such that they are in ascending order and have higher chance of download
-        // overlapping with other processes like node's sync from DSN
-        piece_indices_to_store.par_sort_unstable();
-        let mut piece_indices_to_store = piece_indices_to_store.into_iter();
-
-        let download_piece = |piece_index| async move {
-            trace!(%piece_index, "Downloading piece");
+        let pieces_to_download_total = piece_indices_to_store.len();
+        let piece_indices_to_store = piece_indices_to_store
+            .into_values()
+            .collect::<Vec<_>>()
+            // TODO: Allocating chunks here shouldn't be necessary, but otherwise it fails with
+            //  confusing error described in https://github.com/rust-lang/rust/issues/64552 and
+            //  similar upstream issues
+            .chunks(SYNC_BATCH_SIZE)
+            .map(|chunk| chunk.to_vec())
+            .collect::<Vec<_>>();
 
-            let result = piece_getter.get_piece(piece_index).await;
+        let downloaded_pieces_count = AtomicUsize::new(0);
+        let caches = Mutex::new(caches);
+        self.handlers.progress.call_simple(&0.0);
 
-            match result {
-                Ok(Some(piece)) => {
-                    trace!(%piece_index, "Downloaded piece successfully");
+        let downloading_pieces_stream =
+            stream::iter(piece_indices_to_store.into_iter().map(|piece_indices| {
+                let downloaded_pieces_count = &downloaded_pieces_count;
+                let caches = &caches;
 
-                    Some((piece_index, piece))
-                }
-                Ok(None) => {
-                    debug!(%piece_index, "Couldn't find piece");
-                    None
-                }
-                Err(error) => {
-                    debug!(%error, %piece_index, "Failed to get piece for piece cache");
-                    None
-                }
-            }
-        };
+                async move {
+                    let mut pieces_stream = match piece_getter.get_pieces(piece_indices).await {
+                        Ok(pieces_stream) => pieces_stream,
+                        Err(error) => {
+                            error!(
+                                %error,
+                                "Failed to get pieces from piece getter"
+                            );
+                            return;
+                        }
+                    };
 
-        let pieces_to_download_total = piece_indices_to_store.len();
-        let mut downloading_pieces = piece_indices_to_store
-            .by_ref()
-            .take(CONCURRENT_PIECES_TO_DOWNLOAD)
-            .map(download_piece)
-            .collect::<FuturesUnordered<_>>();
+                    while let Some((piece_index, result)) = pieces_stream.next().await {
+                        let piece = match result {
+                            Ok(Some(piece)) => {
+                                trace!(%piece_index, "Downloaded piece successfully");
+                                piece
+                            }
+                            Ok(None) => {
+                                debug!(%piece_index, "Couldn't find piece");
+                                continue;
+                            }
+                            Err(error) => {
+                                debug!(
+                                    %error,
+                                    %piece_index,
+                                    "Failed to get piece for piece cache"
+                                );
+                                continue;
+                            }
+                        };
 
-        let mut downloaded_pieces_count = 0;
-        self.handlers.progress.call_simple(&0.0);
-        while let Some(maybe_piece) = downloading_pieces.next().await {
-            // Push another piece to download
-            if let Some(piece_index_to_download) = piece_indices_to_store.next() {
-                downloading_pieces.push(download_piece(piece_index_to_download));
-            }
+                        let (offset, maybe_backend) = {
+                            let mut caches = caches.lock();
 
-            let Some((piece_index, piece)) = &maybe_piece else {
-                continue;
-            };
+                            // Find plot in which there is a place for new piece to be stored
+                            let Some(offset) = caches.pop_free_offset() else {
+                                error!(
+                                    %piece_index,
+                                    "Failed to store piece in cache, there was no space"
+                                );
+                                break;
+                            };
 
-            // Find plot in which there is a place for new piece to be stored
-            let Some(offset) = caches.pop_free_offset() else {
-                error!(
-                    %piece_index,
-                    "Failed to store piece in cache, there was no space"
-                );
-                break;
-            };
+                            (offset, caches.get_backend(offset.cache_index).cloned())
+                        };
 
-            let cache_index = offset.cache_index;
-            let piece_offset = offset.piece_offset;
-            if let Some(backend) = caches.get_backend(cache_index)
-                && let Err(error) = backend.write_piece(piece_offset, *piece_index, piece).await
-            {
-                // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
-                error!(
-                    %error,
-                    %cache_index,
-                    %piece_index,
-                    %piece_offset,
-                    "Failed to write piece into cache"
-                );
-                continue;
-            }
+                        let cache_index = offset.cache_index;
+                        let piece_offset = offset.piece_offset;
 
-            let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
-            caches.push_stored_piece(key, offset);
+                        if let Some(backend) = maybe_backend
+                            && let Err(error) =
+                                backend.write_piece(piece_offset, piece_index, &piece).await
+                        {
+                            // TODO: Will likely need to cache problematic backend indices to avoid hitting it over and over again repeatedly
+                            error!(
+                                %error,
+                                %cache_index,
+                                %piece_index,
+                                %piece_offset,
+                                "Failed to write piece into cache"
+                            );
+                            continue;
+                        }
 
-            downloaded_pieces_count += 1;
-            // Do not print anything or send progress notification after last piece until piece
-            // cache is written fully below
-            if downloaded_pieces_count != pieces_to_download_total {
-                let progress =
-                    downloaded_pieces_count as f32 / pieces_to_download_total as f32 * 100.0;
-                if downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL == 0 {
-                    self.piece_caches.write().await.clone_from(&caches);
+                        let key = KeyWithDistance::new(self.peer_id, piece_index.to_multihash());
+                        caches.lock().push_stored_piece(key, offset);
+
+                        let prev_downloaded_pieces_count =
+                            downloaded_pieces_count.fetch_add(1, Ordering::Relaxed);
+                        // Do not print anything or send progress notification after last piece
+                        // until piece cache is written fully below
+                        if prev_downloaded_pieces_count != pieces_to_download_total {
+                            let progress = prev_downloaded_pieces_count as f32
+                                / pieces_to_download_total as f32
+                                * 100.0;
+                            if prev_downloaded_pieces_count % INTERMEDIATE_CACHE_UPDATE_INTERVAL
+                                == 0
+                            {
+                                let mut piece_caches = self.piece_caches.write().await;
+                                piece_caches.clone_from(&caches.lock());
+
+                                info!("Piece cache sync {progress:.2}% complete");
+                            }
 
-                    info!("Piece cache sync {progress:.2}% complete");
+                            self.handlers.progress.call_simple(&progress);
+                        }
+                    }
                 }
-
-                self.handlers.progress.call_simple(&progress);
-            }
+            }));
+        // Download several batches concurrently to make sure slow tail of one is compensated by another
+        let mut downloading_pieces_stream =
+            downloading_pieces_stream.buffer_unordered(SYNC_CONCURRENT_BATCHES);
+        // TODO: Can't use this due to https://github.com/rust-lang/rust/issues/64650
+        //  Simply drain everything
+        // .for_each(|()| async {})
+
+        // TODO: Remove once https://github.com/rust-lang/rust/issues/64650 is resolved
+        while let Some(()) = downloading_pieces_stream.next().await {
+            // Simply drain everything
         }
+        drop(downloading_pieces_stream);
 
-        *self.piece_caches.write().await = caches;
+        *self.piece_caches.write().await = caches.into_inner();
         self.handlers.progress.call_simple(&100.0);
         *last_segment_index_internal = last_segment_index;
 
@@ -779,7 +813,7 @@ where
         let piece_indices = (*last_segment_index_internal..=last_segment_index)
             .flat_map(|segment_index| segment_index.segment_piece_indexes());
 
-        // TODO: Can probably do concurrency here
+        // TODO: Download pieces concurrently
         for piece_index in piece_indices {
             if !self
                 .piece_caches
@@ -1146,7 +1180,7 @@ where
         piece_indices: PieceIndices,
     ) -> impl Stream<Item = (PieceIndex, Option<Piece>)> + Send + Unpin + 'a
     where
-        PieceIndices: IntoIterator<Item = PieceIndex> + Send + 'a,
+        PieceIndices: IntoIterator<Item = PieceIndex, IntoIter: Send> + Send + 'a,
     {
         let mut pieces_to_get_from_plot_cache = Vec::new();
 
diff --git a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
index 978d92d6cb..12d31dd8a5 100644
--- a/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
+++ b/crates/subspace-farmer/src/farmer_cache/piece_cache_state.rs
@@ -46,6 +46,8 @@ where
                 Some(free_offset)
             }
             None => {
+                // TODO: Use simple round robin for uniform filling instead of re-sorting all the
+                //  time
                 // Sort piece caches by number of stored pieces to fill those that are less
                 // populated first
                 let mut sorted_backends = self
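
The patch replaces one large FuturesUnordered of per-piece downloads (previously capped at CONCURRENT_PIECES_TO_DOWNLOAD) with fixed-size batches: piece indices are chunked into groups of SYNC_BATCH_SIZE, each group becomes a single future fetching the whole batch, and buffer_unordered keeps at most SYNC_CONCURRENT_BATCHES of those futures in flight. A minimal sketch of the same pattern, assuming tokio and futures as dependencies; fetch_batch, the u64 indices, and the dummy payloads are illustrative stand-ins, not the crate's types:

use futures::{stream, StreamExt};

const SYNC_BATCH_SIZE: usize = 256;
const SYNC_CONCURRENT_BATCHES: usize = 4;

// Stand-in for the real batched piece getter: resolves a whole batch of
// indices to (index, payload) pairs.
async fn fetch_batch(batch: Vec<u64>) -> Vec<(u64, Vec<u8>)> {
    batch.into_iter().map(|i| (i, vec![0u8; 32])).collect()
}

#[tokio::main]
async fn main() {
    let piece_indices: Vec<u64> = (0..1_000).collect();

    // Fixed-size batches, analogous to `chunks(SYNC_BATCH_SIZE)` in the patch.
    let batches: Vec<Vec<u64>> = piece_indices
        .chunks(SYNC_BATCH_SIZE)
        .map(|chunk| chunk.to_vec())
        .collect();

    // One future per batch, at most SYNC_CONCURRENT_BATCHES polled at a time.
    let mut results = stream::iter(batches)
        .map(|batch| async move { fetch_batch(batch).await.len() })
        .buffer_unordered(SYNC_CONCURRENT_BATCHES);

    let mut downloaded = 0;
    while let Some(batch_len) = results.next().await {
        downloaded += batch_len;
    }
    assert_eq!(downloaded, 1_000);
}

buffer_unordered rather than buffered is the point here: batches are yielded in completion order, so one slow batch does not hold back results from the others, which is the "slow tail" rationale in the comment inside the diff.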
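The shared cache state now sits behind a parking_lot::Mutex, and a parking_lot guard is !Send and must not be held across an .await. The loop therefore takes the free offset and a cloned backend handle in one short lock scope, performs the slow write with the lock released, and re-locks only to record the stored piece. A reduced sketch of that locking discipline, with Caches, Backend, and the byte-slice piece as simplified placeholders rather than the crate's types:

use parking_lot::Mutex;

#[derive(Clone)]
struct Backend; // placeholder for a cloneable cache backend handle

impl Backend {
    async fn write_piece(&self, _offset: usize, _piece: &[u8]) -> Result<(), ()> {
        Ok(()) // the real backend does disk I/O here
    }
}

struct Caches {
    free_offsets: Vec<usize>,
    backend: Backend,
    stored: Vec<usize>,
}

async fn store_piece(caches: &Mutex<Caches>, piece: &[u8]) -> Result<(), ()> {
    // Lock scope 1: grab everything the async write needs, then release the
    // guard before any .await point.
    let (offset, backend) = {
        let mut guard = caches.lock();
        let offset = guard.free_offsets.pop().ok_or(())?;
        (offset, guard.backend.clone())
    };

    // The slow write runs with the lock released, so concurrently running
    // batches keep making progress.
    backend.write_piece(offset, piece).await?;

    // Lock scope 2: briefly re-lock to record the result.
    caches.lock().stored.push(offset);
    Ok(())
}

Cloning the backend handle under the lock is what makes the unlocked write possible; it mirrors caches.get_backend(offset.cache_index).cloned() in the diff.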
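The signature change in the last farmer_cache.rs hunk uses an associated type bound (stable since Rust 1.79): IntoIterator<Item = PieceIndex, IntoIter: Send> demands that the iterator produced from the collection is itself Send, not merely the collection, which matters once that iterator is held across an .await inside a future that has to be Send. A contrived illustration of why the extra bound is needed; spawn_all and its bounds are hypothetical, chosen only to make a compiling example:

// The spawned future drives `items.into_iter()` across await points, so the
// iterator type itself, not just the collection, must be Send.
fn spawn_all<I>(items: I) -> tokio::task::JoinHandle<()>
where
    I: IntoIterator<Item: std::fmt::Display + Send + 'static, IntoIter: Send + 'static>
        + Send
        + 'static,
{
    tokio::spawn(async move {
        for item in items {
            // Both the iterator and the current item live across this await;
            // without `IntoIter: Send` the async block is not provably Send
            // and tokio::spawn rejects it.
            tokio::task::yield_now().await;
            println!("{item}");
        }
    })
}

#[tokio::main]
async fn main() {
    spawn_all(vec![1, 2, 3]).await.unwrap();
}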