Skip to content

Commit

Permalink
Fix code talking about "views" instead of subscribers (#4591)
Browse files Browse the repository at this point in the history
_Starting to cherry-pick some of the stuff buried in my caching branch_

Trivial.
  • Loading branch information
teh-cmc authored Dec 19, 2023
1 parent 832da05 commit d7b5867
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
4 changes: 2 additions & 2 deletions crates/re_arrow_store/src/store_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ mod tests {
assert!(event.cells.contains_key(&store.cluster_key()));

let (events, _) = store.gc(&GarbageCollectionOptions::gc_everything());
assert!(events.len() == 1);
assert_eq!(1, events.len());
assert!(events[0].cells.contains_key(&store.cluster_key()));
}

Expand All @@ -515,7 +515,7 @@ mod tests {
assert!(!event.cells.contains_key(&store.cluster_key()));

let (events, _) = store.gc(&GarbageCollectionOptions::gc_everything());
assert!(events.len() == 1);
assert_eq!(1, events.len());
assert!(!events[0].cells.contains_key(&store.cluster_key()));
}

Expand Down
56 changes: 29 additions & 27 deletions crates/re_arrow_store/src/store_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type SharedStoreSubscriber = RwLock<Box<dyn StoreSubscriber>>;
//
// TODO(#4204): StoreSubscriber should require SizeBytes so they can be part of memstats.
pub trait StoreSubscriber: std::any::Any + Send + Sync {
/// Arbitrary name for the view.
/// Arbitrary name for the subscriber.
///
/// Does not need to be unique.
fn name(&self) -> String;
Expand All @@ -36,7 +36,7 @@ pub trait StoreSubscriber: std::any::Any + Send + Sync {

/// The core of this trait: get notified of changes happening in all [`DataStore`]s.
///
/// This will be called automatically by the [`DataStore`] itself if the view has been
/// This will be called automatically by the [`DataStore`] itself if the subscriber has been
/// registered: [`DataStore::register_subscriber`].
/// Or you might want to feed it [`StoreEvent`]s manually, depending on your use case.
///
Expand Down Expand Up @@ -72,65 +72,67 @@ impl DataStore {
/// ## Scope
///
/// Registered [`StoreSubscriber`]s are global scope: they get notified of all events from all
/// existing [`DataStore`]s, including [`DataStore`]s created after the view was registered.
/// existing [`DataStore`]s, including [`DataStore`]s created after the subscriber was registered.
///
/// Use [`StoreEvent::store_id`] to identify the source of an event.
///
/// ## Late registration
///
/// Views must be registered before a store gets created to guarantee that no events were
/// missed.
/// Subscribers must be registered before a store gets created to guarantee that no events
/// were missed.
///
/// [`StoreEvent::event_id`] can be used to identify missing events.
///
/// ## Ordering
///
/// The order in which registered views are notified is undefined and will likely become
/// The order in which registered subscribers are notified is undefined and will likely become
/// concurrent in the future.
///
/// If you need a specific order across multiple views, embed them into an orchestrating view.
/// If you need a specific order across multiple subscribers, embed them into an orchestrating
/// subscriber.
//
// TODO(cmc): send a compacted snapshot to late registerers for bootstrapping
pub fn register_subscriber(view: Box<dyn StoreSubscriber>) -> StoreSubscriberHandle {
let mut views = SUBSCRIBERS.write();
views.push(RwLock::new(view));
StoreSubscriberHandle(views.len() as u32 - 1)
pub fn register_subscriber(subscriber: Box<dyn StoreSubscriber>) -> StoreSubscriberHandle {
let mut subscribers = SUBSCRIBERS.write();
subscribers.push(RwLock::new(subscriber));
StoreSubscriberHandle(subscribers.len() as u32 - 1)
}

/// Passes a reference to the downcasted view to the given callback.
/// Passes a reference to the downcasted subscriber to the given callback.
///
/// Returns `None` if the view doesn't exist or downcasting failed.
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
pub fn with_subscriber<V: StoreSubscriber, T, F: FnMut(&V) -> T>(
StoreSubscriberHandle(handle): StoreSubscriberHandle,
mut f: F,
) -> Option<T> {
let views = SUBSCRIBERS.read();
views.get(handle as usize).and_then(|view| {
let view = view.read();
view.as_any().downcast_ref::<V>().map(&mut f)
let subscribers = SUBSCRIBERS.read();
subscribers.get(handle as usize).and_then(|subscriber| {
let subscriber = subscriber.read();
subscriber.as_any().downcast_ref::<V>().map(&mut f)
})
}

/// Passes a mutable reference to the downcasted view to the given callback.
/// Passes a mutable reference to the downcasted subscriber to the given callback.
///
/// Returns `None` if the view doesn't exist or downcasting failed.
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
pub fn with_subscriber_mut<V: StoreSubscriber, T, F: FnMut(&mut V) -> T>(
StoreSubscriberHandle(handle): StoreSubscriberHandle,
mut f: F,
) -> Option<T> {
let views = SUBSCRIBERS.read();
views.get(handle as usize).and_then(|view| {
let mut view = view.write();
view.as_any_mut().downcast_mut::<V>().map(&mut f)
let subscribers = SUBSCRIBERS.read();
subscribers.get(handle as usize).and_then(|subscriber| {
let mut subscriber = subscriber.write();
subscriber.as_any_mut().downcast_mut::<V>().map(&mut f)
})
}

/// Called by [`DataStore`]'s mutating methods to notify view subscribers of upcoming events.
/// Called by [`DataStore`]'s mutating methods to notify subscriber subscribers of upcoming events.
pub(crate) fn on_events(events: &[StoreEvent]) {
let views = SUBSCRIBERS.read();
re_tracing::profile_function!();
let subscribers = SUBSCRIBERS.read();
// TODO(cmc): might want to parallelize at some point.
for view in views.iter() {
view.write().on_events(events);
for subscriber in subscribers.iter() {
subscriber.write().on_events(events);
}
}
}
Expand Down

0 comments on commit d7b5867

Please sign in to comment.