From be58f0ac5c8c19f4aebb7071ea394e7435038f39 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Tue, 16 Jan 2024 17:47:05 +0100
Subject: [PATCH] Implement timeless range support

---
 crates/re_log_types/src/data_row.rs      |  1 +
 crates/re_query_cache/src/cache.rs       | 99 ++++++++++++++++++++++---
 crates/re_query_cache/src/cache_stats.rs | 41 ++++++-----
 crates/re_query_cache/src/range.rs       | 56 +++++++++++---
 crates/re_query_cache/tests/range.rs     |  2 --
 5 files changed, 155 insertions(+), 44 deletions(-)

diff --git a/crates/re_log_types/src/data_row.rs b/crates/re_log_types/src/data_row.rs
index fffd27c7e21c..46501eea8039 100644
--- a/crates/re_log_types/src/data_row.rs
+++ b/crates/re_log_types/src/data_row.rs
@@ -155,6 +155,7 @@ impl std::fmt::Display for RowId {
 
 impl RowId {
     pub const ZERO: Self = Self(re_tuid::Tuid::ZERO);
+    pub const MAX: Self = Self(re_tuid::Tuid::MAX);
 
     /// Create a new unique [`RowId`] based on the current time.
     #[allow(clippy::new_without_default)]
diff --git a/crates/re_query_cache/src/cache.rs b/crates/re_query_cache/src/cache.rs
index 14be0c39a40b..dda9c599afc9 100644
--- a/crates/re_query_cache/src/cache.rs
+++ b/crates/re_query_cache/src/cache.rs
@@ -1,5 +1,6 @@
 use std::{
     collections::{BTreeMap, VecDeque},
+    ops::Range,
     sync::Arc,
 };
 
@@ -327,6 +328,8 @@ impl CachesPerArchetype {
 
         re_tracing::profile_function!();
 
+        // TODO(cmc): range invalidation
+
         for latest_at_cache in self.latest_at_per_archetype.read().values() {
             let mut latest_at_cache = latest_at_cache.write();
 
@@ -419,12 +422,6 @@ pub struct CacheBucket {
 }
 
 impl CacheBucket {
-    /// Iterate over the timestamps of the point-of-view components.
-    #[inline]
-    pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
-        self.data_times.iter()
-    }
-
     #[inline]
     pub fn time_range(&self) -> Option<TimeRange> {
         let first_time = self.data_times.front().map(|(t, _)| *t)?;
@@ -444,6 +441,25 @@ impl CacheBucket {
         self.data_times.binary_search(&(data_time, row_id)).is_ok()
     }
 
+    /// How many timestamps' worth of data is stored in this bucket?
+    #[inline]
+    pub fn num_entries(&self) -> usize {
+        self.data_times.len()
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.num_entries() == 0
+    }
+
+    // ---
+
+    /// Iterate over the timestamps of the point-of-view components.
+    #[inline]
+    pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
+        self.data_times.iter()
+    }
+
     /// Iterate over the [`InstanceKey`] batches of the point-of-view components.
     #[inline]
     pub fn iter_pov_instance_keys(&self) -> impl Iterator<Item = &[InstanceKey]> {
@@ -474,15 +490,76 @@ impl CacheBucket {
         Some(data.iter())
     }
 
-    /// How many timestamps' worth of data is stored in this bucket?
+    // ---
+
+    /// Returns the index range that corresponds to the specified `time_range`.
+    ///
+    /// Use the returned range with one of the range iteration methods:
+    /// - [`Self::range_data_times`]
+    /// - [`Self::range_pov_instance_keys`]
+    /// - [`Self::range_component`]
+    /// - [`Self::range_component_opt`]
+    ///
+    /// Make sure that the bucket hasn't been modified in-between!
+    ///
+    /// This performs two binary searches (`O(log(n))` each), so make sure to clone the
+    /// returned range rather than calling this method multiple times.
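+    ///
+    /// For example, given a bucket whose data times are `[2, 4, 4, 8]` (hypothetical
+    /// values), `entry_range(TimeRange::new(3.into(), 8.into()))` returns `1..4`.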
     #[inline]
-    pub fn num_entries(&self) -> usize {
-        self.data_times.len()
+    pub fn entry_range(&self, time_range: TimeRange) -> Range<usize> {
+        let start_index = self
+            .data_times
+            .partition_point(|(data_time, _)| data_time < &time_range.min);
+        let end_index = self
+            .data_times
+            .partition_point(|(data_time, _)| data_time <= &time_range.max);
+        start_index..end_index
     }
 
+    /// Range over the timestamps of the point-of-view components.
     #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.num_entries() == 0
+    pub fn range_data_times(
+        &self,
+        entry_range: Range<usize>,
+    ) -> impl Iterator<Item = &(TimeInt, RowId)> {
+        self.data_times.range(entry_range)
+    }
+
+    /// Range over the [`InstanceKey`] batches of the point-of-view components.
+    #[inline]
+    pub fn range_pov_instance_keys(
+        &self,
+        entry_range: Range<usize>,
+    ) -> impl Iterator<Item = &[InstanceKey]> {
+        self.pov_instance_keys.range(entry_range)
+    }
+
+    /// Range over the batches of the specified non-optional component.
+    #[inline]
+    pub fn range_component<C: Component + Send + Sync + 'static>(
+        &self,
+        entry_range: Range<usize>,
+    ) -> Option<impl Iterator<Item = &[C]>> {
+        let data = self
+            .components
+            .get(&C::name())
+            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<C>>())?;
+        Some(data.range(entry_range))
+    }
+
+    /// Range over the batches of the specified optional component.
+    #[inline]
+    pub fn range_component_opt<C: Component + Send + Sync + 'static>(
+        &self,
+        entry_range: Range<usize>,
+    ) -> Option<impl Iterator<Item = &[Option<C>]>> {
+        let data = self
+            .components
+            .get(&C::name())
+            .and_then(|data| data.as_any().downcast_ref::<FlatVecDeque<Option<C>>>())?;
+        Some(data.range(entry_range))
     }
 }
diff --git a/crates/re_query_cache/src/cache_stats.rs b/crates/re_query_cache/src/cache_stats.rs
index 1147463ecaa7..76d2e6a2d5f7 100644
--- a/crates/re_query_cache/src/cache_stats.rs
+++ b/crates/re_query_cache/src/cache_stats.rs
@@ -3,7 +3,7 @@ use std::collections::BTreeMap;
 use re_log_types::{EntityPath, TimeRange, Timeline};
 use re_types_core::ComponentName;
 
-use crate::{Caches, LatestAtCache, RangeCache};
+use crate::{cache::CacheBucket, Caches, LatestAtCache, RangeCache};
 
 // ---
@@ -71,6 +71,18 @@ impl Caches {
     pub fn stats(detailed_stats: bool) -> CachesStats {
         re_tracing::profile_function!();
 
+        fn upsert_bucket_stats(
+            per_component: &mut BTreeMap<ComponentName, CachedComponentStats>,
+            bucket: &CacheBucket,
+        ) {
+            for (component_name, data) in &bucket.components {
+                let stats: &mut CachedComponentStats =
+                    per_component.entry(*component_name).or_default();
+                stats.total_rows += data.dyn_num_entries() as u64;
+                stats.total_instances += data.dyn_num_values() as u64;
+            }
+        }
+
         Self::with(|caches| {
             let latest_at = caches
                 .0
@@ -98,22 +110,12 @@ impl Caches {
                     if let Some(per_component) = per_component.as_mut() {
                         re_tracing::profile_scope!("detailed");
 
-                        for bucket in per_data_time.values() {
-                            for (component_name, data) in &bucket.read().components {
-                                let stats: &mut CachedComponentStats =
-                                    per_component.entry(*component_name).or_default();
-                                stats.total_rows += data.dyn_num_entries() as u64;
-                                stats.total_instances += data.dyn_num_values() as u64;
-                            }
+                        if let Some(bucket) = &timeless {
+                            upsert_bucket_stats(per_component, bucket);
                         }
 
-                        if let Some(bucket) = &timeless {
-                            for (component_name, data) in &bucket.components {
-                                let stats: &mut CachedComponentStats =
-                                    per_component.entry(*component_name).or_default();
-                                stats.total_rows += data.dyn_num_entries() as u64;
-                                stats.total_instances += data.dyn_num_values() as u64;
-                            }
+                        for bucket in per_data_time.values() {
+                            upsert_bucket_stats(per_component, &bucket.read());
                         }
                     }
 
@@ -141,6 +143,7 @@ impl Caches {
                 .map(|range_cache| {
                     let RangeCache {
                         per_data_time,
+                        timeless,
                         total_size_bytes,
                     } = &*range_cache.read();
 
@@ -150,12 +153,8 @@ impl Caches {
                     if let Some(per_component) = per_component.as_mut() {
                         re_tracing::profile_scope!("detailed");
 
-                        for (component_name, data) in &per_data_time.components {
-                            let stats: &mut CachedComponentStats =
-                                per_component.entry(*component_name).or_default();
-                            stats.total_rows += data.dyn_num_entries() as u64;
-                            stats.total_instances += data.dyn_num_values() as u64;
-                        }
+                        upsert_bucket_stats(per_component, timeless);
+                        upsert_bucket_stats(per_component, per_data_time);
                     }
 
                     (
diff --git a/crates/re_query_cache/src/range.rs b/crates/re_query_cache/src/range.rs
index a11cf6eb3388..5fdc3095e049 100644
--- a/crates/re_query_cache/src/range.rs
+++ b/crates/re_query_cache/src/range.rs
@@ -2,7 +2,7 @@ use paste::paste;
 use seq_macro::seq;
 
 use re_data_store::{DataStore, RangeQuery, TimeInt};
-use re_log_types::{EntityPath, RowId};
+use re_log_types::{EntityPath, RowId, TimeRange};
 use re_types_core::{components::InstanceKey, Archetype, Component};
 
 use crate::{CacheBucket, Caches, MaybeCachedComponentData};
@@ -12,9 +12,14 @@ use crate::{CacheBucket, Caches, MaybeCachedComponentData};
 /// Caches the results of `Range` queries.
 #[derive(Default)]
 pub struct RangeCache {
+    /// All timeful data, organized by _data_ time.
+    //
     // TODO(cmc): bucketize
     pub per_data_time: CacheBucket,
 
+    /// All timeless data.
+    pub timeless: CacheBucket,
+
     /// Total size of the data stored in this cache in bytes.
     pub total_size_bytes: u64,
 }
@@ -104,19 +109,22 @@ macro_rules! impl_query_archetype_range {
             ),
         ),
     {
-        let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
+        let mut range_results =
+            |timeless: bool, bucket: &crate::CacheBucket, time_range: TimeRange| -> crate::Result<()> {
             re_tracing::profile_scope!("iter");
 
+            let entry_range = bucket.entry_range(time_range);
+
             let it = itertools::izip!(
-                bucket.iter_data_times(),
-                bucket.iter_pov_instance_keys(),
-                $(bucket.iter_component::<$pov>()
+                bucket.range_data_times(entry_range.clone()),
+                bucket.range_pov_instance_keys(entry_range.clone()),
+                $(bucket.range_component::<$pov>(entry_range.clone())
                     .ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
-                $(bucket.iter_component_opt::<$comp>()
+                $(bucket.range_component_opt::<$comp>(entry_range.clone())
                     .ok_or_else(|| re_query::ComponentNotFoundError(<$comp>::name()))?,)*
             ).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
                 (
-                    (Some(*time), *row_id), // TODO(cmc): timeless
+                    ((!timeless).then_some(*time), *row_id),
                     MaybeCachedComponentData::Cached(instance_keys),
                     $(MaybeCachedComponentData::Cached($pov),)+
                     $(MaybeCachedComponentData::Cached($comp),)*
@@ -147,7 +155,7 @@ macro_rules! impl_query_archetype_range {
             let mut added_size_bytes = 0u64;
 
             for arch_view in arch_views {
-                let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN); // TODO(cmc): timeless
+                let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);
 
                 if bucket.contains_data_row(data_time, arch_view.primary_row_id()) {
                     continue;
@@ -171,7 +179,31 @@ macro_rules! impl_query_archetype_range {
         let mut range_callback = |query: &RangeQuery, range_cache: &mut crate::RangeCache| {
             re_tracing::profile_scope!("range", format!("{query:?}"));
 
-            for reduced_query in range_cache.compute_queries(query) {
+            // NOTE: Same logic as what the store does.
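+            //
+            // Timeless data lives at `TimeInt::MIN`: if the query reaches `TimeInt::MIN`,
+            // its timeless part is dealt with here first, while the rest of the time range
+            // goes through the regular temporal path below.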
+            if query.range.min <= TimeInt::MIN {
+                let mut reduced_query = query.clone();
+                reduced_query.range.max = TimeInt::MIN; // inclusive
+
+                // NOTE: `+ 2` because we always grab the indicator component as well as the
+                // instance keys.
+                let arch_views =
+                    ::re_query::range_archetype::<A, { $N + $M + 2 }>(store, &reduced_query, entity_path);
+                range_cache.total_size_bytes +=
+                    upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, &mut range_cache.timeless)?;
+
+                if !range_cache.timeless.is_empty() {
+                    range_results(true, &range_cache.timeless, reduced_query.range)?;
+                }
+            }
+
+            let mut query = query.clone();
+            query.range.min = TimeInt::max((TimeInt::MIN.as_i64() + 1).into(), query.range.min);
+
+            for reduced_query in range_cache.compute_queries(&query) {
                 // NOTE: `+ 2` because we always grab the indicator component as well as the
                 // instance keys.
                 let arch_views =
@@ -180,7 +212,11 @@ macro_rules! impl_query_archetype_range {
                     upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, &mut range_cache.per_data_time)?;
             }
 
-            iter_results(&range_cache.per_data_time)
+            if !range_cache.per_data_time.is_empty() {
+                range_results(false, &range_cache.per_data_time, query.range)?;
+            }
+
+            Ok(())
         };
 
diff --git a/crates/re_query_cache/tests/range.rs b/crates/re_query_cache/tests/range.rs
index a0312ba1127a..df8dd59c9555 100644
--- a/crates/re_query_cache/tests/range.rs
+++ b/crates/re_query_cache/tests/range.rs
@@ -97,8 +97,6 @@ fn simple_range() {
 }
 
 #[test]
-// TODO(cmc): timeless support
-#[should_panic(expected = "assertion failed: `(left == right)`")]
 fn timeless_range() {
     let mut store = DataStore::new(
         re_log_types::StoreId::random(re_log_types::StoreKind::Recording),