Skip to content

Commit

Permalink
feat(tasks-fs): bump up notify, fix empty file watch (vercel/turborep…
Browse files Browse the repository at this point in the history
…o#7340)

### Description

This PR upgrades the `notify` package, which is used to create the filesystem watcher.

This is motivated by an issue we recently found in PACK-2437, where an edge case like

- start a watcher on a file that has some contents
- open an editor, clear everything (make the file empty), and save

doesn't create any file watcher event. As a result, in some cases turbopack
does not trigger HMR even though it is desired.

One thing to note is that even with the upgrade, the event kind is somewhat
unexpected; it's not a `Modify` event but a `Metadata` event. I can't say
why it is emitted that way, but handling for it is added as a workaround for now.


We have previously tried to upgrade this package and had to revert it, so the
upgrade plan needs some caution.

Plan is

- Once the PR is approved, cut a turbopack release _before_ the PR and bump next.js first
- Land the PR, then create a new turbopack release so that the release contains
only 1 change, the `notify` upgrade
- Ensure the new turbopack update in next.js won't break (like upgrading
front, etc.) and make it easy to revert


Closes PACK-2437.
  • Loading branch information
kwonoj authored Feb 12, 2024
1 parent 0af3c7d commit 44a2fa8
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 64 deletions.
2 changes: 1 addition & 1 deletion crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ include_dir = { version = "0.7.2", features = ["nightly"] }
indexmap = { workspace = true }
jsonc-parser = { version = "0.21.0", features = ["serde"] }
mime = { workspace = true }
notify = "4.0.17"
notify-debouncer-full = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true, features = ["rc"] }
serde_json = { workspace = true }
Expand Down
12 changes: 9 additions & 3 deletions crates/turbo-tasks-fs/benches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use criterion::{
measurement::{Measurement, WallTime},
BenchmarkId, Criterion,
};
use notify::{watcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
new_debouncer,
notify::{RecursiveMode, Watcher},
};
use tokio::runtime::Runtime;
use turbo_tasks::event::Event;
use turbo_tasks_fs::rope::{Rope, RopeBuilder};
Expand All @@ -35,8 +38,11 @@ fn bench_file_watching(c: &mut Criterion) {
let (tx, rx) = channel();
let event = Arc::new(Event::new(|| "test event".to_string()));

let mut watcher = watcher(tx, Duration::from_micros(1)).unwrap();
watcher.watch(temp_path, RecursiveMode::Recursive).unwrap();
let mut watcher = new_debouncer(Duration::from_micros(1), None, tx).unwrap();
watcher
.watcher()
.watch(temp_path, RecursiveMode::Recursive)
.unwrap();

let t = thread::spawn({
let event = event.clone();
Expand Down
191 changes: 131 additions & 60 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ use glob::Glob;
use invalidator_map::InvalidatorMap;
use jsonc_parser::{parse_to_serde_value, ParseOptions};
use mime::Mime;
use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
notify::{
event::{MetadataKind, ModifyKind, RenameMode},
EventKind, RecommendedWatcher, RecursiveMode, Watcher,
},
DebouncedEvent, Debouncer, FileIdMap,
};
use read_glob::read_glob;
pub use read_glob::ReadGlobResult;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -104,7 +110,7 @@ pub trait FileSystem: ValueToString {

#[derive(Default)]
struct DiskWatcher {
watcher: Mutex<Option<RecommendedWatcher>>,
watcher: Mutex<Option<Debouncer<RecommendedWatcher, FileIdMap>>>,
/// Keeps track of which directories are currently watched. This is only
/// used on a OS that doesn't support recursive watching.
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
Expand Down Expand Up @@ -136,13 +142,13 @@ impl DiskWatcher {
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn start_watching(
&self,
watcher: &mut std::sync::MutexGuard<Option<RecommendedWatcher>>,
watcher: &mut std::sync::MutexGuard<Option<Debouncer<RecommendedWatcher, FileIdMap>>>,
dir_path: &Path,
root_path: &Path,
) -> Result<()> {
if let Some(watcher) = watcher.as_mut() {
let mut path = dir_path;
while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) {
while let Err(err) = watcher.watcher().watch(path, RecursiveMode::NonRecursive) {
if path == root_path {
return Err(err).context(format!(
"Unable to watch {} (tried up to {})",
Expand Down Expand Up @@ -264,6 +270,23 @@ impl DiskFileSystem {
self.start_watching_internal(true)
}

///
/// Create a watcher and start watching by creating `debounced` watcher
/// via `full debouncer`
///
/// `notify` provides 2 different debouncer implementations; the `-full` one
/// provides the following differences for ease of use:
///
/// - Only emits a single Rename event if the rename From and To events can
/// be matched
/// - Merges multiple Rename events
/// - Takes Rename events into account and updates paths for events that
/// occurred before the rename event, but which haven't been emitted, yet
/// - Optionally keeps track of the file system IDs of all files and stitches
/// rename events together (FSevents, Windows)
/// - Emits only one Remove event when deleting a directory (inotify)
/// - Doesn't emit duplicate create events
/// - Doesn't emit Modify events after a Create event
fn start_watching_internal(&self, report_invalidation_reason: bool) -> Result<()> {
let mut watcher_guard = self.watcher.watcher.lock().unwrap();
if watcher_guard.is_some() {
Expand All @@ -288,14 +311,19 @@ impl DiskFileSystem {
let delay = Duration::from_millis(1);
// Create a watcher object, delivering debounced events.
// The notification back-end is selected based on the platform.
let mut watcher = watcher(tx, delay)?;
let mut debounced_watcher = notify_debouncer_full::new_debouncer(delay, None, tx)?;
// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
#[cfg(any(target_os = "macos", target_os = "windows"))]
watcher.watch(&root_path, RecursiveMode::Recursive)?;
debounced_watcher
.watcher()
.watch(&root_path, RecursiveMode::Recursive)?;

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
for dir_path in self.watcher.watching.iter() {
watcher.watch(&*dir_path, RecursiveMode::NonRecursive)?;
debounced_watcher
.watcher()
.watch(&dir_path, RecursiveMode::NonRecursive)?;
}

// We need to invalidate all reads that happened before watching
Expand Down Expand Up @@ -323,7 +351,7 @@ impl DiskFileSystem {
});
}

watcher_guard.replace(watcher);
watcher_guard.replace(debounced_watcher);
drop(watcher_guard);

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
Expand All @@ -343,60 +371,103 @@ impl DiskFileSystem {
});
loop {
match event {
Ok(DebouncedEvent::Write(path)) => {
batched_invalidate_path.insert(path);
}
Ok(DebouncedEvent::Create(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(path.clone());
}
Ok(DebouncedEvent::Remove(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
}
Ok(DebouncedEvent::Rename(source, destination)) => {
batched_invalidate_path_and_children.insert(source.clone());
if let Some(parent) = source.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
batched_invalidate_path_and_children.insert(destination.clone());
if let Some(parent) = destination.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(destination.clone());
}
Ok(DebouncedEvent::Rescan) => {
batched_invalidate_path_and_children.insert(PathBuf::from(&root));
batched_invalidate_path_and_children_dir.insert(PathBuf::from(&root));
}
Ok(DebouncedEvent::Error(err, path)) => {
println!("watch error ({:?}): {:?} ", path, err);
match path {
Some(path) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path);
Ok(Ok(events)) => {
events.iter().for_each(|DebouncedEvent { event: notify_debouncer_full::notify::Event {kind, paths, ..}, .. }| {
// [NOTE] there is attrs in the `Event` struct, which contains few more metadata like process_id who triggered the event,
// or the source we may able to utilize later.
match kind {
// [NOTE] Observing `ModifyKind::Metadata(MetadataKind::Any)` is not a mistake, fix for PACK-2437.
// In here explicitly subscribes to the `ModifyKind::Data` which
// indicates file content changes - in case of fsevents backend, this is `kFSEventStreamEventFlagItemModified`.
// Also meanwhile we subscribe to ModifyKind::Metadata as well.
// This is due to in some cases fsevents does not emit explicit kFSEventStreamEventFlagItemModified kernel events,
// but only emits kFSEventStreamEventFlagItemInodeMetaMod. While this could cause redundant invalidation,
// it's the way to reliably detect file content changes.
// ref other implementation, i.e libuv does same thing to trigger UV_CHANEGS
// https://github.com/libuv/libuv/commit/73cf3600d75a5884b890a1a94048b8f3f9c66876#diff-e12fdb1f404f1c97bbdcc0956ac90d7db0d811d9fa9ca83a3deef90c937a486cR95-R99
EventKind::Modify(ModifyKind::Data(_) | ModifyKind::Metadata(MetadataKind::Any)) => {
batched_invalidate_path.extend(paths.clone());
}
EventKind::Create(_) => {
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir.extend(paths.clone());
paths.iter().for_each(|path| {
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
});

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.extend(paths.clone());
}
EventKind::Remove(_) => {
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir.extend(paths.clone());
paths.iter().for_each(|path| {
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
});
}
// A single event emitted with both the `From` and `To` paths.
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => {
// For the rename::both, notify provides an array of paths in given order
if let [source, destination, ..] = &paths[..] {
batched_invalidate_path_and_children.insert(source.clone());
if let Some(parent) = source.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
batched_invalidate_path_and_children.insert(destination.clone());
if let Some(parent) = destination.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(destination.clone());
} else {
// If we hit here, we expect this as a bug either in notify or system weirdness.
panic!("Rename event does not contain source and destination paths {:#?}", paths);
}
}
// We expect RenameMode::Both covers most of the case we need to invalidate,
// but also checks RenameMode::To just in case to avoid edge cases.
EventKind::Any | EventKind::Modify(ModifyKind::Any | ModifyKind::Name(RenameMode::To)) => {
batched_invalidate_path.extend(paths.clone());
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir.extend(paths.clone());
for parent in paths.iter().filter_map(|path| path.parent()) {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
}
EventKind::Modify(
ModifyKind::Metadata(..)
| ModifyKind::Other
| ModifyKind::Name(..)
)
| EventKind::Access(_)
| EventKind::Other => {
// ignored
}
}
None => {
batched_invalidate_path_and_children
.insert(PathBuf::from(&root));
batched_invalidate_path_and_children_dir
.insert(PathBuf::from(&root));
}
}
});
}
Ok(DebouncedEvent::Chmod(_))
| Ok(DebouncedEvent::NoticeRemove(_))
| Ok(DebouncedEvent::NoticeWrite(_)) => {
// ignored
// Error raised by notify watcher itself
Ok(Err(errors)) => {
errors.iter().for_each(
|notify_debouncer_full::notify::Error { kind, paths }| {
println!("watch error ({:?}): {:?} ", paths, kind);

if paths.is_empty() {
batched_invalidate_path_and_children
.insert(PathBuf::from(&root));
batched_invalidate_path_and_children_dir
.insert(PathBuf::from(&root));
} else {
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir
.extend(paths.clone());
}
},
);
}
Err(TryRecvError::Disconnected) => {
// Sender has been disconnected
Expand Down

0 comments on commit 44a2fa8

Please sign in to comment.