Skip to content

Commit

Permalink
feat(tasks-fs): bump up notify, fix empty file watch
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Feb 10, 2024
1 parent 7d674e3 commit 34092a4
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 63 deletions.
12 changes: 9 additions & 3 deletions crates/turbo-tasks-fs/benches/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use criterion::{
measurement::{Measurement, WallTime},
BenchmarkId, Criterion,
};
use notify::{watcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
new_debouncer,
notify::{RecursiveMode, Watcher},
};
use tokio::runtime::Runtime;
use turbo_tasks::event::Event;
use turbo_tasks_fs::rope::{Rope, RopeBuilder};
Expand All @@ -35,8 +38,11 @@ fn bench_file_watching(c: &mut Criterion) {
let (tx, rx) = channel();
let event = Arc::new(Event::new(|| "test event".to_string()));

let mut watcher = watcher(tx, Duration::from_micros(1)).unwrap();
watcher.watch(temp_path, RecursiveMode::Recursive).unwrap();
let mut watcher = new_debouncer(Duration::from_micros(1), None, tx).unwrap();
watcher
.watcher()
.watch(temp_path, RecursiveMode::Recursive)
.unwrap();

let t = thread::spawn({
let event = event.clone();
Expand Down
190 changes: 130 additions & 60 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ use glob::Glob;
use invalidator_map::InvalidatorMap;
use jsonc_parser::{parse_to_serde_value, ParseOptions};
use mime::Mime;
use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use notify_debouncer_full::{
notify::{
event::{MetadataKind, ModifyKind, RenameMode},
EventKind, RecommendedWatcher, RecursiveMode, Watcher,
},
DebouncedEvent, Debouncer, FileIdMap,
};
use read_glob::read_glob;
pub use read_glob::ReadGlobResult;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -104,7 +110,7 @@ pub trait FileSystem: ValueToString {

#[derive(Default)]
struct DiskWatcher {
watcher: Mutex<Option<RecommendedWatcher>>,
watcher: Mutex<Option<Debouncer<RecommendedWatcher, FileIdMap>>>,
/// Keeps track of which directories are currently watched. This is only
/// used on a OS that doesn't support recursive watching.
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
Expand Down Expand Up @@ -136,13 +142,13 @@ impl DiskWatcher {
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
fn start_watching(
&self,
watcher: &mut std::sync::MutexGuard<Option<RecommendedWatcher>>,
watcher: &mut std::sync::MutexGuard<Option<Debouncer<RecommendedWatcher, FileIdMap>>>,
dir_path: &Path,
root_path: &Path,
) -> Result<()> {
if let Some(watcher) = watcher.as_mut() {
let mut path = dir_path;
while let Err(err) = watcher.watch(path, RecursiveMode::NonRecursive) {
while let Err(err) = watcher.watcher().watch(path, RecursiveMode::NonRecursive) {
if path == root_path {
return Err(err).context(format!(
"Unable to watch {} (tried up to {})",
Expand Down Expand Up @@ -264,6 +270,26 @@ impl DiskFileSystem {
self.start_watching_internal(true)
}

/**
* Create a watcher and start watching by creating `debounced` watcher
* via `full debouncer`
*
* `notify` provides 2 different debouncer implementation, `-full`
* provides below differences for the easy of use:
*
* - Only emits a single Rename event if the rename From and To events
* can be matched
* - Merges multiple Rename events
* - Takes Rename events into account and updates paths for events that
* occurred before the rename event, but which haven't been emitted,
* yet
* - Optionally keeps track of the file system IDs all files and stiches
* rename events together (FSevents, Windows)
* - Emits only one Remove event when deleting a directory (inotify)
* - Doesn't emit duplicate create events
* - Doesn't emit Modify events after a Create event
*
*/
fn start_watching_internal(&self, report_invalidation_reason: bool) -> Result<()> {
let mut watcher_guard = self.watcher.watcher.lock().unwrap();
if watcher_guard.is_some() {
Expand All @@ -288,14 +314,19 @@ impl DiskFileSystem {
let delay = Duration::from_millis(1);
// Create a watcher object, delivering debounced events.
// The notification back-end is selected based on the platform.
let mut watcher = watcher(tx, delay)?;
let mut debounced_watcher = notify_debouncer_full::new_debouncer(delay, None, tx)?;
// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
#[cfg(any(target_os = "macos", target_os = "windows"))]
watcher.watch(&root_path, RecursiveMode::Recursive)?;
debounced_watcher
.watcher()
.watch(&root_path, RecursiveMode::Recursive)?;

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
for dir_path in self.watcher.watching.iter() {
watcher.watch(&*dir_path, RecursiveMode::NonRecursive)?;
debounced_watcher
.watcher()
.watch(&dir_path, RecursiveMode::NonRecursive)?;
}

// We need to invalidate all reads that happened before watching
Expand Down Expand Up @@ -323,7 +354,7 @@ impl DiskFileSystem {
});
}

watcher_guard.replace(watcher);
watcher_guard.replace(debounced_watcher);
drop(watcher_guard);

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
Expand All @@ -343,60 +374,99 @@ impl DiskFileSystem {
});
loop {
match event {
Ok(DebouncedEvent::Write(path)) => {
batched_invalidate_path.insert(path);
}
Ok(DebouncedEvent::Create(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(path.clone());
}
Ok(DebouncedEvent::Remove(path)) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path.clone());
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
}
Ok(DebouncedEvent::Rename(source, destination)) => {
batched_invalidate_path_and_children.insert(source.clone());
if let Some(parent) = source.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
batched_invalidate_path_and_children.insert(destination.clone());
if let Some(parent) = destination.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(destination.clone());
}
Ok(DebouncedEvent::Rescan) => {
batched_invalidate_path_and_children.insert(PathBuf::from(&root));
batched_invalidate_path_and_children_dir.insert(PathBuf::from(&root));
}
Ok(DebouncedEvent::Error(err, path)) => {
println!("watch error ({:?}): {:?} ", path, err);
match path {
Some(path) => {
batched_invalidate_path_and_children.insert(path.clone());
batched_invalidate_path_and_children_dir.insert(path);
}
None => {
batched_invalidate_path_and_children
.insert(PathBuf::from(&root));
batched_invalidate_path_and_children_dir
.insert(PathBuf::from(&root));
Ok(Ok(events)) => {
events.iter().for_each(|DebouncedEvent { event: notify_debouncer_full::notify::Event {kind, paths, ..}, .. }| {
// [NOTE] there is attrs in the `Event` struct, which contains few more metadata like process_id who triggered the event,
// or the source we may able to utilize later.
match kind {
EventKind::Modify(ModifyKind::Data(_)) => {
batched_invalidate_path.extend(paths.clone());
}
EventKind::Create(_) => {
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir.extend(paths.clone());
paths.iter().for_each(|path| {
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
});

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.extend(paths.clone());
}
EventKind::Remove(_) => {
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir.extend(paths.clone());
paths.iter().for_each(|path| {
if let Some(parent) = path.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
});
}
// A single event emitted with both the `From` and `To` paths.
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => {
// For the rename::both, notify provides an array of paths in given order
if let [source, destination, ..] = &paths[..] {
batched_invalidate_path_and_children.insert(source.clone());
if let Some(parent) = source.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
batched_invalidate_path_and_children.insert(destination.clone());
if let Some(parent) = destination.parent() {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
#[cfg(not(any(target_os = "macos", target_os = "windows")))]
batched_new_paths.insert(destination.clone());
} else {
println!("Rename event does not contain source and destination paths {:#?}", paths);
}
}
// [NOTE] Observing `ModifyKind::Metadata(MetadataKind::Any)` is not a mistake.
// PACK-2437 addresses some cases notify doesn't emit event to trigger hmr,
// cases like starting from a file have contents -> clear & save.
// v4 doesn't emit any debounced event for those,
// and v6 emits an event with `ModifyKind::Metadata(MetadataKind::Any)`
// it is unclear why these case are not considered as normal modify, anyway we use certain
// event to workaround.
EventKind::Any | EventKind::Modify(ModifyKind::Any | ModifyKind::Metadata(MetadataKind::Any)) => {
batched_invalidate_path.extend(paths.clone());
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir.extend(paths.clone());
for parent in paths.iter().filter_map(|path| path.parent()) {
batched_invalidate_path_dir.insert(PathBuf::from(parent));
}
}
EventKind::Modify(
ModifyKind::Metadata(..)
| ModifyKind::Other
// hope RenameMode::both are suffecient for the rename event, ignore others
| ModifyKind::Name(..)
)
| EventKind::Access(_)
| EventKind::Other => {
// ignored
}
}
}
});
}
Ok(DebouncedEvent::Chmod(_))
| Ok(DebouncedEvent::NoticeRemove(_))
| Ok(DebouncedEvent::NoticeWrite(_)) => {
// ignored
// Error raised by notify watcher itself
Ok(Err(errors)) => {
errors.iter().for_each(
|notify_debouncer_full::notify::Error { kind, paths }| {
println!("watch error ({:?}): {:?} ", paths, kind);

if paths.is_empty() {
batched_invalidate_path_and_children
.insert(PathBuf::from(&root));
batched_invalidate_path_and_children_dir
.insert(PathBuf::from(&root));
} else {
batched_invalidate_path_and_children.extend(paths.clone());
batched_invalidate_path_and_children_dir
.extend(paths.clone());
}
},
);
}
Err(TryRecvError::Disconnected) => {
// Sender has been disconnected
Expand Down

0 comments on commit 34092a4

Please sign in to comment.