Skip to content

Commit

Permalink
implement timeless range support
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 23, 2024
1 parent 600c6b3 commit be58f0a
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 45 deletions.
1 change: 1 addition & 0 deletions crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ impl std::fmt::Display for RowId {

impl RowId {
pub const ZERO: Self = Self(re_tuid::Tuid::ZERO);
pub const MAX: Self = Self(re_tuid::Tuid::MAX);

/// Create a new unique [`RowId`] based on the current time.
#[allow(clippy::new_without_default)]
Expand Down
96 changes: 85 additions & 11 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{BTreeMap, VecDeque},
ops::Range,
sync::Arc,
};

Expand Down Expand Up @@ -327,6 +328,8 @@ impl CachesPerArchetype {

re_tracing::profile_function!();

// TODO(cmc): range invalidation

for latest_at_cache in self.latest_at_per_archetype.read().values() {
let mut latest_at_cache = latest_at_cache.write();

Expand Down Expand Up @@ -419,12 +422,6 @@ pub struct CacheBucket {
}

impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

#[inline]
pub fn time_range(&self) -> Option<TimeRange> {
let first_time = self.data_times.front().map(|(t, _)| *t)?;
Expand All @@ -444,6 +441,25 @@ impl CacheBucket {
self.data_times.binary_search(&(data_time, row_id)).is_ok()
}

/// How many timestamps' worth of data is stored in this bucket?
///
/// Each entry corresponds to one `(data time, row id)` pair in `data_times`,
/// with matching per-component batches stored alongside it.
#[inline]
pub fn num_entries(&self) -> usize {
self.data_times.len()
}

/// Returns `true` if this bucket holds no entries at all.
#[inline]
pub fn is_empty(&self) -> bool {
    self.data_times.is_empty()
}

// ---

/// Iterate over the timestamps of the point-of-view components.
///
/// Yields `(data time, row id)` pairs in the order they are stored in the
/// bucket (sorted, given that insertion keeps `data_times` ordered).
#[inline]
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
#[inline]
pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
Expand Down Expand Up @@ -474,15 +490,73 @@ impl CacheBucket {
Some(data.iter())
}

/// How many timestamps' worth of data is stored in this bucket?
// ---

/// Returns the index range that corresponds to the specified `time_range`.
///
/// Use the returned range with one of the range iteration methods:
/// - [`Self::range_data_times`]
/// - [`Self::range_pov_instance_keys`]
/// - [`Self::range_component`]
/// - [`Self::range_component_opt`]
///
/// Make sure that the bucket hasn't been modified in-between!
///
/// This is `O(2*log(n))`, so make sure to clone the returned range rather than calling this
/// multiple times.
#[inline]
pub fn num_entries(&self) -> usize {
self.data_times.len()
pub fn entry_range(&self, time_range: TimeRange) -> Range<usize> {
    // First index whose data time reaches `time_range.min`…
    let lo = self
        .data_times
        .partition_point(|(data_time, _)| data_time < &time_range.min);
    // …and one past the last index still within `time_range.max` (inclusive).
    let hi = self
        .data_times
        .partition_point(|(data_time, _)| data_time <= &time_range.max);
    lo..hi
}

/// Range over the timestamps of the point-of-view components.
#[inline]
pub fn is_empty(&self) -> bool {
self.num_entries() == 0
/// Range over the timestamps of the point-of-view components.
///
/// `entry_range` should be obtained via [`Self::entry_range`]; make sure the
/// bucket hasn't been modified in-between, or the indices may be stale.
pub fn range_data_times(
&self,
entry_range: Range<usize>,
) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.range(entry_range)
}

/// Range over the [`InstanceKey`] batches of the point-of-view components.
///
/// `entry_range` should be obtained via [`Self::entry_range`]; make sure the
/// bucket hasn't been modified in-between, or the indices may be stale.
#[inline]
pub fn range_pov_instance_keys(
&self,
entry_range: Range<usize>,
) -> impl Iterator<Item = &[InstanceKey]> {
self.pov_instance_keys.range(entry_range)
}

/// Range over the batches of the specified non-optional component.
///
/// Returns `None` when the component is absent from this bucket or its
/// type-erased storage does not downcast to `FlatVecDeque<C>`.
#[inline]
pub fn range_component<C: Component + Send + Sync + 'static>(
    &self,
    entry_range: Range<usize>,
) -> Option<impl Iterator<Item = &[C]>> {
    let erased = self.components.get(&C::name())?;
    let deque = erased.as_any().downcast_ref::<FlatVecDeque<C>>()?;
    Some(deque.range(entry_range))
}

/// Range over the batches of the specified optional component.
///
/// Returns `None` when the component is absent from this bucket or its
/// type-erased storage does not downcast to `FlatVecDeque<Option<C>>`.
#[inline]
pub fn range_component_opt<C: Component + Send + Sync + 'static>(
    &self,
    entry_range: Range<usize>,
) -> Option<impl Iterator<Item = &[Option<C>]>> {
    let erased = self.components.get(&C::name())?;
    let deque = erased.as_any().downcast_ref::<FlatVecDeque<Option<C>>>()?;
    Some(deque.range(entry_range))
}
}

Expand Down
41 changes: 20 additions & 21 deletions crates/re_query_cache/src/cache_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use re_log_types::{EntityPath, TimeRange, Timeline};
use re_types_core::ComponentName;

use crate::{Caches, LatestAtCache, RangeCache};
use crate::{cache::CacheBucket, Caches, LatestAtCache, RangeCache};

// ---

Expand Down Expand Up @@ -71,6 +71,18 @@ impl Caches {
pub fn stats(detailed_stats: bool) -> CachesStats {
re_tracing::profile_function!();

// Folds the per-component row/instance counts of `bucket` into `per_component`.
fn upsert_bucket_stats(
    per_component: &mut BTreeMap<ComponentName, CachedComponentStats>,
    bucket: &CacheBucket,
) {
    for (component_name, data) in &bucket.components {
        let entry = per_component.entry(*component_name).or_default();
        entry.total_rows += data.dyn_num_entries() as u64;
        entry.total_instances += data.dyn_num_values() as u64;
    }
}

Self::with(|caches| {
let latest_at = caches
.0
Expand Down Expand Up @@ -98,22 +110,12 @@ impl Caches {
if let Some(per_component) = per_component.as_mut() {
re_tracing::profile_scope!("detailed");

for bucket in per_data_time.values() {
for (component_name, data) in &bucket.read().components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
if let Some(bucket) = &timeless {
upsert_bucket_stats(per_component, bucket);
}

if let Some(bucket) = &timeless {
for (component_name, data) in &bucket.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
for bucket in per_data_time.values() {
upsert_bucket_stats(per_component, &bucket.read());
}
}
}
Expand Down Expand Up @@ -141,6 +143,7 @@ impl Caches {
.map(|range_cache| {
let RangeCache {
per_data_time,
timeless,
total_size_bytes,
} = &*range_cache.read();

Expand All @@ -150,12 +153,8 @@ impl Caches {
if let Some(per_component) = per_component.as_mut() {
re_tracing::profile_scope!("detailed");

for (component_name, data) in &per_data_time.components {
let stats: &mut CachedComponentStats =
per_component.entry(*component_name).or_default();
stats.total_rows += data.dyn_num_entries() as u64;
stats.total_instances += data.dyn_num_values() as u64;
}
upsert_bucket_stats(per_component, timeless);
upsert_bucket_stats(per_component, per_data_time);
}

(
Expand Down
61 changes: 51 additions & 10 deletions crates/re_query_cache/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, RangeQuery, TimeInt};
use re_log_types::{EntityPath, RowId};
use re_log_types::{EntityPath, RowId, TimeRange};
use re_types_core::{components::InstanceKey, Archetype, Component};

use crate::{CacheBucket, Caches, MaybeCachedComponentData};
Expand All @@ -12,9 +12,14 @@ use crate::{CacheBucket, Caches, MaybeCachedComponentData};
/// Caches the results of `Range` queries.
#[derive(Default)]
pub struct RangeCache {
/// All timeful data, organized by _data_ time.
///
/// Only holds entries with data times strictly above `TimeInt::MIN`;
/// timeless rows live in [`Self::timeless`] instead.
//
// TODO(cmc): bucketize
pub per_data_time: CacheBucket,

/// All timeless data.
pub timeless: CacheBucket,

/// Total size of the data stored in this cache in bytes.
pub total_size_bytes: u64,
}
Expand Down Expand Up @@ -104,19 +109,23 @@ macro_rules! impl_query_archetype_range {
),
),
{
let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
let mut range_results =
|timeless: bool, bucket: &crate::CacheBucket, time_range: TimeRange| -> crate::Result<()> {
re_tracing::profile_scope!("iter");

let entry_range = bucket.entry_range(time_range);
dbg!(&entry_range);

let it = itertools::izip!(
bucket.iter_data_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()
bucket.range_data_times(entry_range.clone()),
bucket.range_pov_instance_keys(entry_range.clone()),
$(bucket.range_component::<$pov>(entry_range.clone())
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
$(bucket.iter_component_opt::<$comp>()
$(bucket.range_component_opt::<$comp>(entry_range.clone())
.ok_or_else(|| re_query::ComponentNotFoundError(<$comp>::name()))?,)*
).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
(
(Some(*time), *row_id), // TODO(cmc): timeless
((!timeless).then_some(*time), *row_id),
MaybeCachedComponentData::Cached(instance_keys),
$(MaybeCachedComponentData::Cached($pov),)+
$(MaybeCachedComponentData::Cached($comp),)*
Expand Down Expand Up @@ -147,7 +156,7 @@ macro_rules! impl_query_archetype_range {
let mut added_size_bytes = 0u64;

for arch_view in arch_views {
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); // TODO(cmc): timeless
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);

if bucket.contains_data_row(data_time, arch_view.primary_row_id()) {
continue;
Expand All @@ -171,7 +180,35 @@ macro_rules! impl_query_archetype_range {
let mut range_callback = |query: &RangeQuery, range_cache: &mut crate::RangeCache| {
re_tracing::profile_scope!("range", format!("{query:?}"));

for reduced_query in range_cache.compute_queries(query) {
eprintln!("query 1: {query:?}");

// NOTE: Same logic as what the store does.
if query.range.min <= TimeInt::MIN {
let mut reduced_query = query.clone();
reduced_query.range.max = TimeInt::MIN; // inclusive

eprintln!("query timeless: {reduced_query:?}");

// NOTE: `+ 2` because we always grab the indicator component as well as the
// instance keys.
let arch_views =
::re_query::range_archetype::<A, { $N + $M + 2 }>(store, &reduced_query, entity_path);
range_cache.total_size_bytes +=
upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, &mut range_cache.timeless)?;

if !range_cache.timeless.is_empty() {
range_results(true, &range_cache.timeless, reduced_query.range)?;
}
}


let mut query = query.clone();
query.range.min = TimeInt::max((TimeInt::MIN.as_i64() + 1).into(), query.range.min);

eprintln!("query 2: {query:?}");

for reduced_query in range_cache.compute_queries(&query) {
eprintln!("query 2: {reduced_query:?}");
// NOTE: `+ 2` because we always grab the indicator component as well as the
// instance keys.
let arch_views =
Expand All @@ -180,7 +217,11 @@ macro_rules! impl_query_archetype_range {
upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, &mut range_cache.per_data_time)?;
}

iter_results(&range_cache.per_data_time)
if !range_cache.per_data_time.is_empty() {
range_results(false, &range_cache.per_data_time, query.range)?;
}

Ok(())
};


Expand Down
6 changes: 3 additions & 3 deletions crates/re_query_cache/tests/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ fn simple_range() {
}

#[test]
// TODO(cmc): timeless support
#[should_panic(expected = "assertion failed: `(left == right)`")]
fn timeless_range() {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
Expand Down Expand Up @@ -212,6 +210,8 @@ fn timeless_range() {

// --- Third test: `[-inf, +inf]` ---

eprintln!("XXXXXXXXXXXXXXXXXXXXXXXX");

let query =
re_data_store::RangeQuery::new(timepoint1[0].0, TimeRange::new(TimeInt::MIN, TimeInt::MAX));

Expand Down Expand Up @@ -363,7 +363,7 @@ fn query_and_compare(store: &DataStore, query: &RangeQuery, ent_path: &EntityPat
}

// Keep this around for the next unlucky chap.
// eprintln!("(expected={expected_data_times:?}, uncached={uncached_data_times:?}, cached={cached_data_times:?})");
eprintln!("(expected={expected_data_times:?}, uncached={uncached_data_times:?}, cached={cached_data_times:?})");
// eprintln!("{}", store.to_data_table().unwrap());

similar_asserts::assert_eq!(expected_data_times, uncached_data_times);
Expand Down

0 comments on commit be58f0a

Please sign in to comment.