Skip to content

Commit

Permalink
perf: persistent cache recovery use consumer mode (#9019)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrykingxyz authored Jan 17, 2025
1 parent 2cb67d1 commit 8d6406d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
cache::persistent::cacheable_context::CacheableContext, AsyncDependenciesBlock,
AsyncDependenciesBlockIdentifier, BoxDependency, BoxModule, BuildDependency, DependencyParents,
ExportInfoData, ExportsInfoData, ModuleGraph, ModuleGraphConnection, ModuleGraphModule,
ModuleGraphPartial,
ModuleGraphPartial, RayonConsumer,
};

const SCOPE: &str = "occasion_make_module_graph";
Expand Down Expand Up @@ -132,45 +132,45 @@ pub async fn recovery_module_graph(
let mut need_check_dep = vec![];
let mut partial = ModuleGraphPartial::default();
let mut mg = ModuleGraph::new(vec![], Some(&mut partial));
let nodes: Vec<_> = storage
storage
.load(SCOPE)
.await?
.into_par_iter()
.map(|(_, v)| {
from_bytes::<Node, CacheableContext>(&v, context)
.expect("unexpected module graph deserialize failed")
})
.collect();
for mut node in nodes {
for (dep, parent_block) in node.dependencies {
mg.set_parents(
*dep.id(),
DependencyParents {
block: parent_block,
module: node.module.identifier(),
},
);
mg.add_dependency(dep);
}
for con in node.connections {
need_check_dep.push((con.dependency_id, *con.module_identifier()));
mg.cache_recovery_connection(con);
}
for block in node.blocks {
mg.add_block(Box::new(block));
}
// recovery exports/export info
let other_exports_info = ExportInfoData::new(None, None);
let side_effects_only_info = ExportInfoData::new(Some("*side effects only*".into()), None);
let exports_info = ExportsInfoData::new(other_exports_info.id(), side_effects_only_info.id());
node.mgm.exports = exports_info.id();
mg.set_exports_info(exports_info.id(), exports_info);
mg.set_export_info(side_effects_only_info.id(), side_effects_only_info);
mg.set_export_info(other_exports_info.id(), other_exports_info);
.with_max_len(1)
.consume(|mut node| {
for (dep, parent_block) in node.dependencies {
mg.set_parents(
*dep.id(),
DependencyParents {
block: parent_block,
module: node.module.identifier(),
},
);
mg.add_dependency(dep);
}
for con in node.connections {
need_check_dep.push((con.dependency_id, *con.module_identifier()));
mg.cache_recovery_connection(con);
}
for block in node.blocks {
mg.add_block(Box::new(block));
}
// recovery exports/export info
let other_exports_info = ExportInfoData::new(None, None);
let side_effects_only_info = ExportInfoData::new(Some("*side effects only*".into()), None);
let exports_info = ExportsInfoData::new(other_exports_info.id(), side_effects_only_info.id());
node.mgm.exports = exports_info.id();
mg.set_exports_info(exports_info.id(), exports_info);
mg.set_export_info(side_effects_only_info.id(), side_effects_only_info);
mg.set_export_info(other_exports_info.id(), other_exports_info);

mg.add_module_graph_module(node.mgm);
mg.add_module(node.module);
}
mg.add_module_graph_module(node.mgm);
mg.add_module(node.module);
});
// recovery incoming connections
for (con_id, module_identifier) in &need_check_dep {
if let Some(mgm) = mg.module_graph_module_by_identifier_mut(module_identifier) {
Expand Down
36 changes: 19 additions & 17 deletions crates/rspack_core/src/cache/persistent/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use rustc_hash::FxHashSet as HashSet;
pub use self::option::{PathMatcher, SnapshotOptions};
use self::strategy::{Strategy, StrategyHelper, ValidateResult};
use super::storage::Storage;
use crate::FutureConsumer;

const SCOPE: &str = "snapshot";

Expand Down Expand Up @@ -87,35 +88,36 @@ impl Snapshot {

#[tracing::instrument("Cache::Snapshot::calc_modified_path", skip_all)]
pub async fn calc_modified_paths(&self) -> Result<(HashSet<ArcPath>, HashSet<ArcPath>)> {
let helper = StrategyHelper::new(self.fs.clone());
let mut modified_path = HashSet::default();
let mut deleted_path = HashSet::default();
let helper = Arc::new(StrategyHelper::new(self.fs.clone()));

let results = self
self
.storage
.load(SCOPE)
.await?
.iter()
.map(|(key, value)| async {
let path: ArcPath = Path::new(&*String::from_utf8_lossy(key)).into();
let strategy: Strategy =
from_bytes::<Strategy, ()>(value, &()).expect("should from bytes success");
let validate = helper.validate(&path, &strategy).await;
(path, validate)
.into_iter()
.map(|(key, value)| {
let helper = helper.clone();
async move {
let path: ArcPath = Path::new(&*String::from_utf8_lossy(&key)).into();
let strategy: Strategy =
from_bytes::<Strategy, ()>(&value, &()).expect("should from bytes success");
let validate = helper.validate(&path, &strategy).await;
(path, validate)
}
})
.collect::<FuturesResults<_>>();

let mut modified_path = HashSet::default();
let mut deleted_path = HashSet::default();
for (path, validate) in results.into_inner() {
match validate {
.fut_consume(|(path, validate)| match validate {
ValidateResult::Modified => {
modified_path.insert(path);
}
ValidateResult::Deleted => {
deleted_path.insert(path);
}
ValidateResult::NoChanged => {}
}
}
})
.await;

Ok((modified_path, deleted_path))
}
}
Expand Down
17 changes: 6 additions & 11 deletions crates/rspack_core/src/utils/iterator_consumer/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ pub trait FutureConsumer {
/// Use to immediately consume the data produced by the future in the iterator
/// without waiting for all the data to be processed.
/// The closures runs in the current thread.
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
where
F: Future + Send;
async fn fut_consume(self, func: impl FnMut(Self::Item) + Send);
}

#[async_trait::async_trait]
Expand All @@ -23,10 +21,7 @@ where
Fut::Output: Send + 'static,
{
type Item = Fut::Output;
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
where
F: Future + Send,
{
async fn fut_consume(self, mut func: impl FnMut(Self::Item) + Send) {
let mut rx = {
// Create the channel in the closure to ensure all sender are dropped when iterator completes
// This ensures that the receiver does not get stuck in an infinite loop.
Expand All @@ -43,7 +38,7 @@ where
};

while let Some(data) = rx.recv().await {
func(data).await;
func(data);
}
}
}
Expand All @@ -61,7 +56,7 @@ mod test {
async fn available() {
(0..10)
.map(|item| async move { item * 2 })
.fut_consume(|item| async move { assert_eq!(item % 2, 0) })
.fut_consume(|item| assert_eq!(item % 2, 0))
.await;
}

Expand All @@ -74,8 +69,8 @@ mod test {
sleep(Duration::from_millis(item)).await;
item
})
.fut_consume(|_| async move {
sleep(Duration::from_millis(20)).await;
.fut_consume(|_| {
std::thread::sleep(std::time::Duration::from_millis(20));
})
.await;
let time1 = SystemTime::now().duration_since(start).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/rspack_core/src/utils/iterator_consumer/rayon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub trait RayonConsumer {
/// Use to immediately consume the data produced by the rayon iterator
/// without waiting for all the data to be processed.
/// The closures runs in the current thread.
fn consume(self, func: impl Fn(Self::Item));
fn consume(self, func: impl FnMut(Self::Item));
}

impl<P, I> RayonConsumer for P
Expand All @@ -17,7 +17,7 @@ where
I: Send,
{
type Item = I;
fn consume(self, func: impl Fn(Self::Item)) {
fn consume(self, mut func: impl FnMut(Self::Item)) {
let (tx, rx) = channel::<Self::Item>();
std::thread::scope(|s| {
// move rx to s.spawn, otherwise rx.into_iter will never stop
Expand Down
17 changes: 6 additions & 11 deletions crates/rspack_core/src/utils/iterator_consumer/rayon_fut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ pub trait RayonFutureConsumer {
/// Use to immediately consume the data produced by the future in the rayon iterator
/// without waiting for all the data to be processed.
/// The closures runs in the current thread.
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
where
F: Future + Send;
async fn fut_consume(self, func: impl FnMut(Self::Item) + Send);
}

#[async_trait::async_trait]
Expand All @@ -26,10 +24,7 @@ where
Fut::Output: Send + 'static,
{
type Item = Fut::Output;
async fn fut_consume<F>(self, func: impl Fn(Self::Item) -> F + Send)
where
F: Future + Send,
{
async fn fut_consume(self, mut func: impl FnMut(Self::Item) + Send) {
let mut rx = {
// Create the channel in the closure to ensure all sender are dropped when iterator completes
// This ensures that the receiver does not get stuck in an infinite loop.
Expand All @@ -46,7 +41,7 @@ where
};

while let Some(data) = rx.recv().await {
func(data).await;
func(data);
}
}
}
Expand All @@ -66,7 +61,7 @@ mod test {
(0..10)
.into_par_iter()
.map(|item| async move { item * 2 })
.fut_consume(|item| async move { assert_eq!(item % 2, 0) })
.fut_consume(|item| assert_eq!(item % 2, 0))
.await;
}

Expand All @@ -79,8 +74,8 @@ mod test {
sleep(Duration::from_millis(item)).await;
item
})
.fut_consume(|_| async move {
sleep(Duration::from_millis(20)).await;
.fut_consume(|_| {
std::thread::sleep(std::time::Duration::from_millis(20));
})
.await;
let time1 = SystemTime::now().duration_since(start).unwrap();
Expand Down

2 comments on commit 8d6406d

@github-actions
Copy link
Contributor

@github-actions github-actions bot commented on 8d6406d Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Ecosystem CI detail: Open

suite result
modernjs ❌ failure
rspress ✅ success
rslib ✅ success
rsbuild ✅ success
rsdoctor ✅ success
examples ✅ success
devserver ✅ success
nuxt ✅ success

@github-actions
Copy link
Contributor

@github-actions github-actions bot commented on 8d6406d Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Benchmark detail: Open

Name Base (2025-01-17 2cb67d1) Current Change
10000_big_production-mode_disable-minimize + exec 37.5 s ± 811 ms 38.3 s ± 1.11 s +2.11 %
10000_development-mode + exec 1.86 s ± 165 ms 1.84 s ± 19 ms -0.99 %
10000_development-mode_hmr + exec 687 ms ± 25 ms 680 ms ± 2.5 ms -1.03 %
10000_production-mode + exec 2.43 s ± 122 ms 2.44 s ± 117 ms +0.73 %
10000_production-mode_persistent-cold + exec 2.52 s ± 19 ms 2.54 s ± 43 ms +0.90 %
10000_production-mode_persistent-hot + exec 1.77 s ± 23 ms 1.75 s ± 51 ms -1.32 %
arco-pro_development-mode + exec 1.78 s ± 111 ms 1.76 s ± 132 ms -1.28 %
arco-pro_development-mode_hmr + exec 388 ms ± 6.1 ms 386 ms ± 1 ms -0.58 %
arco-pro_production-mode + exec 3.78 s ± 221 ms 3.66 s ± 165 ms -3.10 %
arco-pro_production-mode_generate-package-json-webpack-plugin + exec 3.72 s ± 187 ms 3.68 s ± 153 ms -1.13 %
arco-pro_production-mode_persistent-cold + exec 3.92 s ± 119 ms 3.83 s ± 131 ms -2.20 %
arco-pro_production-mode_persistent-hot + exec 2.47 s ± 107 ms 2.53 s ± 147 ms +2.55 %
arco-pro_production-mode_traverse-chunk-modules + exec 3.76 s ± 91 ms 3.69 s ± 160 ms -1.85 %
large-dyn-imports_development-mode + exec 2.1 s ± 26 ms 2.14 s ± 125 ms +1.70 %
large-dyn-imports_production-mode + exec 2.14 s ± 29 ms 2.18 s ± 64 ms +1.52 %
threejs_development-mode_10x + exec 1.62 s ± 36 ms 1.63 s ± 36 ms +0.90 %
threejs_development-mode_10x_hmr + exec 760 ms ± 13 ms 769 ms ± 12 ms +1.16 %
threejs_production-mode_10x + exec 5.48 s ± 193 ms 5.45 s ± 53 ms -0.54 %
threejs_production-mode_10x_persistent-cold + exec 5.52 s ± 50 ms 5.52 s ± 43 ms -0.07 %
threejs_production-mode_10x_persistent-hot + exec 4.68 s ± 344 ms 4.69 s ± 350 ms +0.12 %
10000_big_production-mode_disable-minimize + rss memory 9567 MiB ± 503 MiB 9642 MiB ± 577 MiB +0.79 %
10000_development-mode + rss memory 662 MiB ± 6.56 MiB 641 MiB ± 7.52 MiB -3.05 %
10000_development-mode_hmr + rss memory 1399 MiB ± 344 MiB 1430 MiB ± 342 MiB +2.24 %
10000_production-mode + rss memory 674 MiB ± 55.3 MiB 641 MiB ± 43.4 MiB -4.86 %
10000_production-mode_persistent-cold + rss memory 763 MiB ± 46.4 MiB 751 MiB ± 39.7 MiB -1.58 %
10000_production-mode_persistent-hot + rss memory 776 MiB ± 54.8 MiB 715 MiB ± 32.9 MiB -7.83 %
arco-pro_development-mode + rss memory 570 MiB ± 60.3 MiB 560 MiB ± 66.1 MiB -1.80 %
arco-pro_development-mode_hmr + rss memory 590 MiB ± 69.9 MiB 569 MiB ± 66.4 MiB -3.62 %
arco-pro_production-mode + rss memory 748 MiB ± 52.4 MiB 729 MiB ± 86 MiB -2.59 %
arco-pro_production-mode_generate-package-json-webpack-plugin + rss memory 719 MiB ± 92.8 MiB 728 MiB ± 91.8 MiB +1.25 %
arco-pro_production-mode_persistent-cold + rss memory 819 MiB ± 45.7 MiB 850 MiB ± 60.6 MiB +3.71 %
arco-pro_production-mode_persistent-hot + rss memory 682 MiB ± 31.6 MiB 664 MiB ± 37.8 MiB -2.60 %
arco-pro_production-mode_traverse-chunk-modules + rss memory 710 MiB ± 87 MiB 712 MiB ± 75.8 MiB +0.30 %
large-dyn-imports_development-mode + rss memory 636 MiB ± 5.9 MiB 622 MiB ± 6.71 MiB -2.23 %
large-dyn-imports_production-mode + rss memory 531 MiB ± 6.77 MiB 516 MiB ± 2.47 MiB -2.83 %
threejs_development-mode_10x + rss memory 567 MiB ± 24 MiB 544 MiB ± 22.7 MiB -4.04 %
threejs_development-mode_10x_hmr + rss memory 1139 MiB ± 227 MiB 1157 MiB ± 61.2 MiB +1.54 %
threejs_production-mode_10x + rss memory 870 MiB ± 67.3 MiB 852 MiB ± 68.9 MiB -2.12 %
threejs_production-mode_10x_persistent-cold + rss memory 980 MiB ± 54.2 MiB 992 MiB ± 41.5 MiB +1.30 %
threejs_production-mode_10x_persistent-hot + rss memory 904 MiB ± 46.5 MiB 928 MiB ± 64.1 MiB +2.68 %

Please sign in to comment.