From 864a6ad483f2c2850000f999c609d8831c012c20 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Fri, 17 May 2024 17:26:19 +0200
Subject: [PATCH] Improve Garbage Collection (#8154)

### Description

Simplify and improve GC

This improves the GC queue. The job of the GC queue is to find tasks that should be garbage collected. There are three factors that influence this:

* age of the task: time since the last access.
* memory usage of the task
* compute duration of the task: CPU time spent to compute the task.

Memory usage and compute duration combine into a GC priority, calculated as `(memory_usage + C1) / (compute_duration + C2)`. C1 and C2 are constants to fine-tune the priority. (Self-contained sketches of this formula and of the generation cycle follow after the diff below.)

The age of the task is constantly changing, so a different scheme is used here: every task has a generation in which it was last accessed. The generation is increased every 100,000 tasks. We accumulate tasks of the current generation in a concurrent queue. Once 100,000 tasks are reached (atomic counter), we increase the generation and pop 100,000 tasks from the queue into an `OldGeneration`. These old generations are stored in another queue. No sorting is applied at this point; these are just lists of task ids.

Once we need to perform GC, we pop the oldest old generation from the queue, filter out all tasks that are in a higher generation (they have been accessed in the meantime), and sort the list by GC priority. Then we take the top 30% of tasks and garbage collect them. The remaining tasks are pushed to the front of the queue again, intermixed with other tasks into the existing old generations, until we reach a maximum of 200,000 tasks in a generation item. In that case the generation item is split into two items.

### Testing Instructions

---
 Cargo.lock | 1 +
 crates/node-file-trace/src/lib.rs | 30 +-
 crates/turbo-tasks-malloc/src/counter.rs | 4 +
 crates/turbo-tasks-malloc/src/lib.rs | 4 +
 .../src/aggregation/new_edge.rs | 4 +-
 crates/turbo-tasks-memory/src/cell.rs | 38 +-
 .../src/concurrent_priority_queue.rs | 185 ------
 crates/turbo-tasks-memory/src/gc.rs | 399 +++++++------
 crates/turbo-tasks-memory/src/lib.rs | 3 -
 .../turbo-tasks-memory/src/memory_backend.rs | 115 ++--
 .../src/memory_backend_with_pg.rs | 3 +-
 crates/turbo-tasks-memory/src/output.rs | 4 -
 crates/turbo-tasks-memory/src/stats.rs | 337 -----------
 crates/turbo-tasks-memory/src/task.rs | 544 +++---------------
 .../src/task/aggregation.rs | 9 +-
 .../turbo-tasks-memory/src/task/meta_state.rs | 37 +-
 crates/turbo-tasks-memory/src/task/stats.rs | 143 -----
 crates/turbo-tasks-memory/src/viz/graph.rs | 293 ----------
 crates/turbo-tasks-memory/src/viz/mod.rs | 236 --------
 crates/turbo-tasks-memory/src/viz/table.rs | 239 --------
 crates/turbo-tasks/src/backend.rs | 3 +-
 crates/turbo-tasks/src/completion.rs | 13 +-
 crates/turbo-tasks/src/lib.rs | 2 +-
 crates/turbo-tasks/src/manager.rs | 49 +-
 crates/turbopack-cli/src/dev/mod.rs | 17 +-
 .../turbopack-cli/src/dev/turbo_tasks_viz.rs | 142 -----
 crates/turbopack-trace-utils/src/raw_trace.rs | 3 +
 crates/turbopack/examples/turbopack.rs | 40 +-
 28 files changed, 439 insertions(+), 2458 deletions(-)
 delete mode 100644 crates/turbo-tasks-memory/src/concurrent_priority_queue.rs
 delete mode 100644 crates/turbo-tasks-memory/src/stats.rs
 delete mode 100644 crates/turbo-tasks-memory/src/task/stats.rs
 delete mode 100644 crates/turbo-tasks-memory/src/viz/graph.rs
 delete mode 100644 crates/turbo-tasks-memory/src/viz/mod.rs
 delete mode 100644 crates/turbo-tasks-memory/src/viz/table.rs
 delete mode 100644
crates/turbopack-cli/src/dev/turbo_tasks_viz.rs diff --git a/Cargo.lock b/Cargo.lock index 3612cbe2cf749..032c44d65f512 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10057,6 +10057,7 @@ dependencies = [ "turbo-tasks-build", "turbo-tasks-hash", "turbo-tasks-macros", + "turbo-tasks-malloc", ] [[package]] diff --git a/crates/node-file-trace/src/lib.rs b/crates/node-file-trace/src/lib.rs index e34f5e2535df8..34e21c08c5702 100644 --- a/crates/node-file-trace/src/lib.rs +++ b/crates/node-file-trace/src/lib.rs @@ -6,7 +6,6 @@ mod nft_json; use std::{ collections::{BTreeSet, HashMap}, env::current_dir, - fs, future::Future, path::{Path, PathBuf}, pin::Pin, @@ -24,15 +23,12 @@ use serde::Serialize; use tokio::sync::mpsc::channel; use turbo_tasks::{ backend::Backend, util::FormatDuration, TaskId, TransientInstance, TransientValue, TurboTasks, - TurboTasksBackendApi, UpdateInfo, Value, Vc, + UpdateInfo, Value, Vc, }; use turbo_tasks_fs::{ glob::Glob, DirectoryEntry, DiskFileSystem, FileSystem, FileSystemPath, ReadGlobResult, }; -use turbo_tasks_memory::{ - stats::{ReferenceType, Stats}, - viz, MemoryBackend, -}; +use turbo_tasks_memory::MemoryBackend; use turbopack::{ emit_asset, emit_with_completion, module_options::ModuleOptionsContext, rebase::RebasedAsset, ModuleAssetContext, @@ -101,10 +97,6 @@ pub struct CommonArgs { #[cfg_attr(feature = "node-api", serde(default))] cache: CacheArgs, - #[cfg_attr(feature = "cli", clap(short, long))] - #[cfg_attr(feature = "node-api", serde(default))] - visualize_graph: bool, - #[cfg_attr(feature = "cli", clap(short, long))] #[cfg_attr(feature = "node-api", serde(default))] watch: bool, @@ -325,7 +317,6 @@ pub async fn start( ) -> Result> { register(); let &CommonArgs { - visualize_graph, memory_limit, #[cfg(feature = "persistent_cache")] cache: CacheArgs { @@ -390,22 +381,7 @@ pub async fn start( TurboTasks::new(MemoryBackend::new(memory_limit.unwrap_or(usize::MAX))) }) }, - |tt, root_task, _| async move { - if visualize_graph { - let mut stats = Stats::new(); - let b = tt.backend(); - b.with_all_cached_tasks(|task| { - stats.add_id(b, task); - }); - stats.add_id(b, root_task); - // stats.merge_resolve(); - let tree = stats.treeify(ReferenceType::Child); - let graph = - viz::graph::visualize_stats_tree(tree, ReferenceType::Child, tt.stats_type()); - fs::write("graph.html", viz::graph::wrap_html(&graph)).unwrap(); - println!("graph.html written"); - } - }, + |_, _, _| async move {}, module_options, resolve_options, ) diff --git a/crates/turbo-tasks-malloc/src/counter.rs b/crates/turbo-tasks-malloc/src/counter.rs index abefbebb15a78..e0765f51101d1 100644 --- a/crates/turbo-tasks-malloc/src/counter.rs +++ b/crates/turbo-tasks-malloc/src/counter.rs @@ -70,6 +70,10 @@ pub fn allocation_counters() -> AllocationCounters { with_local_counter(|local| local.allocation_counters.clone()) } +pub fn reset_allocation_counters(start: AllocationCounters) { + with_local_counter(|local| local.allocation_counters = start); +} + fn with_local_counter(f: impl FnOnce(&mut ThreadLocalCounter) -> T) -> T { LOCAL_COUNTER.with(|local| { let ptr = local.get(); diff --git a/crates/turbo-tasks-malloc/src/lib.rs b/crates/turbo-tasks-malloc/src/lib.rs index 5624cc279befc..5fa6cc4c7d7cf 100644 --- a/crates/turbo-tasks-malloc/src/lib.rs +++ b/crates/turbo-tasks-malloc/src/lib.rs @@ -61,6 +61,10 @@ impl TurboMalloc { pub fn allocation_counters() -> AllocationCounters { self::counter::allocation_counters() } + + pub fn reset_allocation_counters(start: AllocationCounters) { + 
self::counter::reset_allocation_counters(start); + } } #[cfg(all( diff --git a/crates/turbo-tasks-memory/src/aggregation/new_edge.rs b/crates/turbo-tasks-memory/src/aggregation/new_edge.rs index 1c295434a2316..cb7131a09511d 100644 --- a/crates/turbo-tasks-memory/src/aggregation/new_edge.rs +++ b/crates/turbo-tasks-memory/src/aggregation/new_edge.rs @@ -20,9 +20,9 @@ const MAX_AFFECTED_NODES: usize = 4096; /// Handle the addition of a new edge to a node. The the edge is propagated to /// the uppers of that node or added a inner node. #[tracing::instrument(level = tracing::Level::TRACE, name = "handle_new_edge_preparation", skip_all)] -pub fn handle_new_edge<'l, C: AggregationContext>( +pub fn handle_new_edge( ctx: &C, - origin: &mut C::Guard<'l>, + origin: &mut C::Guard<'_>, origin_id: &C::NodeRef, target_id: &C::NodeRef, number_of_children: usize, diff --git a/crates/turbo-tasks-memory/src/cell.rs b/crates/turbo-tasks-memory/src/cell.rs index d006bebd1ead6..d2b8998d25fb8 100644 --- a/crates/turbo-tasks-memory/src/cell.rs +++ b/crates/turbo-tasks-memory/src/cell.rs @@ -75,41 +75,6 @@ impl Cell { } } - /// Returns true if the cell has dependent tasks. - pub fn has_dependent_tasks(&self) -> bool { - match self { - Cell::Empty => false, - Cell::Recomputing { - dependent_tasks, .. - } - | Cell::Value { - dependent_tasks, .. - } - | Cell::TrackedValueless { - dependent_tasks, .. - } => !dependent_tasks.is_empty(), - } - } - - /// Returns the list of dependent tasks. - pub fn dependent_tasks(&self) -> &TaskIdSet { - match self { - Cell::Empty => { - static EMPTY: TaskIdSet = AutoSet::with_hasher(); - &EMPTY - } - Cell::Value { - dependent_tasks, .. - } - | Cell::TrackedValueless { - dependent_tasks, .. - } - | Cell::Recomputing { - dependent_tasks, .. - } => dependent_tasks, - } - } - /// Switch the cell to recomputing state. fn recompute( &mut self, @@ -235,6 +200,9 @@ impl Cell { } => { // Assigning to a cell will invalidate all dependent tasks as the content might // have changed. + // TODO this leads to flagging task unnecessarily dirty when a GC'ed task is + // recomputed. We need to use the notification of changed cells for the current + // task to check if it's valid to skip the invalidation here if !dependent_tasks.is_empty() { turbo_tasks.schedule_notify_tasks_set(dependent_tasks); } diff --git a/crates/turbo-tasks-memory/src/concurrent_priority_queue.rs b/crates/turbo-tasks-memory/src/concurrent_priority_queue.rs deleted file mode 100644 index 53c193072db0b..0000000000000 --- a/crates/turbo-tasks-memory/src/concurrent_priority_queue.rs +++ /dev/null @@ -1,185 +0,0 @@ -use std::{ - cmp::min, - collections::{hash_map::RandomState, BinaryHeap}, - fmt::Debug, - hash::{BuildHasher, Hash}, -}; - -use once_cell::sync::OnceCell; -use parking_lot::{Mutex, MutexGuard}; -use priority_queue::PriorityQueue; - -// shard_amount and shift are stolen from the dashmap crate implementation: -// https://github.com/xacrimon/dashmap/blob/0b2a2269b2d368494eeb41d4218da1b142da8e77/src/lib.rs#L64-L69 -// They are changed to use use 4 times the number of shards, since inserting -// into a priority queue is more expensive than a hashmap. So creating more -// shards will reduce contention. - -// Returns the number of shards to use. -fn shard_amount() -> usize { - static SHARD_AMOUNT: OnceCell = OnceCell::new(); - *SHARD_AMOUNT.get_or_init(|| { - (std::thread::available_parallelism().map_or(1, usize::from) * 16).next_power_of_two() - }) -} - -/// Returns the number of bits to shift a hash to get the shard index. 
-fn shift() -> usize { - static SHIFT: OnceCell = OnceCell::new(); - *SHIFT - .get_or_init(|| std::mem::size_of::() * 8 - shard_amount().trailing_zeros() as usize) -} - -pub struct ConcurrentPriorityQueue { - shards: Box<[Mutex>]>, - hasher: H, -} - -impl - ConcurrentPriorityQueue -{ - pub fn new() -> Self { - Self::with_hasher(Default::default()) - } -} - -impl - ConcurrentPriorityQueue -{ - pub fn with_hasher(hasher: H) -> Self { - let shards = (0..shard_amount()) - .map(|_| Mutex::new(PriorityQueue::with_hasher(hasher.clone()))) - .collect::>(); - Self { - shards: shards.into_boxed_slice(), - hasher, - } - } - - fn shard(&self, key: &K) -> MutexGuard> { - // Leave the high 7 bits for the HashBrown SIMD tag. - // see https://github.com/xacrimon/dashmap/blob/0b2a2269b2d368494eeb41d4218da1b142da8e77/src/lib.rs#L374 - let index = ((self.hasher.hash_one(key) as usize) << 7) >> shift(); - unsafe { self.shards.get_unchecked(index) }.lock() - } - - #[allow(dead_code)] - pub fn len(&self) -> usize { - let mut len = 0; - for shard in self.shards.iter() { - len += shard.lock().len() - } - len - } - - pub fn insert(&self, key: K, value: V) { - let mut inner = self.shard(&key); - inner.push(key, value); - } - - pub fn upsert_with( - &self, - key: K, - default_value: impl FnOnce() -> V, - value_update: impl FnOnce(&mut V), - ) { - let mut inner = self.shard(&key); - if !inner.change_priority_by(&key, value_update) { - inner.push(key, default_value()); - } - } - - /// Pops at most `min(factor / 256 * len, max_count)` items from the queue. - /// Due to concurrency, the actual amount of items may vary. The - /// returned vector is in any order, if you want items to be ordered you - /// need to sort it. - pub fn pop_factor(&self, factor: u8, max_count: usize) -> Vec<(K, V)> { - struct ShardsQueueItem { - index: usize, - value: V, - len: usize, - } - impl Ord for ShardsQueueItem { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.value - .cmp(&other.value) - // In case of equal priority we want to select the shard with the - // largest number of items, so we balance the load on shards. - .then_with(|| self.len.cmp(&other.len)) - } - } - impl PartialOrd for ShardsQueueItem { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } - } - impl PartialEq for ShardsQueueItem { - fn eq(&self, _other: &Self) -> bool { - unreachable!() - } - } - impl Eq for ShardsQueueItem {} - - // We build a priority queue of the shards so we can select from the shards in - // correct order. But this is only a snapshot of the shards, so any - // concurrent change to the shard will not be respected, but we are fine with - // that. - let mut shards_queue = BinaryHeap::with_capacity(self.shards.len()); - let mut total_len = 0; - for (i, shard) in self.shards.iter().enumerate() { - let shard = shard.lock(); - let len = shard.len(); - total_len += len; - if let Some((_, v)) = shard.peek() { - shards_queue.push(ShardsQueueItem { - index: i, - value: v.clone(), - len, - }); - } - } - let count = min(factor as usize * total_len / u8::MAX as usize, max_count); - let mut result = Vec::with_capacity(count); - loop { - if let Some(ShardsQueueItem { index: i, .. }) = shards_queue.pop() { - let mut shard = unsafe { self.shards.get_unchecked(i) }.lock(); - if let Some(ShardsQueueItem { - value: next_value, .. 
- }) = shards_queue.peek() - { - // The peek will be None if the shard has become empty concurrently - while let Some((_, peeked_value)) = shard.peek() { - // Is the next item in this shard (still) the global next - if peeked_value < next_value { - // some other shard is next - // enqueue this shard again - shards_queue.push(ShardsQueueItem { - index: i, - value: peeked_value.clone(), - len: shard.len(), - }); - break; - } - - result.push(shard.pop().unwrap()); - if result.len() >= count { - return result; - } - // We keep the shard lock and check the next item for a - // fast path - } - } else { - // No other shards - for _ in result.len()..count { - if let Some(item) = shard.pop() { - result.push(item); - } else { - break; - } - } - return result; - } - } - } - } -} diff --git a/crates/turbo-tasks-memory/src/gc.rs b/crates/turbo-tasks-memory/src/gc.rs index c3f117346e7ce..b0253fa0e8110 100644 --- a/crates/turbo-tasks-memory/src/gc.rs +++ b/crates/turbo-tasks-memory/src/gc.rs @@ -1,224 +1,269 @@ use std::{ - cmp::Reverse, - collections::HashMap, - time::{Duration, Instant}, + cmp::{max, Reverse}, + collections::VecDeque, + fmt::Debug, + mem::take, + sync::atomic::{AtomicU32, AtomicUsize, Ordering}, + time::Duration, }; use concurrent_queue::ConcurrentQueue; -use nohash_hasher::BuildNoHashHasher; -use turbo_tasks::{small_duration::SmallDuration, TaskId, TurboTasksBackendApi}; +use parking_lot::Mutex; +use tracing::field::{debug, Empty}; +use turbo_tasks::TaskId; -use crate::{concurrent_priority_queue::ConcurrentPriorityQueue, MemoryBackend}; +use crate::MemoryBackend; /// The priority of a task for garbage collection. /// Any action will shrink the internal memory structures of the task in a /// transparent way. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] -pub enum GcPriority { - // The order influences priority. Put the highest priority first. - /// Unload cells that are currently not read by any task. This might cause - /// the task to recompute when these cells are read. - InactiveEmptyUnusedCells { - compute_duration: SmallDuration<10_000>, - }, - /// Unload the whole task. Only available for inactive tasks. - InactiveUnload { - /// The age of the task. Stored as 2^x seconds to - /// bucket tasks and avoid frequent revalidation. - age: Reverse, - /// Aggregated recompute time. Stored as 2^x milliseconds to bucket - /// tasks and avoid frequent revalidation. - total_compute_duration: u8, - }, - /// Unload cells that are currently not read by any task. This might cause - /// the task to recompute when these cells are read. - EmptyUnusedCells { - compute_duration: SmallDuration<10_000>, - }, - /// Unload all cells, and continue tracking them valueless. This might cause - /// the task and dependent tasks to recompute when these cells are read. - EmptyCells { - /// Aggregated recompute time. Stored as 2^x milliseconds to bucket - /// tasks and avoid frequent revalidation. - total_compute_duration: u8, - /// The age of the task. Stored as 2^x seconds to - /// bucket tasks and avoid frequent revalidation. - age: Reverse, - }, - Placeholder, -} - -/// Statistics about actions performed during garbage collection. -#[derive(Default, Debug)] -pub struct GcStats { - /// How many tasks were unloaded. - pub unloaded: usize, - /// How many unused cells were emptied. - pub empty_unused: usize, - /// How many unused cells were emptied (on the fast path). - pub empty_unused_fast: usize, - /// How many used cells were emptied. 
- pub empty_cells: usize, - /// How often the priority of a task was updated. - pub priority_updated: usize, - /// How often the priority of a task was updated (on the fast path). - pub priority_updated_fast: usize, - /// How many tasks were checked but did not need to have any action taken. - pub no_gc_needed: usize, - /// How many tasks were checked but were in a state where no action could be - /// taken. - pub no_gc_possible: usize, +#[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] +pub struct GcPriority { + // Memory usage divided by compute duration. Specifies how efficient garbage collection would + // be with this task. Higher memory usage and lower compute duration makes it more likely to be + // garbage collected. + memory_per_time: u16, } /// State about garbage collection for a task. -#[derive(Debug, Default)] +#[derive(Clone, Copy, Debug, Default)] pub struct GcTaskState { - pub inactive: bool, + pub priority: GcPriority, + /// The generation where the task was last accessed. + pub generation: u32, +} + +impl GcTaskState { + pub(crate) fn execution_completed( + &mut self, + duration: Duration, + memory_usage: usize, + generation: u32, + ) { + self.generation = generation; + self.priority = GcPriority { + memory_per_time: ((memory_usage + TASK_BASE_MEMORY_USAGE) as u64 + / (duration.as_micros() as u64 + TASK_BASE_COMPUTE_DURATION_IN_MICROS)) + .try_into() + .unwrap_or(u16::MAX), + }; + } + + pub(crate) fn on_read(&mut self, generation: u32) -> bool { + if self.generation < generation { + self.generation = generation; + true + } else { + false + } + } +} + +const TASKS_PER_NEW_GENERATION: usize = 100_000; +const MAX_TASKS_PER_OLD_GENERATION: usize = 200_000; +const PERCENTAGE_TO_COLLECT: usize = 30; +const TASK_BASE_MEMORY_USAGE: usize = 1_000; +const TASK_BASE_COMPUTE_DURATION_IN_MICROS: u64 = 1_000; +pub const PERCENTAGE_TARGET_MEMORY: usize = 88; +pub const PERCENTAGE_IDLE_TARGET_MEMORY: usize = 75; + +struct OldGeneration { + tasks: Vec, + generation: u32, +} + +struct ProcessGenerationResult { + priority: Option, + count: usize, } /// The queue of actions that garbage collection should perform. pub struct GcQueue { - /// Tasks that should be checked for inactive propagation. - inactive_propagate_queue: ConcurrentQueue, - /// Tasks ordered by gc priority. - queue: ConcurrentPriorityQueue>, + /// The current generation number. + generation: AtomicU32, + /// Fresh or read tasks that should added to the queue. + incoming_tasks: ConcurrentQueue, + /// Number of tasks in `incoming_tasks`. + incoming_tasks_count: AtomicUsize, + /// Tasks from old generations. The oldest generation will be garbage + /// collected next. + generations: Mutex>, } impl GcQueue { pub fn new() -> Self { Self { - inactive_propagate_queue: ConcurrentQueue::unbounded(), - queue: ConcurrentPriorityQueue::new(), + generation: AtomicU32::new(0), + incoming_tasks: ConcurrentQueue::unbounded(), + incoming_tasks_count: AtomicUsize::new(0), + generations: Mutex::new(VecDeque::with_capacity(128)), } } + /// Get the current generation number. + pub fn generation(&self) -> u32 { + self.generation.load(Ordering::Relaxed) + } + /// Notify the GC queue that a task has been executed. - pub fn task_executed(&self, task: TaskId, duration: Duration) { - // A freshly executed task will start on EmptyUnusedCells, even while we are not - // sure if there are unused cells. 
- let compute_duration = duration.into(); - let value = Reverse(GcPriority::EmptyUnusedCells { compute_duration }); - self.queue.insert(task, value); + pub fn task_executed(&self, task: TaskId) -> u32 { + self.add_task(task) } - /// Notify the GC queue that a task might become inactive. - pub fn task_might_become_inactive(&self, task: TaskId) { - let _ = self.inactive_propagate_queue.push(task); + /// Notify the GC queue that a task has been accessed. + pub fn task_accessed(&self, task: TaskId) -> u32 { + self.add_task(task) } - /// Notify the GC queue that a task has become inactive. This means the - /// [GcTaskState]::inactive has been set. - pub fn task_flagged_inactive(&self, task: TaskId, compute_duration: Duration) { - self.queue.upsert_with( - task, - || { - // When there is no entry, we schedule the minimum priority. - Reverse(GcPriority::InactiveEmptyUnusedCells { - compute_duration: compute_duration.into(), - }) - }, - |value| { - match &mut value.0 { - GcPriority::InactiveEmptyUnusedCells { .. } - | GcPriority::InactiveUnload { .. } => { - // already inactive + fn add_task(&self, task: TaskId) -> u32 { + let _ = self.incoming_tasks.push(task); + if self.incoming_tasks_count.fetch_add(1, Ordering::Acquire) % TASKS_PER_NEW_GENERATION + == TASKS_PER_NEW_GENERATION - 1 + { + self.incoming_tasks_count + .fetch_sub(TASKS_PER_NEW_GENERATION, Ordering::Release); + // We are selected to move TASKS_PER_NEW_GENERATION tasks into a generation + let gen = self.generation.fetch_add(1, Ordering::Relaxed); + let mut tasks = Vec::with_capacity(TASKS_PER_NEW_GENERATION); + for _ in 0..TASKS_PER_NEW_GENERATION { + match self.incoming_tasks.pop() { + Ok(task) => { + tasks.push(task); } - GcPriority::EmptyUnusedCells { compute_duration } => { - // Convert to the higher priority inactive version. - *value = Reverse(GcPriority::InactiveEmptyUnusedCells { - compute_duration: *compute_duration, - }) + Err(_) => { + // That will not happen, since we only pop the same amount as we have + // pushed. + unreachable!(); } - GcPriority::EmptyCells { - age, - total_compute_duration, - } => { - // Convert to the higher priority inactive version. - *value = Reverse(GcPriority::InactiveUnload { - age: *age, - total_compute_duration: *total_compute_duration, - }) - } - GcPriority::Placeholder => unreachable!(), } - }, - ); + } + self.generations.lock().push_front(OldGeneration { + tasks, + generation: gen, + }); + gen + } else { + self.generation.load(Ordering::Relaxed) + } } - /// Run garbage collection on the queue. The `factor` parameter controls how - /// much work should be done. It's a value between 0 and 255, where 255 - /// performs all the work possible. - pub fn run_gc( - &self, - factor: u8, - backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Option<(GcPriority, usize, GcStats)> { - // Process through the inactive propagation queue. 
- while let Ok(task) = self.inactive_propagate_queue.pop() { - backend.with_task(task, |task| { - task.gc_check_inactive(backend); + fn process_old_generation(&self, backend: &MemoryBackend) -> ProcessGenerationResult { + let old_generation = { + let guard = &mut self.generations.lock(); + guard.pop_back() + }; + let Some(OldGeneration { + mut tasks, + generation, + }) = old_generation + else { + // No old generation to process + return ProcessGenerationResult { + priority: None, + count: 0, + }; + }; + // Check all tasks for the correct generation + let mut indices = Vec::with_capacity(tasks.len()); + assert!(tasks.len() <= MAX_TASKS_PER_OLD_GENERATION); + for (i, task) in tasks.iter().enumerate() { + backend.with_task(*task, |task| { + if let Some(state) = task.gc_state() { + if state.generation <= generation { + indices.push((Reverse(state.priority), i as u32)); + } + } }); } - if factor == 0 { - return None; + if indices.is_empty() { + // No valid tasks in old generation to process + return ProcessGenerationResult { + priority: None, + count: 0, + }; } - // Process through the gc queue. - let now = turbo_tasks.program_duration_until(Instant::now()); - let mut task_duration_cache = HashMap::with_hasher(BuildNoHashHasher::default()); - let mut stats = GcStats::default(); - let result = self.select_tasks(factor, |task_id, _priority, max_priority| { - backend.with_task(task_id, |task| { - task.run_gc( - now, - max_priority, - &mut task_duration_cache, - &mut stats, - backend, - turbo_tasks, - ) - }) - }); - result.map(|(p, c)| (p, c, stats)) - } - - /// Select a number of tasks to run garbage collection on and run the - /// `execute` function on them. - pub fn select_tasks( - &self, - factor: u8, - mut execute: impl FnMut(TaskId, GcPriority, GcPriority) -> Option, - ) -> Option<(GcPriority, usize)> { - const MAX_POP: usize = 1000000; - let jobs = self.queue.pop_factor(factor, MAX_POP); - if jobs.is_empty() { - return None; + // Sorting based on sort_by_cached_key from std lib + indices.sort_unstable(); + for i in 0..indices.len() { + let mut index = indices[i].1; + while (index as usize) < i { + index = indices[index as usize].1; + } + indices[i].1 = index; + tasks.swap(i, index as usize); } - let highest_priority = jobs.iter().map(|&(_, Reverse(p))| p).max().unwrap(); - let len = jobs.len(); - for (task, Reverse(priority)) in jobs { - if let Some(new_priority) = execute(task, priority, highest_priority) { - self.queue.upsert_with( - task, - || Reverse(new_priority), - |value| { - if *value < Reverse(new_priority) { - *value = Reverse(new_priority); + tasks.truncate(indices.len()); + + let tasks_to_collect = max(1, tasks.len() * PERCENTAGE_TO_COLLECT / 100); + let (Reverse(max_priority), _) = indices[0]; + drop(indices); + + // Put back remaining tasks into the queue + let remaining_tasks = &tasks[tasks_to_collect..]; + { + let mut guard = self.generations.lock(); + if !remaining_tasks.is_empty() { + if let Some(first) = guard.front_mut() { + first.tasks.extend(remaining_tasks); + if first.tasks.len() > MAX_TASKS_PER_OLD_GENERATION { + // Need to split the tasks into two generations + let mut gen_b = Vec::with_capacity(first.tasks.len() / 2); + let mut gen_a = Vec::with_capacity(first.tasks.len() - gen_b.capacity()); + for (i, task) in take(&mut first.tasks).into_iter().enumerate() { + if i % 2 == 0 { + gen_a.push(task); + } else { + gen_b.push(task); + } } - }, - ); + let generation = first.generation; + first.tasks = gen_a; + guard.push_front(OldGeneration { + tasks: gen_b, + generation, 
+ }); + } + } else { + guard.push_front(OldGeneration { + tasks: remaining_tasks.to_vec(), + generation, + }); + } } } - Some((highest_priority, len)) + + // GC the tasks + let mut count = 0; + for task in tasks[..tasks_to_collect].iter() { + backend.with_task(*task, |task| { + if task.run_gc(generation) { + count += 1; + } + }); + } + + ProcessGenerationResult { + priority: Some(max_priority), + count, + } } -} -/// Converts a value to an logarithmic scale. -pub fn to_exp_u8(value: u64) -> u8 { - value - .checked_next_power_of_two() - .unwrap_or(0x7000_0000_0000_0000) - .trailing_zeros() as u8 + /// Run garbage collection on the queue. + pub fn run_gc(&self, backend: &MemoryBackend) -> Option<(GcPriority, usize)> { + let span = + tracing::trace_span!("garbage collection", priority = Empty, count = Empty).entered(); + + let ProcessGenerationResult { priority, count } = self.process_old_generation(backend); + + span.record("count", count); + if let Some(priority) = &priority { + span.record("priority", debug(priority)); + } else { + span.record("priority", ""); + } + + priority.map(|p| (p, count)) + } } diff --git a/crates/turbo-tasks-memory/src/lib.rs b/crates/turbo-tasks-memory/src/lib.rs index c0fecc3796963..25a0536212132 100644 --- a/crates/turbo-tasks-memory/src/lib.rs +++ b/crates/turbo-tasks-memory/src/lib.rs @@ -9,16 +9,13 @@ mod aggregation; mod cell; -mod concurrent_priority_queue; mod count_hash_set; mod gc; mod map_guard; mod memory_backend; mod memory_backend_with_pg; mod output; -pub mod stats; mod task; -pub mod viz; pub use memory_backend::MemoryBackend; pub use memory_backend_with_pg::MemoryBackendWithPersistedGraph; diff --git a/crates/turbo-tasks-memory/src/memory_backend.rs b/crates/turbo-tasks-memory/src/memory_backend.rs index 612359852b7ad..a2822316d400c 100644 --- a/crates/turbo-tasks-memory/src/memory_backend.rs +++ b/crates/turbo-tasks-memory/src/memory_backend.rs @@ -1,7 +1,6 @@ use std::{ borrow::{Borrow, Cow}, cell::RefCell, - cmp::min, future::Future, hash::{BuildHasher, BuildHasherDefault, Hash}, pin::Pin, @@ -9,7 +8,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::{Duration, Instant}, + time::Duration, }; use anyhow::{bail, Result}; @@ -30,7 +29,7 @@ use turbo_tasks::{ use crate::{ cell::RecomputingCell, - gc::GcQueue, + gc::{GcQueue, PERCENTAGE_IDLE_TARGET_MEMORY, PERCENTAGE_TARGET_MEMORY}, output::Output, task::{Task, TaskDependency, TaskDependencySet, DEPENDENCIES_TO_TRACK}, }; @@ -89,6 +88,10 @@ impl MemoryBackend { id } + pub(crate) fn has_gc(&self) -> bool { + self.gc_queue.is_some() + } + fn try_get_output Result>( &self, id: TaskId, @@ -118,54 +121,40 @@ impl MemoryBackend { self.memory_tasks.get(*id as usize).unwrap() } - pub fn on_task_might_become_inactive(&self, task: TaskId) { - if let Some(gc_queue) = &self.gc_queue { - gc_queue.task_might_become_inactive(task); - } - } - - pub fn on_task_flagged_inactive(&self, task: TaskId, compute_duration: Duration) { + /// Runs the garbage collection until reaching the target memory. An `idle` + /// garbage collection has a lower target memory. Returns true, when + /// memory was collected. 
+ pub fn run_gc( + &self, + idle: bool, + _turbo_tasks: &dyn TurboTasksBackendApi, + ) -> bool { if let Some(gc_queue) = &self.gc_queue { - gc_queue.task_flagged_inactive(task, compute_duration); - } - } + let mut did_something = false; + loop { + let mem_limit = self.memory_limit; - pub fn run_gc(&self, idle: bool, turbo_tasks: &dyn TurboTasksBackendApi) { - if let Some(gc_queue) = &self.gc_queue { - const MAX_COLLECT_FACTOR: u8 = u8::MAX / 8; - - let mem_limit = self.memory_limit; - - let usage = turbo_tasks_malloc::TurboMalloc::memory_usage(); - let target = if idle { - mem_limit * 3 / 4 - } else { - mem_limit * 7 / 8 - }; - if usage < target { - if idle { - // Always run propagation when idle - gc_queue.run_gc(0, self, turbo_tasks); + let usage = turbo_tasks_malloc::TurboMalloc::memory_usage(); + let target = if idle { + mem_limit * PERCENTAGE_IDLE_TARGET_MEMORY / 100 + } else { + mem_limit * PERCENTAGE_TARGET_MEMORY / 100 + }; + if usage < target { + return did_something; } - return; - } - - let collect_factor = min( - MAX_COLLECT_FACTOR as usize, - (usage - target) * u8::MAX as usize / (mem_limit - target), - ) as u8; - let collected = gc_queue.run_gc(collect_factor, self, turbo_tasks); + let collected = gc_queue.run_gc(self); - if idle { - if let Some((_collected, _count, _stats)) = collected { - let job = self.create_backend_job(Job::GarbageCollection); - turbo_tasks.schedule_backend_background_job(job); - } else { - self.idle_gc_active.store(false, Ordering::Release); + // Collecting less than 100 tasks is not worth it + if !collected.map_or(false, |(_, count)| count > 100) { + return true; } + + did_something = true; } } + false } fn insert_and_connect_fresh_task( @@ -320,18 +309,29 @@ impl Backend for MemoryBackend { &self, task_id: TaskId, duration: Duration, - instant: Instant, memory_usage: usize, stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi, ) -> bool { + let generation = if let Some(gc_queue) = &self.gc_queue { + gc_queue.generation() + } else { + 0 + }; let reexecute = self.with_task(task_id, |task| { - task.execution_completed(duration, instant, memory_usage, stateful, self, turbo_tasks) + task.execution_completed( + duration, + memory_usage, + generation, + stateful, + self, + turbo_tasks, + ) }); if !reexecute { - self.run_gc(false, turbo_tasks); if let Some(gc_queue) = &self.gc_queue { - gc_queue.task_executed(task_id, duration); + gc_queue.task_executed(task_id); + self.run_gc(false, turbo_tasks); } } reexecute @@ -388,7 +388,7 @@ impl Backend for MemoryBackend { } else { Task::add_dependency_to_current(TaskDependency::Cell(task_id, index)); self.with_task(task_id, |task| { - match task.with_cell_mut(index, |cell| { + match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell| { cell.read_content( reader, move || format!("{task_id} {index}"), @@ -425,7 +425,7 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) -> Result> { self.with_task(task_id, |task| { - match task.with_cell_mut(index, |cell| { + match task.with_cell_mut(index, self.gc_queue.as_ref(), |cell| { cell.read_content_untracked( move || format!("{task_id}"), move || format!("reading {} {} untracked", task_id, index), @@ -485,7 +485,9 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) { self.with_task(task, |task| { - task.with_cell_mut(index, |cell| cell.assign(content, turbo_tasks)) + task.with_cell_mut(index, self.gc_queue.as_ref(), |cell| { + cell.assign(content, turbo_tasks) + }) }) } @@ -532,7 +534,6 @@ impl Backend for MemoryBackend { 
// control of the task *unsafe { id.get_unchecked() }, task_type.clone(), - turbo_tasks.stats_type(), ); self.insert_and_connect_fresh_task( parent_task, @@ -568,17 +569,16 @@ impl Backend for MemoryBackend { turbo_tasks: &dyn TurboTasksBackendApi, ) -> TaskId { let id = turbo_tasks.get_fresh_task_id(); - let stats_type = turbo_tasks.stats_type(); let id = id.into(); match task_type { TransientTaskType::Root(f) => { - let task = Task::new_root(id, move || f() as _, stats_type); + let task = Task::new_root(id, move || f() as _); // SAFETY: We have a fresh task id where nobody knows about yet unsafe { self.memory_tasks.insert(*id as usize, task) }; Task::set_root(id, self, turbo_tasks); } TransientTaskType::Once(f) => { - let task = Task::new_once(id, f, stats_type); + let task = Task::new_once(id, f); // SAFETY: We have a fresh task id where nobody knows about yet unsafe { self.memory_tasks.insert(*id as usize, task) }; Task::set_once(id, self, turbo_tasks); @@ -608,7 +608,12 @@ impl Job { match self { Job::GarbageCollection => { let _guard = trace_span!("Job::GarbageCollection").entered(); - backend.run_gc(true, turbo_tasks); + if backend.run_gc(true, turbo_tasks) { + let job = backend.create_backend_job(Job::GarbageCollection); + turbo_tasks.schedule_backend_background_job(job); + } else { + backend.idle_gc_active.store(false, Ordering::Release); + } } } } diff --git a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs index 07d2305d7e571..a8fa8104b8af2 100644 --- a/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs +++ b/crates/turbo-tasks-memory/src/memory_backend_with_pg.rs @@ -9,7 +9,7 @@ use std::{ atomic::{AtomicU32, AtomicUsize, Ordering}, Mutex, MutexGuard, }, - time::{Duration, Instant}, + time::Duration, }; use anyhow::{anyhow, Result}; @@ -1153,7 +1153,6 @@ impl Backend for MemoryBackendWithPersistedGraph

{ &self, task: TaskId, duration: Duration, - _instant: Instant, _memory_usage: usize, _stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi>, diff --git a/crates/turbo-tasks-memory/src/output.rs b/crates/turbo-tasks-memory/src/output.rs index 443b5d5e2d267..4b74dfe9de9e9 100644 --- a/crates/turbo-tasks-memory/src/output.rs +++ b/crates/turbo-tasks-memory/src/output.rs @@ -95,10 +95,6 @@ impl Output { } } - pub fn dependent_tasks(&self) -> &TaskIdSet { - &self.dependent_tasks - } - pub fn gc_drop(self, turbo_tasks: &dyn TurboTasksBackendApi) { // notify if !self.dependent_tasks.is_empty() { diff --git a/crates/turbo-tasks-memory/src/stats.rs b/crates/turbo-tasks-memory/src/stats.rs deleted file mode 100644 index 83e65d456fd77..0000000000000 --- a/crates/turbo-tasks-memory/src/stats.rs +++ /dev/null @@ -1,337 +0,0 @@ -use std::{ - cmp::{ - max, {self}, - }, - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt::Display, - mem::take, - time::Duration, -}; - -use turbo_tasks::{registry, FunctionId, TaskId, TraitTypeId}; - -use crate::{ - task::{Task, TaskStatsInfo}, - MemoryBackend, -}; - -pub struct StatsReferences { - pub tasks: Vec<(ReferenceType, TaskId)>, -} - -#[derive(PartialEq, Eq, Hash, Clone, Debug)] -pub enum StatsTaskType { - Root(TaskId), - Once(TaskId), - Native(FunctionId), - ResolveNative(FunctionId), - ResolveTrait(TraitTypeId, String), - Collectibles(TraitTypeId), -} - -impl Display for StatsTaskType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - StatsTaskType::Root(_) => write!(f, "root"), - StatsTaskType::Once(_) => write!(f, "once"), - StatsTaskType::Collectibles(t) => { - write!(f, "read collectibles {}", registry::get_trait(*t).name) - } - StatsTaskType::Native(nf) => write!(f, "{}", registry::get_function(*nf).name), - StatsTaskType::ResolveNative(nf) => { - write!(f, "resolve {}", registry::get_function(*nf).name) - } - StatsTaskType::ResolveTrait(t, n) => { - write!(f, "resolve trait {}::{}", registry::get_trait(*t).name, n) - } - } - } -} - -#[derive(Default, Clone, Debug)] -pub struct ReferenceStats { - pub count: usize, -} - -#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy)] -pub enum ReferenceType { - Child, - Dependency, - Input, -} - -#[derive(Clone, Debug)] -pub struct ExportedTaskStats { - pub count: usize, - pub unloaded_count: usize, - pub executions: Option, - pub total_duration: Option, - pub total_current_duration: Duration, - pub total_update_duration: Duration, - pub max_duration: Duration, - pub references: HashMap<(ReferenceType, StatsTaskType), ReferenceStats>, -} - -impl Default for ExportedTaskStats { - fn default() -> Self { - Self { - count: 0, - unloaded_count: 0, - executions: None, - total_duration: None, - total_current_duration: Duration::ZERO, - total_update_duration: Duration::ZERO, - max_duration: Duration::ZERO, - references: Default::default(), - } - } -} - -pub struct Stats { - tasks: HashMap, -} - -impl Default for Stats { - fn default() -> Self { - Self::new() - } -} - -impl Stats { - pub fn new() -> Self { - Self { - tasks: Default::default(), - } - } - - pub fn add(&mut self, backend: &MemoryBackend, task: &Task) { - self.add_conditional(backend, task, |_, _| true) - } - - pub fn add_conditional( - &mut self, - backend: &MemoryBackend, - task: &Task, - condition: impl FnOnce(&StatsTaskType, &TaskStatsInfo) -> bool, - ) { - let info = task.get_stats_info(backend); - let ty = task.get_stats_type(); - if !condition(&ty, &info) { - return; - } - let TaskStatsInfo { - 
total_duration, - last_duration, - executions, - unloaded, - } = info; - let stats = self.tasks.entry(ty).or_default(); - stats.count += 1; - if let Some(total_duration) = total_duration { - *stats.total_duration.get_or_insert(Duration::ZERO) += total_duration; - } - if unloaded { - stats.unloaded_count += 1 - } - stats.total_current_duration += last_duration; - if executions.map(|executions| executions > 1).unwrap_or(true) { - stats.total_update_duration += last_duration; - } - stats.max_duration = max(stats.max_duration, last_duration); - if let Some(executions) = executions { - *stats.executions.get_or_insert(0) += executions; - } - - let StatsReferences { tasks, .. } = task.get_stats_references(); - let set: HashSet<_> = tasks.into_iter().collect(); - for (ref_type, task) in set { - backend.with_task(task, |task| { - let ty = task.get_stats_type(); - let ref_stats = stats.references.entry((ref_type, ty)).or_default(); - ref_stats.count += 1; - }) - } - } - - pub fn add_id(&mut self, backend: &MemoryBackend, id: TaskId) { - backend.with_task(id, |task| { - self.add(backend, task); - }); - } - - pub fn add_id_conditional( - &mut self, - backend: &MemoryBackend, - id: TaskId, - condition: impl FnOnce(&StatsTaskType, &TaskStatsInfo) -> bool, - ) { - backend.with_task(id, |task| { - self.add_conditional(backend, task, condition); - }); - } - - pub fn merge_resolve(&mut self) { - self.merge(|ty, _stats| match ty { - StatsTaskType::Root(_) - | StatsTaskType::Once(_) - | StatsTaskType::Native(_) - | StatsTaskType::Collectibles(..) => false, - StatsTaskType::ResolveNative(_) | StatsTaskType::ResolveTrait(_, _) => true, - }) - } - - pub fn merge(&mut self, mut select: impl FnMut(&StatsTaskType, &ExportedTaskStats) -> bool) { - let merged: HashMap<_, _> = self - .tasks - .extract_if(|ty, stats| select(ty, stats)) - .collect(); - - for stats in self.tasks.values_mut() { - fn merge_refs( - refs: HashMap<(ReferenceType, StatsTaskType), ReferenceStats>, - merged: &HashMap, - ) -> HashMap<(ReferenceType, StatsTaskType), ReferenceStats> { - refs.into_iter() - .flat_map(|((ref_ty, ty), stats)| { - if let Some(merged_stats) = merged.get(&ty) { - if ref_ty == ReferenceType::Child { - merge_refs(merged_stats.references.clone(), merged) - .into_iter() - .map(|((ref_ty, ty), _)| ((ref_ty, ty), stats.clone())) - .collect() - } else { - vec![] - } - } else { - vec![((ref_ty, ty), stats)] - } - }) - .collect() - } - stats.references = merge_refs(take(&mut stats.references), &merged); - } - } - - pub fn treeify(&self, tree_ref_type: ReferenceType) -> GroupTree { - let mut incoming_references_count = self - .tasks - .keys() - .map(|ty| (ty, 0)) - .collect::>(); - for stats in self.tasks.values() { - for (ref_type, ty) in stats.references.keys() { - if ref_type == &tree_ref_type { - *incoming_references_count.entry(ty).or_default() += 1; - } - } - } - let mut root_queue = incoming_references_count.into_iter().collect::>(); - root_queue.sort_by_key(|(_, c)| *c); - - let mut task_placement: HashMap<&StatsTaskType, Option<&StatsTaskType>> = HashMap::new(); - fn get_path<'a>( - ty: Option<&'a StatsTaskType>, - task_placement: &HashMap<&'a StatsTaskType, Option<&'a StatsTaskType>>, - ) -> Vec<&'a StatsTaskType> { - if let Some(mut ty) = ty { - let mut path = vec![ty]; - while let Some(parent) = task_placement[ty] { - ty = parent; - path.push(ty); - } - path.reverse(); - path - } else { - Vec::new() - } - } - fn find_common<'a>( - p1: Vec<&'a StatsTaskType>, - p2: Vec<&'a StatsTaskType>, - ) -> Option<&'a StatsTaskType> 
{ - let mut i = cmp::min(p1.len(), p2.len()); - loop { - if i == 0 { - return None; - } - i -= 1; - if p1[i] == p2[i] { - return Some(p1[i]); - } - } - } - for (root, _) in root_queue.into_iter() { - if task_placement.contains_key(root) { - continue; - } - let mut queue: VecDeque<(&StatsTaskType, Option<&StatsTaskType>)> = - [(root, None)].into_iter().collect(); - - while let Some((ty, placement)) = queue.pop_front() { - match task_placement.entry(ty) { - Entry::Occupied(e) => { - let current_placement = *e.get(); - if placement != current_placement { - let new_placement = find_common( - get_path(placement, &task_placement), - get_path(current_placement, &task_placement), - ); - task_placement.insert(ty, new_placement); - } - } - Entry::Vacant(e) => { - if let Some(task) = self.tasks.get(ty) { - e.insert(placement); - - for (ref_type, child_ty) in task.references.keys() { - if ref_type == &tree_ref_type { - queue.push_back((child_ty, Some(ty))); - } - } - } - } - } - } - } - - let mut children: HashMap, Vec<&StatsTaskType>> = HashMap::new(); - for (child, parent) in task_placement { - children.entry(parent).or_default().push(child); - } - - fn into_group<'a>( - tasks: &HashMap, - children: &HashMap, Vec<&'a StatsTaskType>>, - ty: Option<&'a StatsTaskType>, - ) -> GroupTree { - let inner = &children[&ty]; - let inner_with_children = inner.iter().filter(|c| children.contains_key(&Some(*c))); - let leafs = inner.iter().filter(|c| !children.contains_key(&Some(*c))); - let task_types: Vec<_> = leafs.map(|&ty| (ty.clone(), tasks[ty].clone())).collect(); - GroupTree { - primary: ty.map(|ty| (ty.clone(), tasks[ty].clone())), - children: inner_with_children - .map(|ty| into_group(tasks, children, Some(ty))) - .collect(), - task_types, - } - } - - if children.is_empty() { - GroupTree { - primary: None, - children: Vec::new(), - task_types: Vec::new(), - } - } else { - into_group(&self.tasks, &children, None) - } - } -} - -#[derive(Debug)] -pub struct GroupTree { - pub primary: Option<(StatsTaskType, ExportedTaskStats)>, - pub children: Vec, - pub task_types: Vec<(StatsTaskType, ExportedTaskStats)>, -} diff --git a/crates/turbo-tasks-memory/src/task.rs b/crates/turbo-tasks-memory/src/task.rs index 3dba4c3657875..cd9e3cd4fca69 100644 --- a/crates/turbo-tasks-memory/src/task.rs +++ b/crates/turbo-tasks-memory/src/task.rs @@ -1,15 +1,13 @@ use std::{ borrow::Cow, cell::RefCell, - cmp::{max, Reverse}, - collections::{HashMap, HashSet}, fmt::{self, Debug, Display, Formatter}, future::Future, hash::{BuildHasherDefault, Hash}, mem::{replace, take}, pin::Pin, sync::{atomic::AtomicU32, Arc}, - time::{Duration, Instant}, + time::Duration, }; use anyhow::Result; @@ -18,14 +16,13 @@ use nohash_hasher::BuildNoHashHasher; use parking_lot::{Mutex, RwLock}; use rustc_hash::FxHasher; use smallvec::SmallVec; -use stats::TaskStats; use tokio::task_local; use tracing::Span; use turbo_tasks::{ backend::{PersistentTaskType, TaskExecutionSpec}, event::{Event, EventListener}, - get_invalidator, registry, CellId, Invalidator, NativeFunction, RawVc, StatsType, TaskId, - TaskIdSet, TraitType, TraitTypeId, TurboTasksBackendApi, ValueTypeId, + get_invalidator, registry, CellId, Invalidator, NativeFunction, RawVc, TaskId, TaskIdSet, + TraitType, TraitTypeId, TurboTasksBackendApi, ValueTypeId, }; use crate::{ @@ -34,9 +31,8 @@ use crate::{ AggregationDataGuard, PreparedOperation, }, cell::Cell, - gc::{to_exp_u8, GcPriority, GcStats, GcTaskState}, + gc::{GcQueue, GcTaskState}, output::{Output, OutputContent}, - 
stats::{ReferenceType, StatsReferences, StatsTaskType}, task::aggregation::{TaskAggregationContext, TaskChange}, MemoryBackend, }; @@ -46,7 +42,6 @@ pub type NativeTaskFn = Box NativeTaskFuture + Send + Sync>; mod aggregation; mod meta_state; -mod stats; #[derive(Hash, Copy, Clone, PartialEq, Eq)] pub enum TaskDependency { @@ -192,16 +187,10 @@ struct TaskState { // GC state: gc: GcTaskState, - - // Stats: - stats: TaskStats, } impl TaskState { - fn new( - description: impl Fn() -> String + Send + Sync + 'static, - stats_type: StatsType, - ) -> Self { + fn new(description: impl Fn() -> String + Send + Sync + 'static) -> Self { Self { aggregation_node: TaskAggregationNode::new(), state_type: Dirty { @@ -215,16 +204,12 @@ impl TaskState { prepared_type: PrepareTaskType::None, cells: Default::default(), gc: Default::default(), - stats: TaskStats::new(stats_type), #[cfg(feature = "track_wait_dependencies")] last_waiting_task: Default::default(), } } - fn new_scheduled( - description: impl Fn() -> String + Send + Sync + 'static, - stats_type: StatsType, - ) -> Self { + fn new_scheduled(description: impl Fn() -> String + Send + Sync + 'static) -> Self { Self { aggregation_node: TaskAggregationNode::new(), state_type: Scheduled { @@ -238,7 +223,6 @@ impl TaskState { prepared_type: PrepareTaskType::None, cells: Default::default(), gc: Default::default(), - stats: TaskStats::new(stats_type), #[cfg(feature = "track_wait_dependencies")] last_waiting_task: Default::default(), } @@ -251,14 +235,13 @@ impl TaskState { /// A Task can get into this state when it is unloaded by garbage collection, /// but is still attached to parents and aggregated. struct PartialTaskState { - stats_type: StatsType, - aggregation_leaf: TaskAggregationNode, + aggregation_node: TaskAggregationNode, } impl PartialTaskState { fn into_full(self, description: impl Fn() -> String + Send + Sync + 'static) -> TaskState { TaskState { - aggregation_node: self.aggregation_leaf, + aggregation_node: self.aggregation_node, state_type: Dirty { event: Event::new(move || format!("TaskState({})::event", description())), outdated_dependencies: Default::default(), @@ -270,7 +253,6 @@ impl PartialTaskState { output: Default::default(), cells: Default::default(), gc: Default::default(), - stats: TaskStats::new(self.stats_type), } } } @@ -279,9 +261,7 @@ impl PartialTaskState { /// being referenced by any parent. This state is stored inlined instead of in a /// [Box] to reduce the memory consumption. Make sure to not add more fields /// than the size of a [Box]. 
-struct UnloadedTaskState { - stats_type: StatsType, -} +struct UnloadedTaskState {} #[cfg(test)] #[test] @@ -304,14 +284,12 @@ impl UnloadedTaskState { output: Default::default(), cells: Default::default(), gc: Default::default(), - stats: TaskStats::new(self.stats_type), } } fn into_partial(self) -> PartialTaskState { PartialTaskState { - aggregation_leaf: TaskAggregationNode::new(), - stats_type: self.stats_type, + aggregation_node: TaskAggregationNode::new(), } } } @@ -414,7 +392,7 @@ enum TaskStateType { /// Invalid execution is happening /// - /// on finish this will move to Dirty or Scheduled depending on active flag + /// on finish this will move to Scheduled InProgressDirty { event: Event }, } @@ -428,20 +406,13 @@ use self::{ }; impl Task { - pub(crate) fn new_persistent( - id: TaskId, - task_type: Arc, - stats_type: StatsType, - ) -> Self { + pub(crate) fn new_persistent(id: TaskId, task_type: Arc) -> Self { let ty = TaskType::Persistent { ty: task_type }; let description = Self::get_event_description_static(id, &ty); Self { id, ty, - state: RwLock::new(TaskMetaState::Full(Box::new(TaskState::new( - description, - stats_type, - )))), + state: RwLock::new(TaskMetaState::Full(Box::new(TaskState::new(description)))), graph_modification_in_progress_counter: AtomicU32::new(0), } } @@ -449,7 +420,6 @@ impl Task { pub(crate) fn new_root( id: TaskId, functor: impl Fn() -> NativeTaskFuture + Sync + Send + 'static, - stats_type: StatsType, ) -> Self { let ty = TaskType::Root(Box::new(Box::new(functor))); let description = Self::get_event_description_static(id, &ty); @@ -458,7 +428,6 @@ impl Task { ty, state: RwLock::new(TaskMetaState::Full(Box::new(TaskState::new_scheduled( description, - stats_type, )))), graph_modification_in_progress_counter: AtomicU32::new(0), } @@ -467,7 +436,6 @@ impl Task { pub(crate) fn new_once( id: TaskId, functor: impl Future> + Send + 'static, - stats_type: StatsType, ) -> Self { let ty = TaskType::Once(Box::new(Mutex::new(Some(Box::pin(functor))))); let description = Self::get_event_description_static(id, &ty); @@ -476,7 +444,6 @@ impl Task { ty, state: RwLock::new(TaskMetaState::Full(Box::new(TaskState::new_scheduled( description, - stats_type, )))), graph_modification_in_progress_counter: AtomicU32::new(0), } @@ -724,7 +691,7 @@ impl Task { #[cfg(not(feature = "lazy_remove_children"))] { remove_job = state - .aggregation_leaf + .aggregation_node .remove_children_job(&aggregation_context, outdated_children); } state.state_type = InProgress { @@ -734,7 +701,6 @@ impl Task { outdated_children, outdated_collectibles, }; - state.stats.increment_executions(); } Dirty { .. 
} => { let state_type = Task::state_string(&state); @@ -960,8 +926,8 @@ impl Task { pub(crate) fn execution_completed( &self, duration: Duration, - instant: Instant, - _memory_usage: usize, + memory_usage: usize, + generation: u32, stateful: bool, backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, @@ -977,8 +943,8 @@ impl Task { let mut state = self.full_state_mut(); state - .stats - .register_execution(duration, turbo_tasks.program_duration_until(instant)); + .gc + .execution_completed(duration, memory_usage, generation); match state.state_type { InProgress { ref mut event, @@ -992,13 +958,15 @@ impl Task { let outdated_children = take(outdated_children); let outdated_collectibles = outdated_collectibles.take_collectibles(); let mut dependencies = take(&mut dependencies); - // This will stay here for longer, so make sure to not consume too much - // memory - dependencies.shrink_to_fit(); - for cells in state.cells.values_mut() { - cells.shrink_to_fit(); + if !backend.has_gc() { + // This will stay here for longer, so make sure to not consume too much + // memory + dependencies.shrink_to_fit(); + for cells in state.cells.values_mut() { + cells.shrink_to_fit(); + } + state.cells.shrink_to_fit(); } - state.cells.shrink_to_fit(); state.stateful = stateful; state.state_type = Done { dependencies }; if !count_as_finished { @@ -1285,6 +1253,7 @@ impl Task { backend: &MemoryBackend, turbo_tasks: &dyn TurboTasksBackendApi, ) { + let _span = tracing::trace_span!("turbo_tasks::recompute", id = *self.id).entered(); self.make_dirty_internal(true, backend, turbo_tasks) } @@ -1301,8 +1270,19 @@ impl Task { } /// Access to a cell. - pub(crate) fn with_cell_mut(&self, index: CellId, func: impl FnOnce(&mut Cell) -> T) -> T { + pub(crate) fn with_cell_mut( + &self, + index: CellId, + gc_queue: Option<&GcQueue>, + func: impl FnOnce(&mut Cell) -> T, + ) -> T { let mut state = self.full_state_mut(); + if let Some(gc_queue) = gc_queue { + let generation = gc_queue.generation(); + if state.gc.on_read(generation) { + gc_queue.task_accessed(self.id); + } + } let list = state.cells.entry(index.type_id).or_default(); let i = index.index as usize; if list.len() <= i { @@ -1337,13 +1317,6 @@ impl Task { } } - /// For testing purposes - pub fn reset_executions(&self) { - if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { - state.stats.reset_executions() - } - } - pub fn is_pending(&self) -> bool { if let TaskMetaStateReadGuard::Full(state) = self.state() { !matches!(state.state_type, TaskStateType::Done { .. 
}) @@ -1360,94 +1333,6 @@ impl Task { } } - pub fn reset_stats(&self) { - if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { - state.stats.reset(); - } - } - - pub fn get_stats_info(&self, _backend: &MemoryBackend) -> TaskStatsInfo { - match self.state() { - TaskMetaStateReadGuard::Full(state) => { - let (total_duration, last_duration, executions) = match &state.stats { - TaskStats::Essential(stats) => (None, stats.last_duration(), None), - TaskStats::Full(stats) => ( - Some(stats.total_duration()), - stats.last_duration(), - Some(stats.executions()), - ), - }; - - TaskStatsInfo { - total_duration, - last_duration, - executions, - unloaded: false, - } - } - TaskMetaStateReadGuard::Partial(_) => TaskStatsInfo { - total_duration: None, - last_duration: Duration::ZERO, - executions: None, - unloaded: true, - }, - TaskMetaStateReadGuard::Unloaded => TaskStatsInfo { - total_duration: None, - last_duration: Duration::ZERO, - executions: None, - unloaded: true, - }, - } - } - - pub fn get_stats_type(self: &Task) -> StatsTaskType { - match &self.ty { - TaskType::Root(_) => StatsTaskType::Root(self.id), - TaskType::Once(_) => StatsTaskType::Once(self.id), - TaskType::Persistent { ty, .. } => match &**ty { - PersistentTaskType::Native(f, _) => StatsTaskType::Native(*f), - PersistentTaskType::ResolveNative(f, _) => StatsTaskType::ResolveNative(*f), - PersistentTaskType::ResolveTrait(t, n, _) => { - StatsTaskType::ResolveTrait(*t, n.to_string()) - } - }, - } - } - - pub fn get_stats_references(&self) -> StatsReferences { - let mut refs = Vec::new(); - if let TaskMetaStateReadGuard::Full(state) = self.state() { - for child in state.children.iter() { - refs.push((ReferenceType::Child, *child)); - } - if let Done { ref dependencies } = state.state_type { - for dep in dependencies.iter() { - match dep { - TaskDependency::Output(task) - | TaskDependency::Cell(task, _) - | TaskDependency::Collectibles(task, _) => { - refs.push((ReferenceType::Dependency, *task)) - } - } - } - } - } - if let TaskType::Persistent { ty, .. } = &self.ty { - match &**ty { - PersistentTaskType::Native(_, inputs) - | PersistentTaskType::ResolveNative(_, inputs) - | PersistentTaskType::ResolveTrait(_, _, inputs) => { - for input in inputs.iter() { - if let Some(task) = input.get_task_id() { - refs.push((ReferenceType::Input, task)); - } - } - } - } - } - StatsReferences { tasks: refs } - } - fn state_string(state: &TaskState) -> &'static str { match state.state_type { Scheduled { .. 
} => "scheduled", @@ -1483,6 +1368,8 @@ impl Task { } = &mut state.state_type { if outdated_children.remove(&child_id) { + drop(state); + aggregation_context.apply_queued_updates(); return; } } @@ -1653,297 +1540,62 @@ impl Task { aggregation_context.apply_queued_updates(); } - pub(crate) fn gc_check_inactive(&self, backend: &MemoryBackend) { - if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { - if state.gc.inactive { - return; - } - state.gc.inactive = true; - backend.on_task_flagged_inactive(self.id, state.stats.last_duration()); - for &child in state.children.iter() { - backend.on_task_might_become_inactive(child); - } - } - } - - pub(crate) fn run_gc( - &self, - now_relative_to_start: Duration, - max_priority: GcPriority, - task_duration_cache: &mut HashMap>, - stats: &mut GcStats, - backend: &MemoryBackend, - turbo_tasks: &dyn TurboTasksBackendApi, - ) -> Option { + pub(crate) fn run_gc(&self, generation: u32) -> bool { if !self.is_pure() { - stats.no_gc_needed += 1; - return None; + return false; } + let mut cells_to_drop = Vec::new(); - // We don't want to access other tasks under this task lock, so we aggregate - // missing information first, gather it and then retry. - let mut missing_durations = Vec::new(); - loop { - // This might be slightly inaccurate as we don't hold the lock for the whole - // duration so it might be too large when concurrent modifications - // happen, but that's fine. - let mut dependent_tasks_compute_duration = Duration::ZERO; - let mut included_tasks = HashSet::with_hasher(BuildNoHashHasher::::default()); - // Fill up missing durations - for task_id in missing_durations.drain(..) { - backend.with_task(task_id, |task| { - let duration = task.gc_compute_duration(); - task_duration_cache.insert(task_id, duration); - dependent_tasks_compute_duration += duration; - }) - } - let aggregation_context = TaskAggregationContext::new(turbo_tasks, backend); - let active = query_root_info(&aggregation_context, ActiveQuery::default(), self.id); + let result = if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { + if state.gc.generation > generation || state.stateful { + return false; + } - if let TaskMetaStateWriteGuard::Full(mut state) = self.state_mut() { - if state.stateful { - stats.no_gc_possible += 1; - return None; + match &mut state.state_type { + TaskStateType::Done { dependencies } => { + dependencies.shrink_to_fit(); } - match &mut state.state_type { - TaskStateType::Done { dependencies } => { - dependencies.shrink_to_fit(); - } - TaskStateType::Dirty { .. } => {} - _ => { - // GC can't run in this state. We will reschedule it when the execution - // completes. - stats.no_gc_needed += 1; - return None; - } - } - - // Check if the task need to be activated again - if state.gc.inactive && active { - state.gc.inactive = false; + TaskStateType::Dirty { .. } => {} + _ => { + // GC can't run in this state. We will reschedule it when the execution + // completes. 
+ return false; } + } - let last_duration = state.stats.last_duration(); - let compute_duration = last_duration.into(); - - let age = to_exp_u8( - (now_relative_to_start - .saturating_sub(state.stats.last_execution_relative_to_start())) - .as_secs(), - ); - - let min_prio_that_needs_total_duration = if active { - GcPriority::EmptyCells { - total_compute_duration: to_exp_u8(last_duration.as_millis() as u64), - age: Reverse(age), - } - } else { - GcPriority::InactiveUnload { - total_compute_duration: to_exp_u8(last_duration.as_millis() as u64), - age: Reverse(age), - } - }; - - let need_total_duration = max_priority >= min_prio_that_needs_total_duration; - let has_unused_cells = state.cells.values().any(|cells| { - cells - .iter() - .any(|cell| cell.has_value() && !cell.has_dependent_tasks()) - }); - - let empty_unused_priority = if active { - GcPriority::EmptyUnusedCells { compute_duration } - } else { - GcPriority::InactiveEmptyUnusedCells { compute_duration } - }; - - if !need_total_duration { - // Fast mode, no need for total duration - - if has_unused_cells { - if empty_unused_priority <= max_priority { - // Empty unused cells - for cells in state.cells.values_mut() { - cells.shrink_to_fit(); - for cell in cells.iter_mut() { - if !cell.has_dependent_tasks() { - cells_to_drop.extend(cell.gc_content()); - } - cell.shrink_to_fit(); - } - } - stats.empty_unused_fast += 1; - return Some(GcPriority::EmptyCells { - total_compute_duration: to_exp_u8( - Duration::from(compute_duration).as_millis() as u64, - ), - age: Reverse(age), - }); - } else { - stats.priority_updated_fast += 1; - return Some(empty_unused_priority); - } - } else if active { - stats.priority_updated += 1; - return Some(GcPriority::EmptyCells { - total_compute_duration: to_exp_u8( - Duration::from(compute_duration).as_millis() as u64, - ), - age: Reverse(age), - }); - } else { - stats.priority_updated += 1; - return Some(GcPriority::InactiveUnload { - total_compute_duration: to_exp_u8( - Duration::from(compute_duration).as_millis() as u64, - ), - age: Reverse(age), - }); - } - } else { - // Slow mode, need to compute total duration - - let mut has_used_cells = false; - for cells in state.cells.values_mut() { - for cell in cells.iter_mut() { - if cell.has_value() && cell.has_dependent_tasks() { - has_used_cells = true; - for &task_id in cell.dependent_tasks() { - if included_tasks.insert(task_id) { - if let Some(duration) = task_duration_cache.get(&task_id) { - dependent_tasks_compute_duration += *duration; - } else { - missing_durations.push(task_id); - } - } - } - } - } - } - - if !active { - for &task_id in state.output.dependent_tasks() { - if included_tasks.insert(task_id) { - if let Some(duration) = task_duration_cache.get(&task_id) { - dependent_tasks_compute_duration += *duration; - } else { - missing_durations.push(task_id); - } - } - } - } - - let total_compute_duration = - max(last_duration, dependent_tasks_compute_duration); - let total_compute_duration_u8 = - to_exp_u8(total_compute_duration.as_millis() as u64); - - // When we have all information available, we can either run the GC or return a - // new GC priority.
- if missing_durations.is_empty() { - let mut new_priority = GcPriority::Placeholder; - if !active { - new_priority = GcPriority::InactiveUnload { - age: Reverse(age), - total_compute_duration: total_compute_duration_u8, - }; - if new_priority <= max_priority { - // Unload task - if self.unload(state, backend, turbo_tasks) { - stats.unloaded += 1; - return None; - } else { - // unloading will fail if the task go active again - return Some(GcPriority::EmptyCells { - total_compute_duration: total_compute_duration_u8, - age: Reverse(age), - }); - } - } - } - - // always shrinking memory - state.output.dependent_tasks.shrink_to_fit(); - if active && (has_unused_cells || has_used_cells) { - new_priority = GcPriority::EmptyCells { - total_compute_duration: total_compute_duration_u8, - age: Reverse(age), - }; - if new_priority <= max_priority { - // Empty cells - let cells = take(&mut state.cells); - for cells in cells.into_values() { - for mut cell in cells { - if cell.has_value() { - cells_to_drop.extend(cell.gc_content()); - } - } - } - stats.empty_cells += 1; - return None; - } - } - - // always shrinking memory - state.cells.shrink_to_fit(); - if has_unused_cells && active { - new_priority = empty_unused_priority; - if new_priority <= max_priority { - // Empty unused cells - for cells in state.cells.values_mut() { - cells.shrink_to_fit(); - for cell in cells.iter_mut() { - if !cell.has_dependent_tasks() { - cells_to_drop.extend(cell.gc_content()); - } - cell.shrink_to_fit(); - } - } - stats.empty_unused += 1; - return Some(GcPriority::EmptyCells { - total_compute_duration: total_compute_duration_u8, - age: Reverse(age), - }); - } - } - - // Shrink memory - for cells in state.cells.values_mut() { - cells.shrink_to_fit(); - for cell in cells.iter_mut() { - cell.shrink_to_fit(); - } - } - - // Return new gc priority if any - if new_priority != GcPriority::Placeholder { - stats.priority_updated += 1; - - return Some(new_priority); - } else { - stats.no_gc_needed += 1; - - return None; - } + // shrinking memory and dropping cells + state.output.dependent_tasks.shrink_to_fit(); + state.cells.shrink_to_fit(); + for cells in state.cells.values_mut() { + cells.shrink_to_fit(); + for cell in cells.iter_mut() { + if cell.has_value() { + cells_to_drop.extend(cell.gc_content()); } + cell.shrink_to_fit(); } } - } else { - // Task is already unloaded, we are done with GC for it - stats.no_gc_needed += 1; - return None; } - } + + true + } else { + false + }; + + drop(cells_to_drop); + + result } - pub(crate) fn gc_compute_duration(&self) -> Duration { + pub(crate) fn gc_state(&self) -> Option<GcTaskState> { if let TaskMetaStateReadGuard::Full(state) = self.state() { - state.stats.last_duration() + Some(state.gc) } else { - Duration::ZERO + None } } + // TODO not used yet, but planned + #[allow(dead_code)] fn unload( &self, mut full_state: FullTaskWriteGuard<'_>, @@ -1954,7 +1606,7 @@ impl Task { let mut clear_dependencies = None; let mut change_job = None; let TaskState { - aggregation_node: ref mut aggregation_leaf, + ref mut aggregation_node, ref mut state_type, .. } = *full_state; @@ -1962,7 +1614,7 @@ impl Task { Done { ref mut dependencies, } => { - change_job = aggregation_leaf.apply_change( + change_job = aggregation_node.apply_change( &aggregation_context, TaskChange { unfinished: 1, @@ -1979,10 +1631,7 @@ impl Task { // We want to get rid of this Event, so notify it to make sure it's empty.
event.notify(usize::MAX); if !outdated_dependencies.is_empty() { - // TODO we need to find a way to handle this case without introducting a race - // condition - - return false; + clear_dependencies = Some(take(outdated_dependencies)); } } _ => { @@ -1996,9 +1645,7 @@ impl Task { let old_state = replace( &mut *state, // placeholder - TaskMetaState::Unloaded(UnloadedTaskState { - stats_type: StatsType::Essential, - }), + TaskMetaState::Unloaded(UnloadedTaskState {}), ); let TaskState { children, - - output, collectibles, mut aggregation_node, - stats, // can be dropped as it will be recomputed on next execution stateful: _, // can be dropped as it can be recomputed @@ -2039,20 +1685,13 @@ impl Task { None }; - // TODO aggregation_leaf + // TODO aggregation_node let unset = false; - let stats_type = match stats { - TaskStats::Essential(_) => StatsType::Essential, - TaskStats::Full(_) => StatsType::Full, - }; if unset { - *state = TaskMetaState::Unloaded(UnloadedTaskState { stats_type }); + *state = TaskMetaState::Unloaded(UnloadedTaskState {}); } else { - *state = TaskMetaState::Partial(Box::new(PartialTaskState { - aggregation_leaf: aggregation_node, - stats_type, - })); + *state = TaskMetaState::Partial(Box::new(PartialTaskState { aggregation_node })); } drop(state); @@ -2070,7 +1709,9 @@ impl Task { } output.gc_drop(turbo_tasks); - // We can clear the dependencies as we are already marked as dirty + // TODO This is a race condition: the task might be executed again while + // removing dependencies. We can clear the dependencies as we are already + // marked as dirty. if let Some(dependencies) = clear_dependencies { self.clear_dependencies(dependencies, backend, turbo_tasks); } @@ -2109,10 +1750,3 @@ impl PartialEq for Task { } impl Eq for Task {} - -pub struct TaskStatsInfo { - pub total_duration: Option<Duration>, - pub last_duration: Duration, - pub executions: Option<u32>, - pub unloaded: bool, -} diff --git a/crates/turbo-tasks-memory/src/task/aggregation.rs b/crates/turbo-tasks-memory/src/task/aggregation.rs index 088beb941379e..f790436f8bf0f 100644 --- a/crates/turbo-tasks-memory/src/task/aggregation.rs +++ b/crates/turbo-tasks-memory/src/task/aggregation.rs @@ -432,8 +432,9 @@ impl<'l> Deref for TaskGuard<'l> { fn deref(&self) -> &Self::Target { match self.guard { TaskMetaStateWriteGuard::Full(ref guard) => &guard.aggregation_node, - TaskMetaStateWriteGuard::Partial(ref guard) => &guard.aggregation_leaf, + TaskMetaStateWriteGuard::Partial(ref guard) => &guard.aggregation_node, TaskMetaStateWriteGuard::Unloaded(_) => unreachable!(), + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } } @@ -442,8 +443,9 @@ impl<'l> DerefMut for TaskGuard<'l> { fn deref_mut(&mut self) -> &mut Self::Target { match self.guard { TaskMetaStateWriteGuard::Full(ref mut guard) => &mut guard.aggregation_node, - TaskMetaStateWriteGuard::Partial(ref mut guard) => &mut guard.aggregation_leaf, + TaskMetaStateWriteGuard::Partial(ref mut guard) => &mut guard.aggregation_node, TaskMetaStateWriteGuard::Unloaded(_) => unreachable!(), + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } } @@ -483,6 +485,7 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { TaskMetaStateWriteGuard::Partial(_) | TaskMetaStateWriteGuard::Unloaded(_) => { None.into_iter().flatten() } + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } @@ -528,6 +531,7 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { } } TaskMetaStateWriteGuard::Partial(_) | TaskMetaStateWriteGuard::Unloaded(_) =>
None, + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } @@ -573,6 +577,7 @@ impl<'l> AggregationNodeGuard for TaskGuard<'l> { } } TaskMetaStateWriteGuard::Partial(_) | TaskMetaStateWriteGuard::Unloaded(_) => None, + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } diff --git a/crates/turbo-tasks-memory/src/task/meta_state.rs b/crates/turbo-tasks-memory/src/task/meta_state.rs index 381a81e90a274..9d38c49eb6a82 100644 --- a/crates/turbo-tasks-memory/src/task/meta_state.rs +++ b/crates/turbo-tasks-memory/src/task/meta_state.rs @@ -1,10 +1,12 @@ use std::mem::replace; use parking_lot::{RwLockReadGuard, RwLockWriteGuard}; -use turbo_tasks::StatsType; use super::{PartialTaskState, Task, TaskState, UnloadedTaskState}; -use crate::map_guard::{ReadGuard, WriteGuard}; +use crate::{ + aggregation::AggregationNode, + map_guard::{ReadGuard, WriteGuard}, +}; pub(super) enum TaskMetaState { Full(Box<TaskState>), @@ -118,6 +120,7 @@ pub(super) enum TaskMetaStateWriteGuard<'a> { TaskMetaStateAsUnloadedMut, >, ), + TemporaryFiller, } impl<'a> From<TaskMetaStateWriteGuard<'a>> for TaskMetaStateReadGuard<'a> { @@ -176,9 +179,7 @@ impl<'a> TaskMetaStateWriteGuard<'a> { let partial = replace( &mut *guard, // placeholder - TaskMetaState::Unloaded(UnloadedTaskState { - stats_type: StatsType::Essential, - }), + TaskMetaState::Unloaded(UnloadedTaskState {}), ) .into_partial() .unwrap(); @@ -189,9 +190,7 @@ impl<'a> TaskMetaStateWriteGuard<'a> { let unloaded = replace( &mut *guard, // placeholder - TaskMetaState::Unloaded(UnloadedTaskState { - stats_type: StatsType::Essential, - }), + TaskMetaState::Unloaded(UnloadedTaskState {}), ) .into_unloaded() .unwrap(); @@ -219,9 +218,7 @@ impl<'a> TaskMetaStateWriteGuard<'a> { let unloaded = replace( &mut *guard, // placeholder - TaskMetaState::Unloaded(UnloadedTaskState { - stats_type: StatsType::Essential, - }), + TaskMetaState::Unloaded(UnloadedTaskState {}), ) .into_unloaded() .unwrap(); @@ -247,12 +244,26 @@ impl<'a> TaskMetaStateWriteGuard<'a> { TaskMetaStateWriteGuard::Full(state) => state.into_inner(), TaskMetaStateWriteGuard::Partial(state) => state.into_inner(), TaskMetaStateWriteGuard::Unloaded(state) => state.into_inner(), + TaskMetaStateWriteGuard::TemporaryFiller => unreachable!(), } } pub(super) fn ensure_at_least_partial(&mut self) { - if let TaskMetaStateWriteGuard::Unloaded(_state) = self { - todo!() + if matches!(self, TaskMetaStateWriteGuard::Unloaded(..)) { + let TaskMetaStateWriteGuard::Unloaded(state) = + replace(self, TaskMetaStateWriteGuard::TemporaryFiller) + else { + unreachable!(); + }; + let mut state = state.into_inner(); + *state = TaskMetaState::Partial(Box::new(PartialTaskState { + aggregation_node: AggregationNode::new(), + })); + *self = TaskMetaStateWriteGuard::Partial(WriteGuard::new( + state, + TaskMetaState::as_partial, + TaskMetaState::as_partial_mut, + )); } } } diff --git a/crates/turbo-tasks-memory/src/task/stats.rs b/crates/turbo-tasks-memory/src/task/stats.rs deleted file mode 100644 index 141a18a8205a2..0000000000000 --- a/crates/turbo-tasks-memory/src/task/stats.rs +++ /dev/null @@ -1,143 +0,0 @@ -use std::time::Duration; - -use turbo_tasks::{small_duration::SmallDuration, StatsType}; - -/// Keeps track of the number of times a task has been executed, and its -/// duration. -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum TaskStats { - Essential(TaskStatsEssential), - Full(Box<TaskStatsFull>), -} - -impl TaskStats { - /// Creates a new [`TaskStats`].
- pub fn new(stats_type: StatsType) -> Self { - match stats_type { - turbo_tasks::StatsType::Essential => Self::Essential(TaskStatsEssential::default()), - turbo_tasks::StatsType::Full => Self::Full(Box::default()), - } - } - - /// Resets the number of executions to 1 only if it was greater than 1. - pub fn reset_executions(&mut self) { - if let Self::Full(stats) = self { - if stats.executions > 1 { - stats.executions = 1; - } - } - } - - /// Increments the number of executions by 1. - pub fn increment_executions(&mut self) { - if let Self::Full(stats) = self { - stats.executions += 1; - } - } - - /// Registers a task duration. - pub fn register_execution(&mut self, duration: Duration, duration_since_start: Duration) { - match self { - Self::Full(stats) => { - stats.total_duration += duration; - stats.last_duration = duration; - } - Self::Essential(stats) => { - stats.last_duration = duration.into(); - stats.last_execution_relative_to_start = duration_since_start.into(); - } - } - } - - /// Resets stats to their default, zero-value. - pub fn reset(&mut self) { - match self { - Self::Full(stats) => { - stats.executions = 0; - stats.total_duration = Duration::ZERO; - stats.last_duration = Duration::ZERO; - } - Self::Essential(stats) => { - stats.last_duration = SmallDuration::MIN; - stats.last_execution_relative_to_start = SmallDuration::MIN; - } - } - } - - /// Returns the last duration of the task. - pub fn last_duration(&self) -> Duration { - match self { - Self::Full(stats) => stats.last_duration(), - Self::Essential(stats) => stats.last_duration(), - } - } - - /// Returns the last execution of the task relative to the start of the - /// program. - pub fn last_execution_relative_to_start(&self) -> Duration { - match self { - Self::Full(stats) => stats.last_execution_relative_to_start(), - Self::Essential(stats) => stats.last_execution_relative_to_start(), - } - } -} - -#[derive(Debug, Default, Clone, Eq, PartialEq)] -pub struct TaskStatsEssential { - /// The last duration of the task, with a precision of 10 microseconds. - last_duration: SmallDuration<10_000>, - /// The last execution of the task relative to the start of the program, - /// with a precision of 1 millisecond. - last_execution_relative_to_start: SmallDuration<1_000_000>, -} - -impl TaskStatsEssential { - /// Returns the last duration of the task. - pub fn last_duration(&self) -> Duration { - self.last_duration.into() - } - - /// Returns the last execution of the task relative to the start of the - /// program. - #[allow(dead_code)] // NOTE(alexkirsz) This will be useful for GC. - pub fn last_execution_relative_to_start(&self) -> Duration { - self.last_execution_relative_to_start.into() - } -} - -#[derive(Debug, Default, Clone, Eq, PartialEq)] -pub struct TaskStatsFull { - /// The number of times the task has been executed. - executions: u32, - /// The last duration of the task. - last_duration: Duration, - /// The total duration of the task. - total_duration: Duration, - /// The last execution of the task relative to the start of the program, - /// with a precision of 1 millisecond. - last_execution_relative_to_start: SmallDuration<1_000_000>, -} - -impl TaskStatsFull { - /// Returns the number of times the task has been executed. - pub fn executions(&self) -> u32 { - self.executions - } - - /// Returns the last duration of the task. - pub fn last_duration(&self) -> Duration { - self.last_duration - } - - /// Returns the total duration of the task. 
- pub fn total_duration(&self) -> Duration { - self.total_duration - } - - /// Returns the last execution of the task relative to the start of the - /// program. - #[allow(dead_code)] // NOTE(alexkirsz) This will be useful for GC. - pub fn last_execution_relative_to_start(&self) -> Duration { - self.last_execution_relative_to_start.into() - } -} diff --git a/crates/turbo-tasks-memory/src/viz/graph.rs b/crates/turbo-tasks-memory/src/viz/graph.rs deleted file mode 100644 index 6677ecaae564a..0000000000000 --- a/crates/turbo-tasks-memory/src/viz/graph.rs +++ /dev/null @@ -1,293 +0,0 @@ -use turbo_tasks::StatsType; - -use super::*; - -pub fn wrap_html(graph: &str) -> String { - format!( - r#" - - - - -turbo-tasks graph - - - - -"#, - escape_in_template_str(graph) - ) -} - -struct GlobalData<'a> { - ids: HashMap<&'a StatsTaskType, usize>, - depths: HashMap<&'a StatsTaskType, usize>, - output: String, - edges: String, -} - -impl<'a> GlobalData<'a> { - fn get_id(&mut self, ty: &'a StatsTaskType) -> usize { - get_id(ty, &mut self.ids) - } -} - -pub fn visualize_stats_tree( - root: GroupTree, - tree_ref_type: ReferenceType, - stats_type: StatsType, -) -> String { - let max_values = get_max_values(&root); - let mut depths = HashMap::new(); - compute_depths(&root, 0, &mut depths); - let mut global_data = GlobalData { - ids: HashMap::new(), - depths, - output: "digraph {\nrankdir=LR\n\n".to_string(), - edges: String::new(), - }; - visualize_stats_tree_internal( - &root, - 0, - tree_ref_type, - &max_values, - &mut global_data, - stats_type, - ); - global_data.output += &global_data.edges; - global_data.output += "\n}"; - global_data.output -} - -fn visualize_stats_tree_internal<'a>( - node: &'a GroupTree, - depth: usize, - tree_ref_type: ReferenceType, - max_values: &MaxValues, - global_data: &mut GlobalData<'a>, - stats_type: StatsType, -) { - let GroupTree { - primary, - children, - task_types, - } = node; - if let Some((ty, stats)) = primary { - let id = global_data.get_id(ty); - let label = get_task_label(ty, stats, max_values, stats_type); - writeln!( - &mut global_data.output, - "subgraph cluster_{id} {{\ncolor=lightgray;" - ) - .unwrap(); - writeln!( - &mut global_data.output, - "task_{id} [shape=plaintext, label={label}]" - ) - .unwrap(); - visualize_stats_references_internal( - id, - stats.count, - &stats.references, - depth, - tree_ref_type, - global_data, - ); - } - for (ty, stats) in task_types.iter() { - let id = global_data.get_id(ty); - let label = get_task_label(ty, stats, max_values, stats_type); - writeln!( - &mut global_data.output, - "task_{id} [shape=plaintext, label={label}]" - ) - .unwrap(); - visualize_stats_references_internal( - id, - stats.count, - &stats.references, - depth, - tree_ref_type, - global_data, - ); - } - for child in children.iter() { - visualize_stats_tree_internal( - child, - depth + 1, - tree_ref_type, - max_values, - global_data, - stats_type, - ); - } - if primary.is_some() { - global_data.output.push_str("}\n"); - } -} - -fn visualize_stats_references_internal<'a>( - source_id: usize, - source_count: usize, - references: &'a HashMap<(ReferenceType, StatsTaskType), ReferenceStats>, - depth: usize, - tree_ref_type: ReferenceType, - global_data: &mut GlobalData<'a>, -) { - let mut far_types = Vec::new(); - for ((ref_ty, ty), stats) in references.iter() { - let target_id = global_data.get_id(ty); - let is_far = global_data - .depths - .get(ty) - .map(|d| *d < depth) - .unwrap_or(false); - if ref_ty == &tree_ref_type { - let label = get_child_label(ref_ty, 
stats, source_count); - if is_far { - far_types.push((ty, label)); - } else { - writeln!( - &mut global_data.edges, - "task_{source_id} -> task_{target_id} [style=dashed, color=lightgray, \ - label=\"{label}\"]" - ) - .unwrap(); - } - } - } - if !far_types.is_empty() { - if far_types.len() == 1 { - let (ty, label) = far_types.first().unwrap(); - let target_id = global_data.get_id(ty); - writeln!( - &mut global_data.output, - "far_task_{source_id}_{target_id} [label=\"{ty}\", style=dashed]" - ) - .unwrap(); - writeln!( - &mut global_data.edges, - "task_{source_id} -> far_task_{source_id}_{target_id} [style=dashed, \ - color=lightgray, label=\"{label}\"]" - ) - .unwrap(); - } else { - writeln!( - &mut global_data.output, - "far_tasks_{source_id} [label=\"{}\", style=dashed]", - escape_in_template_str( - &far_types - .iter() - .map(|(ty, label)| format!("{label} {ty}")) - .collect::>() - .join("\n") - ) - ) - .unwrap(); - writeln!( - &mut global_data.edges, - "task_{source_id} -> far_tasks_{source_id} [style=dashed, color=lightgray]" - ) - .unwrap(); - } - } -} - -fn get_task_label( - ty: &StatsTaskType, - stats: &ExportedTaskStats, - max_values: &MaxValues, - stats_type: StatsType, -) -> String { - let (total_millis, total_color) = if let Some((total_duration, max_total_duration)) = - stats.total_duration.zip(max_values.total_duration) - { - let total_color = as_color(as_frac( - total_duration.as_millis(), - max_total_duration.as_millis(), - )); - let total_millis = format!("{}ms", total_duration.as_millis()); - (total_millis, total_color) - } else { - ("N/A".to_string(), "#ffffff".to_string()) - }; - - let (avg_label, avg_color) = if let Some(((executions, total_duration), max_avg_duration)) = - stats - .executions - .zip(stats.total_duration) - .zip(max_values.avg_duration) - { - let avg_color = as_color(as_frac( - total_duration.as_micros() / (executions as u128), - max_avg_duration.as_micros(), - )); - let avg = (total_duration.as_micros() as u32 / executions) as f32 / 1000.0; - (format!("avg {}ms", avg), avg_color) - } else { - ("avg N/A".to_string(), "#ffffff".to_string()) - }; - let count = as_frac(stats.count, max_values.count); - let (updates_label, updates_color) = - if let Some((executions, max_updates)) = stats.executions.zip(max_values.updates) { - let updates_color = as_color(as_frac( - executions.saturating_sub(stats.count as u32), - max_updates as u32, - )); - let updates = executions - stats.count as u32; - (format!("{}", updates), updates_color) - } else { - ("N/A".to_string(), "#ffffff".to_string()) - }; - - let full_stats_disclaimer = if stats_type.is_full() { - "".to_string() - } else { - r##" - - - Full stats collection is disabled. Pass --full-stats to enable it. - - -"## - .to_string() - }; - - format!( - "< - - - - - - - - - - - - - {} -
{}
count{}+ {}
exec{}{}
>", - total_color, - escape_html(&ty.to_string()), - as_color(count), - stats.count, - updates_color, - updates_label, - total_color, - total_millis, - avg_color, - avg_label, - full_stats_disclaimer - ) -} diff --git a/crates/turbo-tasks-memory/src/viz/mod.rs b/crates/turbo-tasks-memory/src/viz/mod.rs deleted file mode 100644 index 8656c812c4811..0000000000000 --- a/crates/turbo-tasks-memory/src/viz/mod.rs +++ /dev/null @@ -1,236 +0,0 @@ -pub mod graph; -pub mod table; - -use std::{ - cmp::max, - collections::HashMap, - fmt::{Debug, Write}, - ops::{Div, Mul}, - time::Duration, -}; - -use turbo_tasks_hash::hash_xxh3_hash64; - -use crate::stats::{ExportedTaskStats, GroupTree, ReferenceStats, ReferenceType, StatsTaskType}; - -fn escape_in_template_str(s: &str) -> String { - s.replace('\\', "\\\\") - .replace('\"', "\\\"") - .replace('\n', "\\n") -} - -fn escape_html(s: &str) -> String { - s.replace('>', ">").replace('<', "<") -} - -fn get_id<'a>(ty: &'a StatsTaskType, ids: &mut HashMap<&'a StatsTaskType, usize>) -> usize { - let len = ids.len(); - *ids.entry(ty).or_insert(len) -} - -struct MaxValues { - pub total_duration: Option, - pub total_current_duration: Duration, - pub total_update_duration: Duration, - pub avg_duration: Option, - pub max_duration: Duration, - pub count: usize, - pub unloaded_count: usize, - pub updates: Option, - /// stored as dependencies * 100 - pub dependencies: usize, - /// stored as children * 100 - pub children: usize, - pub depth: u32, -} - -fn get_max_values(node: &GroupTree) -> MaxValues { - get_max_values_internal(0, node) -} - -pub fn get_avg_dependencies_count_times_100(stats: &ExportedTaskStats) -> usize { - stats - .references - .iter() - .filter(|((ty, _), _)| *ty == ReferenceType::Dependency) - .map(|(_, ref_stats)| ref_stats.count) - .sum::() - * 100 - / stats.count -} - -pub fn get_avg_children_count_times_100(stats: &ExportedTaskStats) -> usize { - stats - .references - .iter() - .filter(|((ty, _), _)| *ty == ReferenceType::Child) - .map(|(_, ref_stats)| ref_stats.count) - .sum::() - * 100 - / stats.count -} - -fn get_max_values_internal(depth: u32, node: &GroupTree) -> MaxValues { - let mut max_total_duration = None; - let mut max_total_current_duration = Duration::ZERO; - let mut max_total_update_duration = Duration::ZERO; - let mut max_avg_duration = None; - let mut max_max_duration = Duration::ZERO; - let mut max_count = 0; - let mut max_unloaded_count = 0; - let mut max_updates = None; - let mut max_dependencies = 0; - let mut max_children = 0; - let mut max_depth = 0; - for (_, ref s) in node.task_types.iter().chain(node.primary.iter()) { - if let Some(total_duration) = s.total_duration { - max_total_duration = max_total_duration - .map(|max_total_duration| max(max_total_duration, total_duration)) - .or(Some(total_duration)); - } - max_total_current_duration = max(max_total_current_duration, s.total_current_duration); - max_total_update_duration = max(max_total_update_duration, s.total_update_duration); - if let Some((total_duration, executions)) = s.total_duration.zip(s.executions) { - if executions > 0 { - let avg_duration = total_duration / executions; - max_avg_duration = max_avg_duration - .map(|max_avg_duration| max(max_avg_duration, avg_duration)) - .or(Some(avg_duration)); - } - } - max_max_duration = max(max_max_duration, s.max_duration); - max_count = max(max_count, s.count); - max_unloaded_count = max(max_unloaded_count, s.unloaded_count); - if let Some(executions) = s.executions { - let updates = (executions as 
usize).saturating_sub(s.count); - max_updates = max_updates - .map(|max_updates| max(max_updates, updates)) - .or(Some(updates)); - } - max_dependencies = max(max_dependencies, get_avg_dependencies_count_times_100(s)); - max_children = max(max_children, get_avg_children_count_times_100(s)); - } - max_depth = max( - max_depth, - if node.task_types.is_empty() { - depth - } else { - depth + 1 - }, - ); - for child in node.children.iter() { - let MaxValues { - total_duration, - total_current_duration, - total_update_duration, - avg_duration, - max_duration, - count, - unloaded_count, - updates, - dependencies, - children, - depth: inner_depth, - } = get_max_values_internal(depth + 1, child); - max_total_duration = max_total_duration - .zip(total_duration) - .map(|(a, b)| max(a, b)); - max_total_current_duration = max(max_total_current_duration, total_current_duration); - max_total_update_duration = max(max_total_update_duration, total_update_duration); - max_avg_duration = max_avg_duration.zip(avg_duration).map(|(a, b)| max(a, b)); - max_max_duration = max(max_max_duration, max_duration); - max_count = max(max_count, count); - max_unloaded_count = max(max_unloaded_count, unloaded_count); - max_updates = max_updates.zip(updates).map(|(a, b)| max(a, b)); - max_dependencies = max(max_dependencies, dependencies); - max_children = max(max_children, children); - max_depth = max(max_depth, inner_depth); - } - MaxValues { - total_duration: max_total_duration, - total_current_duration: max_total_current_duration, - total_update_duration: max_total_update_duration, - avg_duration: max_avg_duration, - max_duration: max_max_duration, - count: max_count, - unloaded_count: max_unloaded_count, - updates: max_updates, - dependencies: max_dependencies, - children: max_children, - depth: max_depth, - } -} - -fn compute_depths<'a>( - node: &'a GroupTree, - depth: usize, - output: &mut HashMap<&'a StatsTaskType, usize>, -) { - if let Some((ty, _)) = &node.primary { - output.insert(ty, depth); - } - for (ty, _) in node.task_types.iter() { - output.insert(ty, depth); - } - for child in node.children.iter() { - compute_depths(child, depth + 1, output); - } -} - -fn as_frac< - T: From + TryInto + Mul + Div + PartialEq + Ord + Copy, ->( - current: T, - total: T, -) -> u8 -where - >::Error: Debug, -{ - let min: T = u8::MIN.into(); - let max: T = u8::MAX.into(); - let result = if total == min { - min - } else { - max * current / total - }; - result.clamp(min, max).try_into().unwrap() -} - -fn as_color(n: u8) -> String { - // interpolate #fff -> #ff0 -> #f00 - if n >= 64 { - format!("#ff{:0>2x}00", u8::MAX - ((n as u32 - 64) * 4 / 3) as u8) - } else { - format!("#ffff{:0>2x}", u8::MAX - n * 4) - } -} - -fn as_hash_color(value: &String) -> String { - let hash = hash_xxh3_hash64(value.as_bytes()); - format!( - "#{:0>2x}{:0>2x}{:0>2x}", - (hash & 0x7f) + 0x80, - ((hash >> 7) & 0x7f) + 0x80, - ((hash >> 14) & 0x7f) + 0x80 - ) -} - -fn as_frac_color< - T: From + TryInto + Mul + Div + PartialEq + Ord + Copy, ->( - current: T, - total: T, -) -> String -where - >::Error: Debug, -{ - as_color(as_frac(current, total)) -} - -fn get_child_label(_ty: &ReferenceType, stats: &ReferenceStats, source_count: usize) -> String { - if stats.count == source_count { - "".to_string() - } else { - format!("{}", stats.count) - } -} diff --git a/crates/turbo-tasks-memory/src/viz/table.rs b/crates/turbo-tasks-memory/src/viz/table.rs deleted file mode 100644 index a8de44354471f..0000000000000 --- a/crates/turbo-tasks-memory/src/viz/table.rs +++ 
/dev/null @@ -1,239 +0,0 @@ -use turbo_tasks::{util::FormatDuration, StatsType}; - -use super::*; - -pub fn wrap_html(table_html: &str) -> String { - format!( - r#" - - - - turbo-tasks table - - - - {table_html} - - -"#, - script = r#"// https://github.com/tofsjonas/sortable -document.addEventListener("click",function(b){try{var p=function(a){return v&&a.getAttribute("data-sort-alt")||a.getAttribute("data-sort")||a.innerText},q=function(a,c){a.className=a.className.replace(w,"")+c},e=function(a,c){return a.nodeName===c?a:e(a.parentNode,c)},w=/ dir-(u|d) /,v=b.shiftKey||b.altKey,f=e(b.target,"TH"),r=e(f,"TR"),g=e(r,"TABLE");if(/\bsortable\b/.test(g.className)){var h,d=r.cells;for(b=0;b String { - let max_values = get_max_values(&root); - let mut out = String::new(); - if !stats_type.is_full() { - out += r#"

Full stats collection is disabled. Run with --full-stats to enable it.

"#; - } - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - out += r#""#; - let mut queue = Vec::new(); - queue.push((0, None, &root)); - fn add_task( - out: &mut String, - max_values: &MaxValues, - depth: u32, - parent: Option<&(StatsTaskType, ExportedTaskStats)>, - (ty, stats): &(StatsTaskType, ExportedTaskStats), - ) -> Result<(), std::fmt::Error> { - *out += r#""#; - let name = ty.to_string(); - // name - write!( - out, - "", - as_hash_color(&name), - escape_html(&name) - )?; - // count - write!( - out, - "", - as_frac_color(stats.count, max_values.count), - stats.count - )?; - // unloaded - write!( - out, - "", - as_frac_color(stats.unloaded_count, max_values.unloaded_count), - stats.unloaded_count - )?; - // reexecutions - let (executions_label, executions_color) = - if let Some((executions, max_updates)) = stats.executions.zip(max_values.updates) { - ( - executions.saturating_sub(stats.count as u32).to_string(), - as_frac_color( - executions.saturating_sub(stats.count as u32), - max_updates as u32, - ), - ) - } else { - ("N/A".to_string(), "white".to_string()) - }; - write!( - out, - "", - executions_color, executions_label - )?; - // total duration - let (total_duration_micros, total_duration_label, total_duration_color) = - if let Some((total_duration, max_total_duration)) = - stats.total_duration.zip(max_values.total_duration) - { - ( - format!("{}", total_duration.as_micros()), - FormatDuration(total_duration).to_string(), - as_frac_color(total_duration.as_millis(), max_total_duration.as_millis()), - ) - } else { - (String::new(), "N/A".to_string(), "white".to_string()) - }; - write!( - out, - "", - total_duration_color, total_duration_micros, total_duration_label - )?; - // total current duration - write!( - out, - "", - as_frac_color( - stats.total_current_duration.as_millis(), - max_values.total_current_duration.as_millis(), - ), - stats.total_current_duration.as_micros(), - FormatDuration(stats.total_current_duration) - )?; - // total update duration - write!( - out, - "", - as_frac_color( - stats.total_update_duration.as_millis(), - max_values.total_update_duration.as_millis(), - ), - stats.total_update_duration.as_micros(), - FormatDuration(stats.total_update_duration) - )?; - // avg duration - let (avg_duration_micros, avg_duration_label, avg_duration_color) = - if let Some(((total_duration, executions), max_avg_duration)) = stats - .total_duration - .zip(stats.executions) - .zip(max_values.avg_duration) - { - ( - format!("{}", (total_duration / executions).as_micros()), - FormatDuration(total_duration / executions).to_string(), - as_frac_color( - total_duration.as_micros() / (executions as u128), - max_avg_duration.as_micros(), - ), - ) - } else { - (String::new(), "N/A".to_string(), "white".to_string()) - }; - write!( - out, - "", - avg_duration_color, avg_duration_micros, avg_duration_label - )?; - // max duration - write!( - out, - "", - as_frac_color( - stats.max_duration.as_millis(), - max_values.max_duration.as_millis(), - ), - stats.max_duration.as_micros(), - FormatDuration(stats.max_duration) - )?; - // avg dependencies - let dependencies = get_avg_dependencies_count_times_100(stats); - write!( - out, - "", - as_frac_color(dependencies, max_values.dependencies), - (dependencies as f32) / 100.0 - )?; - // avg children - let children = get_avg_children_count_times_100(stats); - 
write!( - out, - "", - as_frac_color(children, max_values.children), - (children as f32) / 100.0 - )?; - // depth - write!( - out, - "", - as_frac_color(depth, max_values.depth), - depth - )?; - // common parent - if let Some((ty, _)) = parent { - let name = ty.to_string(); - write!( - out, - "", - as_hash_color(&name), - escape_html(&name) - )?; - } else { - write!(out, "",)?; - } - *out += r#""#; - Ok(()) - } - while let Some((depth, parent, node)) = queue.pop() { - let GroupTree { - primary, - children, - task_types, - } = node; - if let Some(primary) = primary { - add_task(&mut out, &max_values, depth, parent, primary).unwrap(); - } - for task in task_types.iter() { - add_task(&mut out, &max_values, depth + 1, primary.as_ref(), task).unwrap(); - } - for child in children.iter() { - queue.push((depth + 1, primary.as_ref(), child)); - } - } - out += r#""#; - out += r#"
functioncountunloadedreexecutionstotal durationtotal current durationtotal update durationavg durationmax durationavg dependenciesavg childrendepthcommon parent
{}{}{}{}{}{}{}{}{}{}{}{}{}
"#; - out -} diff --git a/crates/turbo-tasks/src/backend.rs b/crates/turbo-tasks/src/backend.rs index 90025f90feb01..cfd556905ba5c 100644 --- a/crates/turbo-tasks/src/backend.rs +++ b/crates/turbo-tasks/src/backend.rs @@ -7,7 +7,7 @@ use std::{ mem::take, pin::Pin, sync::Arc, - time::{Duration, Instant}, + time::Duration, }; use anyhow::{anyhow, bail, Result}; @@ -227,7 +227,6 @@ pub trait Backend: Sync + Send { &self, task: TaskId, duration: Duration, - instant: Instant, memory_usage: usize, stateful: bool, turbo_tasks: &dyn TurboTasksBackendApi, diff --git a/crates/turbo-tasks/src/completion.rs b/crates/turbo-tasks/src/completion.rs index 7949de3966dd2..a49687107594b 100644 --- a/crates/turbo-tasks/src/completion.rs +++ b/crates/turbo-tasks/src/completion.rs @@ -1,4 +1,4 @@ -use crate::{self as turbo_tasks, RawVc, Vc}; +use crate::{self as turbo_tasks, RawVc, TryJoinIterExt, Vc}; /// Just an empty type, but it's never equal to itself. /// [Vc] can be used as return value instead of `()` /// to have a concrete reference that can be awaited. @@ -55,9 +55,14 @@ impl Completions { /// Merges the list of completions into one. #[turbo_tasks::function] pub async fn completed(self: Vc) -> anyhow::Result> { - for c in self.await?.iter() { - c.await?; - } + self.await? + .iter() + .map(|&c| async move { + c.await?; + Ok(()) + }) + .try_join() + .await?; Ok(Completion::new()) } } diff --git a/crates/turbo-tasks/src/lib.rs b/crates/turbo-tasks/src/lib.rs index d6054f40d3039..93bda5364a01f 100644 --- a/crates/turbo-tasks/src/lib.rs +++ b/crates/turbo-tasks/src/lib.rs @@ -89,7 +89,7 @@ pub use keyed_cell::{global_keyed_cell, keyed_cell}; pub use manager::{ dynamic_call, emit, get_invalidator, mark_finished, mark_stateful, prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call, turbo_tasks, CurrentCellRef, - Invalidator, StatsType, TaskIdProvider, TurboTasks, TurboTasksApi, TurboTasksBackendApi, + Invalidator, TaskIdProvider, TurboTasks, TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo, }; pub use native_function::NativeFunction; diff --git a/crates/turbo-tasks/src/manager.rs b/crates/turbo-tasks/src/manager.rs index d648e89bbb77e..2dd0691dc99e1 100644 --- a/crates/turbo-tasks/src/manager.rs +++ b/crates/turbo-tasks/src/manager.rs @@ -129,18 +129,6 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { ) -> Pin> + Send + 'static>>; } -/// The type of stats reporting. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum StatsType { - /// Only report stats essential to Turbo Tasks' operation. - Essential, - /// Full stats reporting. - /// - /// This is useful for debugging, but it has a slight memory and performance - /// impact. - Full, -} - pub trait TaskIdProvider { fn get_fresh_task_id(&self) -> Unused; fn reuse_task_id(&self, id: Unused); @@ -209,28 +197,12 @@ pub trait TurboTasksBackendApi: /// eventually call `invalidate_tasks()` on all tasks. fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet); - /// Returns the stats reporting type. - fn stats_type(&self) -> StatsType; - /// Sets the stats reporting type. - fn set_stats_type(&self, stats_type: StatsType); /// Returns the duration from the start of the program to the given instant. fn program_duration_until(&self, instant: Instant) -> Duration; /// Returns a reference to the backend. fn backend(&self) -> &B; } -impl StatsType { - /// Returns `true` if the stats type is `Essential`. 
- pub fn is_essential(self) -> bool { - matches!(self, Self::Essential) - } - - /// Returns `true` if the stats type is `Full`. - pub fn is_full(self) -> bool { - matches!(self, Self::Full) - } -} - impl TaskIdProvider for &dyn TurboTasksBackendApi { fn get_fresh_task_id(&self) -> Unused { (*self).get_fresh_task_id() @@ -275,9 +247,6 @@ pub struct TurboTasks { event_start: Event, event_foreground: Event, event_background: Event, - // NOTE(alexkirsz) We use an atomic bool instead of a lock around `StatsType` to avoid the - // locking overhead. - enable_full_stats: AtomicBool, program_start: Instant, } @@ -328,7 +297,6 @@ impl TurboTasks { event_start: Event::new(|| "TurboTasks::event_start".to_string()), event_foreground: Event::new(|| "TurboTasks::event_foreground".to_string()), event_background: Event::new(|| "TurboTasks::event_background".to_string()), - enable_full_stats: AtomicBool::new(false), program_start: Instant::now(), }); this.backend.startup(&*this); @@ -479,7 +447,7 @@ impl TurboTasks { }; async { - let (result, duration, instant, memory_usage) = + let (result, duration, _instant, memory_usage) = CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()) .await; @@ -495,7 +463,6 @@ impl TurboTasks { this.backend.task_execution_completed( task_id, duration, - instant, memory_usage, stateful, &*this, @@ -1132,20 +1099,6 @@ impl TurboTasksBackendApi for TurboTasks { self.schedule(task) } - fn stats_type(&self) -> StatsType { - match self.enable_full_stats.load(Ordering::Acquire) { - true => StatsType::Full, - false => StatsType::Essential, - } - } - - fn set_stats_type(&self, stats_type: StatsType) { - match stats_type { - StatsType::Full => self.enable_full_stats.store(true, Ordering::Release), - StatsType::Essential => self.enable_full_stats.store(false, Ordering::Release), - } - } - fn program_duration_until(&self, instant: Instant) -> Duration { instant - self.program_start } diff --git a/crates/turbopack-cli/src/dev/mod.rs b/crates/turbopack-cli/src/dev/mod.rs index afcfca7a216d2..17695ec11651f 100644 --- a/crates/turbopack-cli/src/dev/mod.rs +++ b/crates/turbopack-cli/src/dev/mod.rs @@ -13,7 +13,7 @@ use anyhow::{Context, Result}; use owo_colors::OwoColorize; use turbo_tasks::{ util::{FormatBytes, FormatDuration}, - StatsType, TransientInstance, TurboTasks, TurboTasksBackendApi, UpdateInfo, Value, Vc, + TransientInstance, TurboTasks, UpdateInfo, Value, Vc, }; use turbo_tasks_fs::FileSystem; use turbo_tasks_malloc::TurboMalloc; @@ -47,7 +47,6 @@ use crate::{ }, }; -pub(crate) mod turbo_tasks_viz; pub(crate) mod web_entry_source; pub struct TurbopackDevServerBuilder { @@ -217,7 +216,6 @@ impl TurbopackDevServerBuilder { project_dir.clone(), entry_requests.clone().into(), eager_compile, - turbo_tasks.clone().into(), browserslist_query.clone(), ) }; @@ -233,7 +231,6 @@ async fn source( project_dir: String, entry_requests: TransientInstance>, eager_compile: bool, - turbo_tasks: TransientInstance>, browserslist_query: String, ) -> Result>> { let project_relative = project_dir.strip_prefix(&root_dir).unwrap(); @@ -293,7 +290,6 @@ async fn source( NodeEnv::Development.cell(), browserslist_query, ); - let viz = Vc::upcast(turbo_tasks_viz::TurboTasksSource::new(turbo_tasks.into())); let static_source = Vc::upcast(StaticAssetsContentSource::new( String::new(), project_path.join("public".to_string()), @@ -308,10 +304,7 @@ async fn source( let main_source = Vc::upcast(main_source); let source = Vc::upcast(PrefixedRouterContentSource::new( Default::default(), - vec![ - 
("__turbopack__".to_string(), introspect), - ("__turbo_tasks__".to_string(), viz), - ], + vec![("__turbopack__".to_string(), introspect)], main_source, )); @@ -342,12 +335,6 @@ pub async fn start_server(args: &DevArguments) -> Result<()> { .map_or(usize::MAX, |l| l * 1024 * 1024), )); - let stats_type = match args.common.full_stats { - true => StatsType::Full, - false => StatsType::Essential, - }; - tt.set_stats_type(stats_type); - let tt_clone = tt.clone(); let mut server = TurbopackDevServerBuilder::new(tt, project_dir, root_dir) diff --git a/crates/turbopack-cli/src/dev/turbo_tasks_viz.rs b/crates/turbopack-cli/src/dev/turbo_tasks_viz.rs deleted file mode 100644 index bbd80f70b313e..0000000000000 --- a/crates/turbopack-cli/src/dev/turbo_tasks_viz.rs +++ /dev/null @@ -1,142 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use anyhow::{bail, Result}; -use mime::TEXT_HTML_UTF_8; -use turbo_tasks::{get_invalidator, TurboTasks, TurboTasksBackendApi, Value, Vc}; -use turbo_tasks_fs::File; -use turbo_tasks_memory::{ - stats::{ReferenceType, Stats}, - viz, MemoryBackend, -}; -use turbopack_core::{asset::AssetContent, version::VersionedContentExt}; -use turbopack_dev_server::source::{ - route_tree::{BaseSegment, RouteTree, RouteTrees, RouteType}, - ContentSource, ContentSourceContent, ContentSourceData, ContentSourceDataFilter, - ContentSourceDataVary, GetContentSourceContent, -}; - -#[turbo_tasks::value(serialization = "none", eq = "manual", cell = "new", into = "new")] -pub struct TurboTasksSource { - #[turbo_tasks(debug_ignore, trace_ignore)] - turbo_tasks: Arc>, -} - -impl TurboTasksSource { - pub fn new(turbo_tasks: Arc>) -> Vc { - Self::cell(TurboTasksSource { turbo_tasks }) - } -} - -const INVALIDATION_INTERVAL: Duration = Duration::from_secs(3); - -#[turbo_tasks::value_impl] -impl ContentSource for TurboTasksSource { - #[turbo_tasks::function] - fn get_routes(self: Vc) -> Vc { - Vc::::cell(vec![ - RouteTree::new_route( - vec![BaseSegment::Static("graph".to_string())], - RouteType::Exact, - Vc::upcast(self), - ), - RouteTree::new_route( - vec![BaseSegment::Static("call-graph".to_string())], - RouteType::Exact, - Vc::upcast(self), - ), - RouteTree::new_route( - vec![BaseSegment::Static("table".to_string())], - RouteType::Exact, - Vc::upcast(self), - ), - RouteTree::new_route( - vec![BaseSegment::Static("reset".to_string())], - RouteType::Exact, - Vc::upcast(self), - ), - ]) - .merge() - } -} - -#[turbo_tasks::value_impl] -impl GetContentSourceContent for TurboTasksSource { - #[turbo_tasks::function] - fn vary(&self) -> Vc { - ContentSourceDataVary { - query: Some(ContentSourceDataFilter::All), - ..Default::default() - } - .cell() - } - - #[turbo_tasks::function] - async fn get( - self: Vc, - path: String, - data: Value, - ) -> Result> { - let this = self.await?; - let tt = &this.turbo_tasks; - let invalidator = get_invalidator(); - tokio::spawn({ - async move { - tokio::time::sleep(INVALIDATION_INTERVAL).await; - invalidator.invalidate(); - } - }); - let html = match path.as_str() { - "graph" => { - let mut stats = Stats::new(); - let b = tt.backend(); - b.with_all_cached_tasks(|task| { - stats.add_id(b, task); - }); - let tree = stats.treeify(ReferenceType::Dependency); - let graph = viz::graph::visualize_stats_tree( - tree, - ReferenceType::Dependency, - tt.stats_type(), - ); - viz::graph::wrap_html(&graph) - } - "call-graph" => { - let mut stats = Stats::new(); - let b = tt.backend(); - b.with_all_cached_tasks(|task| { - stats.add_id(b, task); - }); - let tree = 
stats.treeify(ReferenceType::Child); - let graph = - viz::graph::visualize_stats_tree(tree, ReferenceType::Child, tt.stats_type()); - viz::graph::wrap_html(&graph) - } - "table" => { - let Some(query) = &data.query else { - bail!("Missing query"); - }; - let mut stats = Stats::new(); - let b = tt.backend(); - let include_unloaded = query.contains_key("unloaded"); - b.with_all_cached_tasks(|task| { - stats.add_id_conditional(b, task, |_, info| include_unloaded || !info.unloaded); - }); - let tree = stats.treeify(ReferenceType::Dependency); - let table = viz::table::create_table(tree, tt.stats_type()); - viz::table::wrap_html(&table) - } - "reset" => { - let b = tt.backend(); - b.with_all_cached_tasks(|task| { - b.with_task(task, |task| task.reset_stats()); - }); - "Done".to_string() - } - _ => bail!("Unknown path: {}", path), - }; - Ok(ContentSourceContent::static_content( - AssetContent::file(File::from(html).with_content_type(TEXT_HTML_UTF_8).into()) - .versioned(), - )) - } -} diff --git a/crates/turbopack-trace-utils/src/raw_trace.rs b/crates/turbopack-trace-utils/src/raw_trace.rs index 4caf1188c5919..9e4bc5f55f8ab 100644 --- a/crates/turbopack-trace-utils/src/raw_trace.rs +++ b/crates/turbopack-trace-utils/src/raw_trace.rs @@ -5,6 +5,7 @@ use tracing::{ span, Subscriber, }; use tracing_subscriber::{registry::LookupSpan, Layer}; +use turbo_tasks_malloc::TurboMalloc; use crate::{ flavor::BufFlavor, @@ -30,10 +31,12 @@ impl<S: Subscriber + for<'a> LookupSpan<'a>> RawTraceLayer<S> { } fn write(&self, data: TraceRow<'_>) { + let start = TurboMalloc::allocation_counters(); // Buffer is recycled let buf = self.trace_writer.try_get_buffer().unwrap_or_default(); let buf = postcard::serialize_with_flavor(&data, BufFlavor { buf }).unwrap(); self.trace_writer.write(buf); + TurboMalloc::reset_allocation_counters(start); } fn report_allocations(&self, ts: u64, thread_id: u64) { diff --git a/crates/turbopack/examples/turbopack.rs b/crates/turbopack/examples/turbopack.rs index afca14caef563..f73cb1cde4162 100644 --- a/crates/turbopack/examples/turbopack.rs +++ b/crates/turbopack/examples/turbopack.rs @@ -3,19 +3,14 @@ use std::{ collections::HashMap, env::current_dir, - fs, time::{Duration, Instant}, }; use anyhow::Result; use tokio::{spawn, time::sleep}; -use turbo_tasks::{util::FormatDuration, TurboTasks, TurboTasksBackendApi, UpdateInfo, Value, Vc}; +use turbo_tasks::{util::FormatDuration, TurboTasks, UpdateInfo, Value, Vc}; use turbo_tasks_fs::{DiskFileSystem, FileSystem}; -use turbo_tasks_memory::{ - stats::{ReferenceType, Stats}, - viz::graph::{visualize_stats_tree, wrap_html}, - MemoryBackend, -}; +use turbo_tasks_memory::MemoryBackend; use turbopack::{emit_with_completion, rebase::RebasedAsset, register}; use turbopack_core::{ compile_time_info::CompileTimeInfo, @@ -94,37 +89,6 @@ async fn main() -> Result<()> { .unwrap(); loop { - println!("writing graph.html..."); - // create a graph - let mut stats = Stats::new(); - - let b = tt.backend(); - - // graph root node - stats.add_id(b, task); - - // graph tasks in cache - b.with_all_cached_tasks(|task| { - stats.add_id(b, task); - }); - - // prettify graph - stats.merge_resolve(); - - let tree = stats.treeify(ReferenceType::Child); - - // write HTML - fs::write( - "graph.html", - wrap_html(&visualize_stats_tree( - tree, - ReferenceType::Child, - tt.stats_type(), - )), - ) - .unwrap(); - println!("graph.html written"); - sleep(Duration::from_secs(10)).await; } }
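### Notes on the new code (illustrative sketches)

`Task::run_gc(generation)` only collects a task whose recorded GC generation is not newer than the generation currently being collected; tasks touched in the meantime are skipped and stay in a newer generation. A minimal sketch of that gate, assuming a `GcTaskState { generation: u32 }` shape as suggested by the `state.gc.generation` accesses above (the struct name and queue shape here are illustrative, not the exact `gc.rs` implementation):

```rust
use turbo_tasks::TaskId;

#[derive(Debug, Default, Clone, Copy)]
pub struct GcTaskState {
    /// Generation in which the task was last accessed (assumed field).
    pub generation: u32,
}

/// One popped "old generation": a plain list of task ids plus the value of
/// the generation counter at the time the list was sealed.
pub struct OldGeneration {
    pub tasks: Vec<TaskId>,
    pub generation: u32,
}

/// Keep only tasks that were not accessed after this generation was sealed,
/// mirroring the `state.gc.generation > generation` early-out in
/// `Task::run_gc`. Unloaded tasks (`None`) have nothing left to collect.
pub fn gc_candidates(
    old: &OldGeneration,
    gc_state_of: impl Fn(TaskId) -> Option<GcTaskState>,
) -> Vec<TaskId> {
    old.tasks
        .iter()
        .copied()
        .filter(|&id| matches!(gc_state_of(id), Some(gc) if gc.generation <= old.generation))
        .collect()
}
```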
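`TaskMetaStateWriteGuard::ensure_at_least_partial` upgrades an `Unloaded` guard in place by parking the new `TemporaryFiller` variant in `self` while the old variant is moved out, which is why every other `match` arm treats `TemporaryFiller` as `unreachable!()`. The same take-and-replace pattern in isolation (generic stand-in types, not the turbo-tasks ones):

```rust
use std::mem::replace;

enum MetaState {
    Unloaded,
    Partial(Box<String>),
    /// Placeholder that only exists while variants are being swapped;
    /// observing it anywhere else is a bug.
    TemporaryFiller,
}

fn ensure_at_least_partial(state: &mut MetaState) {
    if matches!(state, MetaState::Unloaded) {
        // Move the old variant out of `&mut self` by swapping in the filler.
        let MetaState::Unloaded = replace(state, MetaState::TemporaryFiller) else {
            unreachable!();
        };
        // Build the upgraded variant and store it back over the filler.
        *state = MetaState::Partial(Box::new(String::from("partial")));
    }
}
```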
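`Completions::completed` now awaits all inner completions concurrently instead of one after another, so a single slow completion no longer serializes the rest; `try_join` also short-circuits on the first error. The behavioral difference, sketched with `futures::future::try_join_all` standing in for turbo-tasks' own `TryJoinIterExt::try_join`:

```rust
use anyhow::Result;
use futures::future::try_join_all;

// Stand-in for awaiting a single `Vc<Completion>`.
async fn completion(id: u32) -> Result<()> {
    let _ = id;
    Ok(())
}

// Before: completions are awaited strictly in sequence.
async fn completed_sequential(ids: &[u32]) -> Result<()> {
    for &id in ids {
        completion(id).await?;
    }
    Ok(())
}

// After: all completions are polled concurrently and the first error wins.
async fn completed_concurrent(ids: &[u32]) -> Result<()> {
    try_join_all(ids.iter().map(|&id| completion(id))).await?;
    Ok(())
}
```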
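The `raw_trace.rs` change brackets trace serialization with `TurboMalloc::allocation_counters()` and `reset_allocation_counters(start)`, so the tracer's own buffer allocations are excluded from the per-thread counters it later reports. The pattern as a reusable helper (a sketch; `turbo-tasks-malloc` does not ship this wrapper):

```rust
use turbo_tasks_malloc::TurboMalloc;

/// Run `f` without letting its allocations show up in the thread-local
/// allocation counters: snapshot the counters first, then restore the
/// snapshot afterwards.
fn without_allocation_tracking<T>(f: impl FnOnce() -> T) -> T {
    let start = TurboMalloc::allocation_counters();
    let result = f();
    TurboMalloc::reset_allocation_counters(start);
    result
}
```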