diff --git a/crates/rspack_core/src/cache/persistent/occasion/make/module_graph.rs b/crates/rspack_core/src/cache/persistent/occasion/make/module_graph.rs index 1162798fa10e..0c61b96c91e3 100644 --- a/crates/rspack_core/src/cache/persistent/occasion/make/module_graph.rs +++ b/crates/rspack_core/src/cache/persistent/occasion/make/module_graph.rs @@ -15,7 +15,7 @@ use crate::{ cache::persistent::cacheable_context::CacheableContext, AsyncDependenciesBlock, AsyncDependenciesBlockIdentifier, BoxDependency, BoxModule, BuildDependency, DependencyParents, ExportInfoData, ExportsInfoData, ModuleGraph, ModuleGraphConnection, ModuleGraphModule, - ModuleGraphPartial, + ModuleGraphPartial, RayonConsumer, }; const SCOPE: &str = "occasion_make_module_graph"; @@ -132,7 +132,7 @@ pub async fn recovery_module_graph( let mut need_check_dep = vec![]; let mut partial = ModuleGraphPartial::default(); let mut mg = ModuleGraph::new(vec![], Some(&mut partial)); - let nodes: Vec<_> = storage + storage .load(SCOPE) .await? .into_par_iter() @@ -140,37 +140,37 @@ pub async fn recovery_module_graph( from_bytes::(&v, context) .expect("unexpected module graph deserialize failed") }) - .collect(); - for mut node in nodes { - for (dep, parent_block) in node.dependencies { - mg.set_parents( - *dep.id(), - DependencyParents { - block: parent_block, - module: node.module.identifier(), - }, - ); - mg.add_dependency(dep); - } - for con in node.connections { - need_check_dep.push((con.dependency_id, *con.module_identifier())); - mg.cache_recovery_connection(con); - } - for block in node.blocks { - mg.add_block(Box::new(block)); - } - // recovery exports/export info - let other_exports_info = ExportInfoData::new(None, None); - let side_effects_only_info = ExportInfoData::new(Some("*side effects only*".into()), None); - let exports_info = ExportsInfoData::new(other_exports_info.id(), side_effects_only_info.id()); - node.mgm.exports = exports_info.id(); - mg.set_exports_info(exports_info.id(), exports_info); - mg.set_export_info(side_effects_only_info.id(), side_effects_only_info); - mg.set_export_info(other_exports_info.id(), other_exports_info); + .with_max_len(1) + .consume(|mut node| { + for (dep, parent_block) in node.dependencies { + mg.set_parents( + *dep.id(), + DependencyParents { + block: parent_block, + module: node.module.identifier(), + }, + ); + mg.add_dependency(dep); + } + for con in node.connections { + need_check_dep.push((con.dependency_id, *con.module_identifier())); + mg.cache_recovery_connection(con); + } + for block in node.blocks { + mg.add_block(Box::new(block)); + } + // recovery exports/export info + let other_exports_info = ExportInfoData::new(None, None); + let side_effects_only_info = ExportInfoData::new(Some("*side effects only*".into()), None); + let exports_info = ExportsInfoData::new(other_exports_info.id(), side_effects_only_info.id()); + node.mgm.exports = exports_info.id(); + mg.set_exports_info(exports_info.id(), exports_info); + mg.set_export_info(side_effects_only_info.id(), side_effects_only_info); + mg.set_export_info(other_exports_info.id(), other_exports_info); - mg.add_module_graph_module(node.mgm); - mg.add_module(node.module); - } + mg.add_module_graph_module(node.mgm); + mg.add_module(node.module); + }); // recovery incoming connections for (con_id, module_identifier) in &need_check_dep { if let Some(mgm) = mg.module_graph_module_by_identifier_mut(module_identifier) { diff --git a/crates/rspack_core/src/cache/persistent/snapshot/mod.rs b/crates/rspack_core/src/cache/persistent/snapshot/mod.rs index 7ee5fc3a43e5..5087dbffe9a5 100644 --- a/crates/rspack_core/src/cache/persistent/snapshot/mod.rs +++ b/crates/rspack_core/src/cache/persistent/snapshot/mod.rs @@ -13,6 +13,7 @@ use rustc_hash::FxHashSet as HashSet; pub use self::option::{PathMatcher, SnapshotOptions}; use self::strategy::{Strategy, StrategyHelper, ValidateResult}; use super::storage::Storage; +use crate::FutureConsumer; const SCOPE: &str = "snapshot"; @@ -87,26 +88,26 @@ impl Snapshot { #[tracing::instrument("Cache::Snapshot::calc_modified_path", skip_all)] pub async fn calc_modified_paths(&self) -> Result<(HashSet, HashSet)> { - let helper = StrategyHelper::new(self.fs.clone()); + let mut modified_path = HashSet::default(); + let mut deleted_path = HashSet::default(); + let helper = Arc::new(StrategyHelper::new(self.fs.clone())); - let results = self + self .storage .load(SCOPE) .await? - .iter() - .map(|(key, value)| async { - let path: ArcPath = Path::new(&*String::from_utf8_lossy(key)).into(); - let strategy: Strategy = - from_bytes::(value, &()).expect("should from bytes success"); - let validate = helper.validate(&path, &strategy).await; - (path, validate) + .into_iter() + .map(|(key, value)| { + let helper = helper.clone(); + async move { + let path: ArcPath = Path::new(&*String::from_utf8_lossy(&key)).into(); + let strategy: Strategy = + from_bytes::(&value, &()).expect("should from bytes success"); + let validate = helper.validate(&path, &strategy).await; + (path, validate) + } }) - .collect::>(); - - let mut modified_path = HashSet::default(); - let mut deleted_path = HashSet::default(); - for (path, validate) in results.into_inner() { - match validate { + .fut_consume(|(path, validate)| match validate { ValidateResult::Modified => { modified_path.insert(path); } @@ -114,8 +115,9 @@ impl Snapshot { deleted_path.insert(path); } ValidateResult::NoChanged => {} - } - } + }) + .await; + Ok((modified_path, deleted_path)) } } diff --git a/crates/rspack_core/src/utils/iterator_consumer/future.rs b/crates/rspack_core/src/utils/iterator_consumer/future.rs index cb8e28e1e81b..822d406a8110 100644 --- a/crates/rspack_core/src/utils/iterator_consumer/future.rs +++ b/crates/rspack_core/src/utils/iterator_consumer/future.rs @@ -10,9 +10,7 @@ pub trait FutureConsumer { /// Use to immediately consume the data produced by the future in the iterator /// without waiting for all the data to be processed. /// The closures runs in the current thread. - async fn fut_consume(self, func: impl Fn(Self::Item) -> F + Send) - where - F: Future + Send; + async fn fut_consume(self, func: impl FnMut(Self::Item) + Send); } #[async_trait::async_trait] @@ -23,10 +21,7 @@ where Fut::Output: Send + 'static, { type Item = Fut::Output; - async fn fut_consume(self, func: impl Fn(Self::Item) -> F + Send) - where - F: Future + Send, - { + async fn fut_consume(self, mut func: impl FnMut(Self::Item) + Send) { let mut rx = { // Create the channel in the closure to ensure all sender are dropped when iterator completes // This ensures that the receiver does not get stuck in an infinite loop. @@ -43,7 +38,7 @@ where }; while let Some(data) = rx.recv().await { - func(data).await; + func(data); } } } @@ -61,7 +56,7 @@ mod test { async fn available() { (0..10) .map(|item| async move { item * 2 }) - .fut_consume(|item| async move { assert_eq!(item % 2, 0) }) + .fut_consume(|item| assert_eq!(item % 2, 0)) .await; } @@ -74,8 +69,8 @@ mod test { sleep(Duration::from_millis(item)).await; item }) - .fut_consume(|_| async move { - sleep(Duration::from_millis(20)).await; + .fut_consume(|_| { + std::thread::sleep(std::time::Duration::from_millis(20)); }) .await; let time1 = SystemTime::now().duration_since(start).unwrap(); diff --git a/crates/rspack_core/src/utils/iterator_consumer/rayon.rs b/crates/rspack_core/src/utils/iterator_consumer/rayon.rs index 47495602155f..7ebe9e8efcf1 100644 --- a/crates/rspack_core/src/utils/iterator_consumer/rayon.rs +++ b/crates/rspack_core/src/utils/iterator_consumer/rayon.rs @@ -8,7 +8,7 @@ pub trait RayonConsumer { /// Use to immediately consume the data produced by the rayon iterator /// without waiting for all the data to be processed. /// The closures runs in the current thread. - fn consume(self, func: impl Fn(Self::Item)); + fn consume(self, func: impl FnMut(Self::Item)); } impl RayonConsumer for P @@ -17,7 +17,7 @@ where I: Send, { type Item = I; - fn consume(self, func: impl Fn(Self::Item)) { + fn consume(self, mut func: impl FnMut(Self::Item)) { let (tx, rx) = channel::(); std::thread::scope(|s| { // move rx to s.spawn, otherwise rx.into_iter will never stop diff --git a/crates/rspack_core/src/utils/iterator_consumer/rayon_fut.rs b/crates/rspack_core/src/utils/iterator_consumer/rayon_fut.rs index 5e0be15cf9cd..9b6b8c1ae3c6 100644 --- a/crates/rspack_core/src/utils/iterator_consumer/rayon_fut.rs +++ b/crates/rspack_core/src/utils/iterator_consumer/rayon_fut.rs @@ -13,9 +13,7 @@ pub trait RayonFutureConsumer { /// Use to immediately consume the data produced by the future in the rayon iterator /// without waiting for all the data to be processed. /// The closures runs in the current thread. - async fn fut_consume(self, func: impl Fn(Self::Item) -> F + Send) - where - F: Future + Send; + async fn fut_consume(self, func: impl FnMut(Self::Item) + Send); } #[async_trait::async_trait] @@ -26,10 +24,7 @@ where Fut::Output: Send + 'static, { type Item = Fut::Output; - async fn fut_consume(self, func: impl Fn(Self::Item) -> F + Send) - where - F: Future + Send, - { + async fn fut_consume(self, mut func: impl FnMut(Self::Item) + Send) { let mut rx = { // Create the channel in the closure to ensure all sender are dropped when iterator completes // This ensures that the receiver does not get stuck in an infinite loop. @@ -46,7 +41,7 @@ where }; while let Some(data) = rx.recv().await { - func(data).await; + func(data); } } } @@ -66,7 +61,7 @@ mod test { (0..10) .into_par_iter() .map(|item| async move { item * 2 }) - .fut_consume(|item| async move { assert_eq!(item % 2, 0) }) + .fut_consume(|item| assert_eq!(item % 2, 0)) .await; } @@ -79,8 +74,8 @@ mod test { sleep(Duration::from_millis(item)).await; item }) - .fut_consume(|_| async move { - sleep(Duration::from_millis(20)).await; + .fut_consume(|_| { + std::thread::sleep(std::time::Duration::from_millis(20)); }) .await; let time1 = SystemTime::now().duration_since(start).unwrap();