diff --git a/crates/re_data_store/src/polars_util.rs b/crates/re_data_store/src/polars_util.rs index 54ad80f8a57d..59b33ffdb264 100644 --- a/crates/re_data_store/src/polars_util.rs +++ b/crates/re_data_store/src/polars_util.rs @@ -123,21 +123,54 @@ pub fn range_components<'a, const N: usize>( let mut state = None; + // NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far + // into the past don't have a latest-at state! + let latest_time = query.range.min.as_i64().checked_sub(1).map(Into::into); + + let mut df_latest = None; + if let Some(latest_time) = latest_time { + let df = latest_components( + store, + &LatestAtQuery::new(query.timeline, latest_time), + ent_path, + &components, + join_type, + ); + + df_latest = Some(df); + } + let primary_col = components .iter() .find_position(|component| **component == primary) .map(|(col, _)| col) .unwrap(); // asserted on entry - store - .range(query, ent_path, components) - .map(move |(time, _, cells)| { - ( - time, - cells[primary_col].is_some(), // is_primary - dataframe_from_cells(&cells), - ) - }) + // send the latest-at state before anything else + df_latest + .into_iter() + // NOTE: `false` here means we will _not_ yield the latest-at state as an actual + // ArchetypeView! + // That is a very important detail: for overlapping range queries to be correct in a + // multi-tenant cache context, we need to make sure to inherit the latest-at state + // from T-1, while also making sure to _not_ yield the view that comes with that state. + // + // Consider e.g. what happens when one system queries for `range(10, 20)` while another + // queries for `range(9, 20)`: the data at timestamp `10` would differ because of the + // statefulness of range queries! 
+ .map(move |df| (latest_time, false, df)) + // followed by the range + .chain( + store + .range(query, ent_path, components) + .map(move |(time, _, cells)| { + ( + time, + cells[primary_col].is_some(), // is_primary + dataframe_from_cells(&cells), + ) + }), + ) .filter_map(move |(time, is_primary, df)| { state = Some(join_dataframes( cluster_key, diff --git a/crates/re_data_store/tests/data_store.rs b/crates/re_data_store/tests/data_store.rs index b52418325ee9..136c0da84bb8 100644 --- a/crates/re_data_store/tests/data_store.rs +++ b/crates/re_data_store/tests/data_store.rs @@ -553,18 +553,18 @@ fn range_impl(store: &mut DataStore) { // Unit ranges (Color's PoV) - // NOTE: Check out [1] to see what the results would've looked like with latest-at semantics at - // T-1 baked in (like we used to do). - // - // [1]: - assert_range_components( TimeRange::new(frame1, frame1), [Color::name(), Position2D::name()], - &[( - Some(frame1), - &[(Color::name(), &row1)], // - )], + &[ + ( + Some(frame1), + &[ + (Color::name(), &row1), + (Position2D::name(), &row4_4), // timeless + ], + ), // + ], ); assert_range_components( TimeRange::new(frame2, frame2), @@ -582,11 +582,11 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame4), - &[(Color::name(), &row4_1)], // + &[(Color::name(), &row4_1), (Position2D::name(), &row3)], ), ( Some(frame4), - &[(Color::name(), &row4_2)], // + &[(Color::name(), &row4_2), (Position2D::name(), &row3)], ), ( Some(frame4), @@ -613,17 +613,19 @@ fn range_impl(store: &mut DataStore) { &[ ( Some(frame2), - &[(Position2D::name(), &row2)], // + &[(Position2D::name(), &row2), (Color::name(), &row1)], ), // ], ); assert_range_components( TimeRange::new(frame3, frame3), [Position2D::name(), Color::name()], - &[( - Some(frame3), - &[(Position2D::name(), &row3)], // - )], + &[ + ( + Some(frame3), + &[(Position2D::name(), &row3), (Color::name(), &row1)], + ), // + ], ); assert_range_components( TimeRange::new(frame4, frame4), @@ -653,7 +655,10 @@ fn 
range_impl(store: &mut DataStore) { &[ ( Some(frame1), - &[(Color::name(), &row1)], // + &[ + (Color::name(), &row1), + (Position2D::name(), &row4_4), // timeless + ], ), ( Some(frame4), diff --git a/crates/re_query/src/range.rs b/crates/re_query/src/range.rs index 44d3ea255620..8c6239925346 100644 --- a/crates/re_query/src/range.rs +++ b/crates/re_query/src/range.rs @@ -1,9 +1,9 @@ use itertools::Itertools as _; -use re_data_store::{DataStore, RangeQuery}; +use re_data_store::{DataStore, LatestAtQuery, RangeQuery}; use re_log_types::EntityPath; use re_types_core::{Archetype, ComponentName}; -use crate::{ArchetypeView, ComponentWithInstances}; +use crate::{get_component_with_instances, ArchetypeView, ComponentWithInstances}; // --- @@ -61,29 +61,68 @@ pub fn range_archetype<'a, A: Archetype + 'a, const N: usize>( .take(components.len()) .collect(); - store - .range(query, ent_path, components) - .map(move |(data_time, row_id, mut cells)| { - // NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed - // by the store. - let instance_keys = cells[cluster_col].take().unwrap(); - let is_primary = cells[primary_col].is_some(); - let cwis = cells - .into_iter() - .map(|cell| { - cell.map(|cell| { - ( - row_id, - ComponentWithInstances { - instance_keys: instance_keys.clone(), /* shallow */ - values: cell, - }, - ) + // NOTE: This will return none for `TimeInt::Min`, i.e. range queries that start infinitely far + // into the past don't have a latest-at state! + let query_time = query.range.min.as_i64().checked_sub(1).map(Into::into); + + let mut cwis_latest = None; + if let Some(query_time) = query_time { + let mut cwis_latest_raw: Vec<_> = std::iter::repeat_with(|| None) + .take(components.len()) + .collect(); + + // Fetch the latest data for every single component from their respective point-of-views, + // this will allow us to build up the initial state and send an initial latest-at + // entity-view if needed. 
+ for (i, primary) in components.iter().enumerate() { + cwis_latest_raw[i] = get_component_with_instances( + store, + &LatestAtQuery::new(query.timeline, query_time), + ent_path, + *primary, + ) + .map(|(_, row_id, cwi)| (row_id, cwi)); + } + + cwis_latest = Some(cwis_latest_raw); + } + + // send the latest-at state before anything else + cwis_latest + .into_iter() + // NOTE: `false` here means we will _not_ yield the latest-at state as an actual + // ArchetypeView! + // That is a very important detail: for overlapping range queries to be correct in a + // multi-tenant cache context, we need to make sure to inherit the latest-at state + // from T-1, while also making sure to _not_ yield the view that comes with that state. + // + // Consider e.g. what happens when one system queries for `range(10, 20)` while another + // queries for `range(9, 20)`: the data at timestamp `10` would differ because of the + // statefulness of range queries! + .map(move |cwis| (query_time, false, cwis)) + .chain(store.range(query, ent_path, components).map( + move |(data_time, row_id, mut cells)| { + // NOTE: The unwrap cannot fail, the cluster key's presence is guaranteed + // by the store. 
+ let instance_keys = cells[cluster_col].take().unwrap(); + let is_primary = cells[primary_col].is_some(); + let cwis = cells + .into_iter() + .map(|cell| { + cell.map(|cell| { + ( + row_id, + ComponentWithInstances { + instance_keys: instance_keys.clone(), /* shallow */ + values: cell, + }, + ) + }) }) - }) - .collect::<Vec<_>>(); - (data_time, is_primary, cwis) - }) + .collect::<Vec<_>>(); + (data_time, is_primary, cwis) + }, + )) .filter_map(move |(data_time, is_primary, cwis)| { for (i, cwi) in cwis .into_iter() diff --git a/crates/re_query/tests/archetype_range_tests.rs b/crates/re_query/tests/archetype_range_tests.rs index 980e7b5c99c9..4642a8630a3b 100644 --- a/crates/re_query/tests/archetype_range_tests.rs +++ b/crates/re_query/tests/archetype_range_tests.rs @@ -321,8 +321,6 @@ fn timeless_range() { // --- First test: `(timepoint1, timepoint3]` --- - // The exclusion of `timepoint1` means latest-at semantics will kick in! - let query = re_data_store::RangeQuery::new( timepoint1[0].0, TimeRange::new((timepoint1[0].1.as_i64() + 1).into(), timepoint3[0].1), @@ -416,7 +414,7 @@ fn timeless_range() { Some(Position2D::new(1.0, 2.0)), Some(Position2D::new(3.0, 4.0)), ]; - let colors: Vec<Option<Color>> = vec![None, None]; + let colors = vec![None, Some(Color::from_rgb(255, 0, 0))]; let expected = DataCellRow(smallvec![ DataCell::from_native_sparse(instances), DataCell::from_native_sparse(positions), @@ -731,8 +729,6 @@ fn simple_splatted_range() { // --- Second test: `[timepoint1, timepoint3]` --- - // The inclusion of `timepoint1` means latest-at semantics will _not_ kick in! 
- let query = re_data_store::RangeQuery::new( timepoint1[0].0, TimeRange::new(timepoint1[0].1, timepoint3[0].1), diff --git a/tests/rust/plot_dashboard_stress/Cargo.toml b/tests/rust/plot_dashboard_stress/Cargo.toml index 25ea17a9c7a2..33cafcd126df 100644 --- a/tests/rust/plot_dashboard_stress/Cargo.toml +++ b/tests/rust/plot_dashboard_stress/Cargo.toml @@ -8,7 +8,7 @@ publish = false [dependencies] re_log = { workspace = true, features = ["setup"] } -rerun = { path = "../../../crates/rerun" } +rerun = { path = "../../../crates/rerun", features = ["clap"] } anyhow = "1.0" clap = { version = "4.0", features = ["derive"] }