Skip to content

Commit

Permalink
Update change to include timestamp and details fields
Browse files Browse the repository at this point in the history
  • Loading branch information
chipsenkbeil committed Jun 6, 2023
1 parent 4eaae55 commit 72cc998
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 30 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
change notification (will aggregate and merge changes)
- `debounce_tick_rate = <secs>` to specify how long to wait between event
aggregation loops
- `distant-protocol` response for a change now supports these additional
fields:
- `timestamp` (serialized as `ts`) to communicate the seconds since unix
epoch when the event was received
- `details` containing `attributes` (clarify changes on attribute kind) and
`extra` (to convey arbitrary platform-specific extra information)

### Changed

Expand Down
46 changes: 39 additions & 7 deletions distant-core/src/client/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,16 @@ mod tests {
req.id,
vec![
protocol::Response::Changed(Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
protocol::Response::Changed(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
],
))
Expand All @@ -280,17 +284,21 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);

let change = watcher.next().await.expect("Watcher closed unexpectedly");
assert_eq!(
change,
Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
}
Expand Down Expand Up @@ -330,8 +338,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id.clone(),
protocol::Response::Changed(Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
Expand All @@ -342,8 +352,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id.clone() + "1",
protocol::Response::Changed(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
Expand All @@ -354,8 +366,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id,
protocol::Response::Changed(Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
Expand All @@ -366,17 +380,21 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);

let change = watcher.next().await.expect("Watcher closed unexpectedly");
assert_eq!(
change,
Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
}
Expand Down Expand Up @@ -414,16 +432,22 @@ mod tests {
req.id,
vec![
protocol::Response::Changed(Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
protocol::Response::Changed(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
protocol::Response::Changed(Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
],
))
Expand All @@ -447,8 +471,10 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);

Expand All @@ -470,8 +496,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id,
protocol::Response::Changed(Change {
timestamp: 3,
kind: ChangeKind::Unknown,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
Expand All @@ -482,15 +510,19 @@ mod tests {
assert_eq!(
watcher.lock().await.next().await,
Some(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
})
);
assert_eq!(
watcher.lock().await.next().await,
Some(Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
})
);
assert_eq!(watcher.lock().await.next().await, None);
Expand Down
32 changes: 28 additions & 4 deletions distant-local/src/api/state/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::collections::HashMap;
use std::io;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use distant_core::net::common::ConnectionId;
use distant_core::protocol::ChangeKind;
use distant_core::protocol::{Change, ChangeDetails, ChangeDetailsAttributes, ChangeKind};
use log::*;
use notify::event::{AccessKind, AccessMode, ModifyKind};
use notify::event::{AccessKind, AccessMode, MetadataKind, ModifyKind};
use notify::{
Config as WatcherConfig, Error as WatcherError, ErrorKind as WatcherErrorKind,
Event as WatcherEvent, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
Expand Down Expand Up @@ -317,6 +317,11 @@ async fn watcher_task<W>(
}
}
InnerWatcherMsg::Event { ev } => {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time before unix epoch")
.as_secs();

let kind = match ev.kind {
EventKind::Access(AccessKind::Read) => ChangeKind::Access,
EventKind::Modify(ModifyKind::Metadata(_)) => ChangeKind::Attribute,
Expand All @@ -332,8 +337,27 @@ async fn watcher_task<W>(
_ => ChangeKind::Unknown,
};

let attributes = match ev.kind {
EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)) => {
vec![ChangeDetailsAttributes::Timestamp]
}
EventKind::Modify(ModifyKind::Metadata(
MetadataKind::Ownership | MetadataKind::Permissions,
)) => vec![ChangeDetailsAttributes::Permissions],
_ => Vec::new(),
};

for registered_path in registered_paths.iter() {
match registered_path.filter_and_send(kind, &ev.paths).await {
let change = Change {
timestamp,
kind,
paths: ev.paths.clone(),
details: ChangeDetails {
attributes: attributes.clone(),
extra: ev.info().map(ToString::to_string),
},
};
match registered_path.filter_and_send(change).await {
Ok(_) => (),
Err(x) => error!(
"[Conn {}] Failed to forward changes to paths: {}",
Expand Down
21 changes: 7 additions & 14 deletions distant-local/src/api/state/watcher/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{fmt, io};

use distant_core::net::common::ConnectionId;
use distant_core::net::server::Reply;
use distant_core::protocol::{Change, ChangeKind, ChangeKindSet, Error, Response};
use distant_core::protocol::{Change, ChangeKindSet, Error, Response};

/// Represents a path registered with a watcher that includes relevant state including
/// the ability to reply with
Expand Down Expand Up @@ -122,24 +122,17 @@ impl RegisteredPath {
/// out any paths that are not applicable
///
/// Returns true if message was sent, and false if not
pub async fn filter_and_send<T>(&self, kind: ChangeKind, paths: T) -> io::Result<bool>
where
T: IntoIterator,
T::Item: AsRef<Path>,
{
if !self.allowed().contains(&kind) {
pub async fn filter_and_send(&self, mut change: Change) -> io::Result<bool> {
if !self.allowed().contains(&change.kind) {
return Ok(false);
}

let paths: Vec<PathBuf> = paths
.into_iter()
.filter(|p| self.applies_to_path(p.as_ref()))
.map(|p| p.as_ref().to_path_buf())
.collect();
// filter the paths that are not applicable
change.paths.retain(|p| self.applies_to_path(p.as_path()));

if !paths.is_empty() {
if !change.paths.is_empty() {
self.reply
.send(Response::Changed(Change { kind, paths }))
.send(Response::Changed(change))
.await
.map(|_| true)
} else {
Expand Down
37 changes: 37 additions & 0 deletions distant-protocol/src/common/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,48 @@ use strum::{EnumString, EnumVariantNames, VariantNames};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Change {
/// Unix timestamp (in seconds) when the server was notified of this change (not when the
/// change occurred)
#[serde(rename = "ts")]
pub timestamp: u64,

/// Label describing the kind of change
pub kind: ChangeKind,

/// Paths that were changed
pub paths: Vec<PathBuf>,

/// Additional details associated with the change
#[serde(default, skip_serializing_if = "ChangeDetails::is_empty")]
pub details: ChangeDetails,
}

/// Details about a change
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default, rename_all = "snake_case", deny_unknown_fields)]
pub struct ChangeDetails {
/// Clarity on type of attribute changes that have occurred (for kind == attribute)
#[serde(skip_serializing_if = "Vec::is_empty")]
pub attributes: Vec<ChangeDetailsAttributes>,

/// Optional information about the change that is typically platform-specific
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<String>,
}

impl ChangeDetails {
/// Returns true if no details are contained within.
pub fn is_empty(&self) -> bool {
self.attributes.is_empty() && self.extra.is_none()
}
}

/// Specific details about modification
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum ChangeDetailsAttributes {
Permissions,
Timestamp,
}

/// Represents a label attached to a [`Change`] that describes the kind of change.
Expand Down
Loading

0 comments on commit 72cc998

Please sign in to comment.