Skip to content

Commit

Permalink
Make StoreDb::entity_db private (#3832)
Browse files Browse the repository at this point in the history
### What
This was a footgun that should not have been there.

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/3832) (if
applicable)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/3832)
- [Docs
preview](https://rerun.io/preview/244b0e9a041c2b9c56e335c6fc98cd220cc022d7/docs)
<!--DOCS-PREVIEW-->
- [Examples
preview](https://rerun.io/preview/244b0e9a041c2b9c56e335c6fc98cd220cc022d7/examples)
<!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://ref.rerun.io/dev/bench/)
- [Wasm size tracking](https://ref.rerun.io/dev/sizes/)

---------

Co-authored-by: Antoine Beyeler <[email protected]>
Co-authored-by: Antoine Beyeler <[email protected]>
  • Loading branch information
3 people authored Oct 12, 2023
1 parent 2f23c52 commit 432d7d2
Show file tree
Hide file tree
Showing 38 changed files with 148 additions and 227 deletions.
161 changes: 79 additions & 82 deletions crates/re_data_store/src/store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use nohash_hasher::IntMap;

use re_arrow_store::{DataStoreConfig, GarbageCollectionOptions};
use re_log_types::{
ApplicationId, ArrowMsg, ComponentPath, DataCell, DataRow, DataTable, EntityPath,
EntityPathHash, EntityPathOpMsg, LogMsg, PathOp, RowId, SetStoreInfo, StoreId, StoreInfo,
StoreKind, TimePoint, Timeline,
ApplicationId, ComponentPath, DataCell, DataRow, DataTable, EntityPath, EntityPathHash, LogMsg,
PathOp, RowId, SetStoreInfo, StoreId, StoreInfo, StoreKind, TimePoint, Timeline,
};
use re_types::{components::InstanceKey, Loggable as _};

Expand All @@ -15,6 +14,8 @@ use crate::{Error, TimesPerTimeline};
// ----------------------------------------------------------------------------

/// Stored entities with easy indexing of the paths.
///
/// NOTE: don't go mutating the contents of this. Use the public functions instead.
pub struct EntityDb {
/// In many places we just store the hashes, so we need a way to translate back.
pub entity_path_from_hash: IntMap<EntityPathHash, EntityPath>,
Expand Down Expand Up @@ -74,45 +75,12 @@ impl EntityDb {
.or_insert_with(|| entity_path.clone());
}

fn try_add_arrow_msg(&mut self, msg: &ArrowMsg) -> Result<(), Error> {
re_tracing::profile_function!();

// TODO(#1760): Compute the size of the datacells in the batching threads on the clients.
let mut table = DataTable::from_arrow_msg(msg)?;
table.compute_all_size_bytes();

// TODO(cmc): batch all of this
for row in table.to_rows() {
let row = row?;

self.register_entity_path(&row.entity_path);

self.try_add_data_row(&row)?;

// Look for a `ClearIsRecursive` component, and if it's there, go through the clear path
// instead.
use re_types::components::ClearIsRecursive;
if let Some(idx) = row.find_cell(&ClearIsRecursive::name()) {
let cell = &row.cells()[idx];
let settings = cell.try_to_native_mono::<ClearIsRecursive>().unwrap();
let path_op = if settings.map_or(false, |s| s.0) {
PathOp::ClearRecursive(row.entity_path.clone())
} else {
PathOp::ClearComponents(row.entity_path.clone())
};
// NOTE: We've just added the row itself, so make sure to bump the row ID already!
self.add_path_op(row.row_id().next(), row.timepoint(), &path_op);
}
}

Ok(())
}

// TODO(jleibs): If this shouldn't be public, chain together other setters
// TODO(cmc): Updates of secondary datastructures should be the result of subscribing to the
// datastore's changelog and reacting to these changes appropriately. We shouldn't be creating
// many sources of truth.
pub fn try_add_data_row(&mut self, row: &DataRow) -> Result<(), Error> {
fn add_data_row(&mut self, row: &DataRow) -> Result<(), Error> {
self.register_entity_path(&row.entity_path);

for (&timeline, &time_int) in row.timepoint().iter() {
self.times_per_timeline.insert(timeline, time_int);
}
Expand Down Expand Up @@ -144,7 +112,24 @@ impl EntityDb {
}
}

self.data_store.insert_row(row).map_err(Into::into)
self.data_store.insert_row(row)?;

// Look for a `ClearIsRecursive` component, and if it's there, go through the clear path
// instead.
use re_types::components::ClearIsRecursive;
if let Some(idx) = row.find_cell(&ClearIsRecursive::name()) {
let cell = &row.cells()[idx];
let settings = cell.try_to_native_mono::<ClearIsRecursive>().unwrap();
let path_op = if settings.map_or(false, |s| s.0) {
PathOp::ClearRecursive(row.entity_path.clone())
} else {
PathOp::ClearComponents(row.entity_path.clone())
};
// NOTE: We've just added the row itself, so make sure to bump the row ID already!
self.add_path_op(row.row_id().next(), row.timepoint(), &path_op);
}

Ok(())
}

fn add_path_op(&mut self, row_id: RowId, time_point: &TimePoint, path_op: &PathOp) {
Expand Down Expand Up @@ -226,51 +211,49 @@ impl EntityDb {
// ----------------------------------------------------------------------------

/// A in-memory database built from a stream of [`LogMsg`]es.
///
/// NOTE: all mutation is to be done via public functions!
pub struct StoreDb {
/// The [`StoreId`] for this log.
store_id: StoreId,

/// All [`EntityPathOpMsg`]s ever received.
entity_op_msgs: BTreeMap<RowId, EntityPathOpMsg>,

/// Set by whomever created this [`StoreDb`].
pub data_source: Option<re_smart_channel::SmartChannelSource>,

/// Comes in a special message, [`LogMsg::SetStoreInfo`].
recording_msg: Option<SetStoreInfo>,
set_store_info: Option<SetStoreInfo>,

/// Where we store the entities.
pub entity_db: EntityDb,
entity_db: EntityDb,
}

impl StoreDb {
pub fn new(store_id: StoreId) -> Self {
Self {
store_id,
entity_op_msgs: Default::default(),
data_source: None,
recording_msg: None,
set_store_info: None,
entity_db: Default::default(),
}
}

pub fn recording_msg(&self) -> Option<&SetStoreInfo> {
self.recording_msg.as_ref()
#[inline]
pub fn entity_db(&self) -> &EntityDb {
&self.entity_db
}

pub fn store_info_msg(&self) -> Option<&SetStoreInfo> {
self.set_store_info.as_ref()
}

pub fn store_info(&self) -> Option<&StoreInfo> {
self.recording_msg().map(|msg| &msg.info)
self.store_info_msg().map(|msg| &msg.info)
}

pub fn app_id(&self) -> Option<&ApplicationId> {
self.store_info().map(|ri| &ri.application_id)
}

#[inline]
pub fn store_mut(&mut self) -> &mut re_arrow_store::DataStore {
&mut self.entity_db.data_store
}

#[inline]
pub fn store(&self) -> &re_arrow_store::DataStore {
&self.entity_db.data_store
Expand Down Expand Up @@ -308,7 +291,7 @@ impl StoreDb {
}

pub fn is_empty(&self) -> bool {
self.recording_msg.is_none() && self.num_rows() == 0
self.set_store_info.is_none() && self.num_rows() == 0
}

pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
Expand All @@ -317,48 +300,68 @@ impl StoreDb {
debug_assert_eq!(msg.store_id(), self.store_id());

match &msg {
LogMsg::SetStoreInfo(msg) => self.add_begin_recording_msg(msg),
LogMsg::EntityPathOpMsg(_, msg) => {
let EntityPathOpMsg {
row_id,
time_point,
path_op,
} = msg;
self.entity_op_msgs.insert(*row_id, msg.clone());
self.entity_db.add_path_op(*row_id, time_point, path_op);
LogMsg::SetStoreInfo(msg) => self.set_store_info(msg.clone()),

LogMsg::ArrowMsg(_, arrow_msg) => {
let table = DataTable::from_arrow_msg(arrow_msg)?;
self.add_data_table(table)?;
}
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
}

Ok(())
}

pub fn add_begin_recording_msg(&mut self, msg: &SetStoreInfo) {
self.recording_msg = Some(msg.clone());
pub fn add_data_table(&mut self, mut table: DataTable) -> Result<(), Error> {
// TODO(#1760): Compute the size of the datacells in the batching threads on the clients.
table.compute_all_size_bytes();

// TODO(cmc): batch all of this
for row in table.to_rows() {
let row = row?;
self.add_data_row(&row)?;
}

Ok(())
}

pub fn add_data_row(&mut self, row: &DataRow) -> Result<(), Error> {
self.entity_db.add_data_row(row)
}

/// Returns an iterator over all [`EntityPathOpMsg`]s that have been written to this `StoreDb`.
pub fn iter_entity_op_msgs(&self) -> impl Iterator<Item = &EntityPathOpMsg> {
self.entity_op_msgs.values()
pub fn set_store_info(&mut self, store_info: SetStoreInfo) {
self.set_store_info = Some(store_info);
}

pub fn get_entity_op_msg(&self, row_id: &RowId) -> Option<&EntityPathOpMsg> {
self.entity_op_msgs.get(row_id)
pub fn gc_everything_but_the_latest_row(&mut self) {
re_tracing::profile_function!();

self.gc(GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::Everything,
gc_timeless: true,
protect_latest: 1, // TODO(jleibs): Bump this after we have an undo buffer
purge_empty_tables: true,
});
}

/// Free up some RAM by forgetting the older parts of all timelines.
pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) {
re_tracing::profile_function!();
assert!((0.0..=1.0).contains(&fraction_to_purge));

let (deleted, stats_diff) = self.entity_db.data_store.gc(GarbageCollectionOptions {
assert!((0.0..=1.0).contains(&fraction_to_purge));
self.gc(GarbageCollectionOptions {
target: re_arrow_store::GarbageCollectionTarget::DropAtLeastFraction(
fraction_to_purge as _,
),
gc_timeless: true,
protect_latest: 1,
purge_empty_tables: false,
});
}

pub fn gc(&mut self, gc_options: GarbageCollectionOptions) {
re_tracing::profile_function!();

let (deleted, stats_diff) = self.entity_db.data_store.gc(gc_options);
re_log::trace!(
num_row_ids_dropped = deleted.row_ids.len(),
size_bytes_dropped = re_format::format_bytes(stats_diff.total.num_bytes as _),
Expand All @@ -367,17 +370,11 @@ impl StoreDb {

let Self {
store_id: _,
entity_op_msgs,
data_source: _,
recording_msg: _,
set_store_info: _,
entity_db,
} = self;

{
re_tracing::profile_scope!("entity_op_msgs");
entity_op_msgs.retain(|row_id, _| !deleted.row_ids.contains(row_id));
}

entity_db.purge(&deleted);
}

Expand Down
3 changes: 1 addition & 2 deletions crates/re_data_ui/src/annotation_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ fn annotation_info(
) -> Option<AnnotationInfo> {
let class_id = ctx
.store_db
.entity_db
.data_store
.store()
.query_latest_component::<re_types::components::ClassId>(entity_path, query)?;
let annotations = crate::annotations(ctx, query, entity_path);
let class = annotations.resolved_class_description(Some(*class_id));
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_ui/src/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl DataUi for ComponentPath {
component_name,
} = self;

let store = &ctx.store_db.entity_db.data_store;
let store = ctx.store_db.store();

if let Some(archetype_name) = component_name.indicator_component_archetype() {
ui.label(format!(
Expand All @@ -30,7 +30,7 @@ impl DataUi for ComponentPath {
component_data,
}
.data_ui(ctx, ui, verbosity, query);
} else if let Some(entity_tree) = ctx.store_db.entity_db.tree.subtree(entity_path) {
} else if let Some(entity_tree) = ctx.store_db.entity_db().tree.subtree(entity_path) {
if entity_tree.components.contains_key(component_name) {
ui.label("<unset>");
} else {
Expand Down
3 changes: 1 addition & 2 deletions crates/re_data_ui/src/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ impl EntityDataUi for re_types::components::TensorData {

let tensor_data_row_id = ctx
.store_db
.entity_db
.data_store
.store()
.query_latest_component::<re_types::components::TensorData>(entity_path, query)
.map_or(RowId::ZERO, |tensor| tensor.row_id);

Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_ui/src/image_meaning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub fn image_meaning_for_entity(
entity_path: &EntityPath,
ctx: &ViewerContext<'_>,
) -> TensorDataMeaning {
let store = &ctx.store_db.entity_db.data_store;
let store = ctx.store_db.store();
let timeline = &ctx.current_query().timeline;
if store.entity_has_component(timeline, entity_path, &DepthImage::indicator().name()) {
TensorDataMeaning::Depth
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_ui/src/instance_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ impl DataUi for InstancePath {
instance_key,
} = self;

let store = &ctx.store_db.entity_db.data_store;
let store = ctx.store_db.store();

let Some(components) = store.all_components(&query.timeline, entity_path) else {
if ctx.store_db.entity_db.is_known_entity(entity_path) {
if ctx.store_db.entity_db().is_known_entity(entity_path) {
ui.label(ctx.re_ui.warning_text(format!(
"No components logged on timeline {:?}",
query.timeline.name()
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_ui/src/item_ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub fn instance_hover_card_ui(
let query = ctx.current_query();

if instance_path.instance_key.is_splat() {
let store = &ctx.store_db.entity_db.data_store;
let store = ctx.store_db.store();
let stats = store.entity_stats(query.timeline, instance_path.entity_path.hash());
entity_stats_ui(ui, &query.timeline, &stats);
} else {
Expand Down
29 changes: 1 addition & 28 deletions crates/re_data_ui/src/log_msg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use re_log_types::{ArrowMsg, DataTable, EntityPathOpMsg, LogMsg, SetStoreInfo, StoreInfo};
use re_log_types::{ArrowMsg, DataTable, LogMsg, SetStoreInfo, StoreInfo};
use re_viewer_context::{UiVerbosity, ViewerContext};

use super::DataUi;
Expand All @@ -14,7 +14,6 @@ impl DataUi for LogMsg {
) {
match self {
LogMsg::SetStoreInfo(msg) => msg.data_ui(ctx, ui, verbosity, query),
LogMsg::EntityPathOpMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query),
LogMsg::ArrowMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query),
}
}
Expand Down Expand Up @@ -67,32 +66,6 @@ impl DataUi for SetStoreInfo {
}
}

impl DataUi for EntityPathOpMsg {
fn data_ui(
&self,
ctx: &mut ViewerContext<'_>,
ui: &mut egui::Ui,
verbosity: UiVerbosity,
query: &re_arrow_store::LatestAtQuery,
) {
let EntityPathOpMsg {
row_id: _,
time_point,
path_op,
} = self;

egui::Grid::new("fields").num_columns(2).show(ui, |ui| {
ui.monospace("time_point:");
time_point.data_ui(ctx, ui, verbosity, query);
ui.end_row();

ui.monospace("path_op:");
path_op.data_ui(ctx, ui, verbosity, query);
ui.end_row();
});
}
}

impl DataUi for ArrowMsg {
fn data_ui(
&self,
Expand Down
Loading

0 comments on commit 432d7d2

Please sign in to comment.