diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml
index d3e285ca2a53c..1907fa355e480 100644
--- a/crates/store/re_chunk/Cargo.toml
+++ b/crates/store/re_chunk/Cargo.toml
@@ -30,9 +30,6 @@ serde = [
   "re_types_core/serde",
 ]
 
-## Enable conversion to and from arrow-rs types
-arrow = ["arrow2/arrow"]
-
 [dependencies]
diff --git a/crates/store/re_chunk/src/arrow.rs b/crates/store/re_chunk/src/arrow.rs
deleted file mode 100644
index 4802c21704844..0000000000000
--- a/crates/store/re_chunk/src/arrow.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use arrow::{
-    array::{make_array, RecordBatch},
-    error::ArrowError,
-};
-
-use crate::TransportChunk;
-
-impl TransportChunk {
-    /// Create an arrow-rs [`RecordBatch`] containing the data from this [`TransportChunk`].
-    ///
-    /// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
-    /// but does incur overhead of generating an alternative representation of the arrow-
-    /// related rust structures that refer to those data buffers.
-    pub fn try_to_arrow_record_batch(&self) -> Result<RecordBatch, ArrowError> {
-        let columns: Vec<_> = self
-            .columns()
-            .iter()
-            .map(|arr2_array| make_array(arrow2::array::to_data(*arr2_array)))
-            .collect();
-
-        RecordBatch::try_new(self.schema(), columns)
-    }
-
-    /// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`].
-    ///
-    /// This is a "fairly" cheap operation, as it does not copy the underlying arrow data,
-    /// but does incur overhead of generating an alternative representation of the arrow-
-    /// related rust structures that refer to those data buffers.
-    pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self {
-        let columns: Vec<_> = batch
-            .columns()
-            .iter()
-            .map(|array| arrow2::array::from_data(&array.to_data()))
-            .collect();
-
-        let data = arrow2::chunk::Chunk::new(columns);
-
-        Self::new(batch.schema(), data)
-    }
-}
diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs
index 72827a1b81e5a..31cdc5fe5fdca 100644
--- a/crates/store/re_chunk/src/chunk.rs
+++ b/crates/store/re_chunk/src/chunk.rs
@@ -956,13 +956,29 @@ impl Chunk {
         }
     }
 
-    /// Unconditionally inserts an [`Arrow2ListArray`] as a component column.
+    /// Unconditionally inserts an [`ArrowListArray`] as a component column.
     ///
     /// Removes and replaces the column if it already exists.
     ///
     /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
     #[inline]
     pub fn add_component(
+        &mut self,
+        component_desc: ComponentDescriptor,
+        list_array: ArrowListArray,
+    ) -> ChunkResult<()> {
+        self.components
+            .insert_descriptor(component_desc, list_array);
+        self.sanity_check()
+    }
+
+    /// Unconditionally inserts an [`Arrow2ListArray`] as a component column.
+    ///
+    /// Removes and replaces the column if it already exists.
+    ///
+    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
+    #[inline]
+    pub fn add_component_arrow2(
         &mut self,
         component_desc: ComponentDescriptor,
         list_array: Arrow2ListArray<i32>,
diff --git a/crates/store/re_chunk/src/concat_record_batches.rs b/crates/store/re_chunk/src/concat_record_batches.rs
index beae93c6a655b..6e678c5cd72d5 100644
--- a/crates/store/re_chunk/src/concat_record_batches.rs
+++ b/crates/store/re_chunk/src/concat_record_batches.rs
@@ -1,17 +1,12 @@
-use crate::TransportChunk;
-
+use arrow::array::RecordBatch;
 use arrow::datatypes::Schema as ArrowSchema;
-use arrow2::chunk::Chunk as Arrow2Chunk;
 
-/// Concatenate multiple [`TransportChunk`]s into one.
-///
-/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
-/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
-/// * `arrow-rs` does have one, and it natively supports concatenation.
+/// Concatenate multiple [`RecordBatch`]s into one.
+// TODO: make a member of `RecordBatch` instead
 pub fn concatenate_record_batches(
     schema: impl Into<ArrowSchema>,
-    batches: &[TransportChunk],
-) -> anyhow::Result<TransportChunk> {
+    batches: &[RecordBatch],
+) -> anyhow::Result<RecordBatch> {
     let schema: ArrowSchema = schema.into();
     anyhow::ensure!(
         batches
@@ -20,21 +15,8 @@ pub fn concatenate_record_batches(
         "concatenate_record_batches: all batches must have the same schema"
     );
 
-    let mut output_columns = Vec::new();
-
-    if !batches.is_empty() {
-        for (i, _field) in schema.fields.iter().enumerate() {
-            let arrays: Option<Vec<_>> = batches.iter().map(|batch| batch.column(i)).collect();
-            let arrays = arrays.ok_or_else(|| {
-                anyhow::anyhow!("concatenate_record_batches: all batches must have the same schema")
-            })?;
-            let array = re_arrow_util::arrow2_util::concat_arrays(&arrays)?;
-            output_columns.push(array);
-        }
-    }
+    // TODO: is_sorted is probably false now!
 
-    Ok(TransportChunk::new(
-        schema,
-        Arrow2Chunk::new(output_columns),
-    ))
+    let record_batch = arrow::compute::concat_batches(&schema.into(), batches)?;
+    Ok(record_batch)
 }
diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs
index 30ff3c5820da9..f6861f612a475 100644
--- a/crates/store/re_chunk/src/lib.rs
+++ b/crates/store/re_chunk/src/lib.rs
@@ -21,9 +21,6 @@ mod transport;
 #[cfg(not(target_arch = "wasm32"))]
 mod batcher;
 
-#[cfg(feature = "arrow")]
-mod arrow;
-
 pub use self::builder::{ChunkBuilder, TimeColumnBuilder};
 pub use self::chunk::{
     Chunk, ChunkComponents, ChunkError, ChunkResult, TimeColumn, TimeColumnError,
diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs
index d157c8b52f981..0a993bb8efaf9 100644
--- a/crates/store/re_chunk/src/transport.rs
+++ b/crates/store/re_chunk/src/transport.rs
@@ -1,26 +1,23 @@
-use std::sync::Arc;
-
 use arrow::{
     array::{
-        ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, StructArray as ArrowStructArray,
+        Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray,
+        RecordBatch as ArrowRecordBatch, StructArray as ArrowStructArray,
+    },
+    datatypes::{
+        DataType as ArrowDatatype, Field as ArrowField, Fields as ArrowFields,
+        Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit as ArrowTimeUnit,
+    },
-    datatypes::{Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef},
-};
-use arrow2::{
-    array::{Array as Arrow2Array, ListArray as Arrow2ListArray},
-    chunk::Chunk as Arrow2Chunk,
-    datatypes::{DataType as Arrow2Datatype, TimeUnit as Arrow2TimeUnit},
 };
 use itertools::Itertools;
 use nohash_hasher::IntMap;
 use tap::Tap as _;
 
-use re_arrow_util::{
-    arrow_util::into_arrow_ref, Arrow2ArrayDowncastRef as _, ArrowArrayDowncastRef as _,
-};
+use re_arrow_util::{arrow_util::into_arrow_ref, ArrowArrayDowncastRef as _};
 use re_byte_size::SizeBytes as _;
 use re_log_types::{EntityPath, Timeline};
-use re_types_core::{Component as _, ComponentDescriptor, Loggable as _};
+use re_types_core::{
+    arrow_helpers::as_array_ref, Component as _, ComponentDescriptor, Loggable as _,
+};
 
 use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, RowId, TimeColumn};
 
@@ -30,7 +27,10 @@ pub type ArrowMetadata = std::collections::HashMap<String, String>;
 
 /// A [`Chunk`] that is ready for transport. Obtained by calling [`Chunk::to_transport`].
 ///
-/// Implemented as an Arrow dataframe: a schema and a batch.
+/// It contains a schema with a matching number of columns, all of the same length.
+///
+/// This is just a wrapper around an [`ArrowRecordBatch`], with some helper functions on top.
+/// It can be converted to and from [`ArrowRecordBatch`] without overhead.
 ///
 /// Use the `Display` implementation to dump the chunk as a nicely formatted table.
 ///
 ///
@@ -41,73 +41,49 @@ pub type ArrowMetadata = std::collections::HashMap<String, String>;
 /// claiming to be sorted while it is in fact not).
 #[derive(Debug, Clone)]
 pub struct TransportChunk {
-    /// The schema of the dataframe, and all chunk-level and field-level metadata.
-    ///
-    /// Take a look at the `TransportChunk::CHUNK_METADATA_*` and `TransportChunk::FIELD_METADATA_*`
-    /// constants for more information about available metadata.
-    schema: ArrowSchemaRef,
-
-    /// All the control, time and component data.
-    data: Arrow2Chunk<Box<dyn Arrow2Array>>,
+    batch: ArrowRecordBatch,
 }
 
 impl std::fmt::Display for TransportChunk {
     #[inline]
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        // TODO(#3741): simplify code when we have migrated to arrow-rs
         re_format_arrow::format_dataframe(
-            &self.schema.metadata.clone().into_iter().collect(),
-            &self.schema.fields,
-            &self
-                .data
-                .iter()
-                .map(|list_array| ArrowArrayRef::from(list_array.clone()))
-                .collect_vec(),
+            &self.schema_ref().metadata.clone().into_iter().collect(), // HashMap -> BTreeMap
+            &self.schema_ref().fields,
+            self.batch.columns(),
             f.width(),
         )
         .fmt(f)
     }
 }
 
-impl TransportChunk {
-    pub fn new(
-        schema: impl Into<ArrowSchemaRef>,
-        columns: impl Into<Arrow2Chunk<Box<dyn Arrow2Array>>>,
-    ) -> Self {
-        Self {
-            schema: schema.into(),
-            data: columns.into(),
-        }
+impl AsRef<ArrowRecordBatch> for TransportChunk {
+    #[inline]
+    fn as_ref(&self) -> &ArrowRecordBatch {
+        &self.batch
+    }
+}
+
+impl std::ops::Deref for TransportChunk {
+    type Target = ArrowRecordBatch;
+
+    #[inline]
+    fn deref(&self) -> &ArrowRecordBatch {
+        &self.batch
     }
 }
 
 impl From<ArrowRecordBatch> for TransportChunk {
+    #[inline]
     fn from(batch: ArrowRecordBatch) -> Self {
-        Self::new(
-            batch.schema(),
-            Arrow2Chunk::new(
-                batch
-                    .columns()
-                    .iter()
-                    .map(|column| column.clone().into())
-                    .collect(),
-            ),
-        )
+        Self { batch }
     }
 }
 
-impl TryFrom<TransportChunk> for ArrowRecordBatch {
-    type Error = arrow::error::ArrowError;
-
-    fn try_from(chunk: TransportChunk) -> Result<Self, Self::Error> {
-        let TransportChunk { schema, data } = chunk;
-        Self::try_new(
-            schema,
-            data.columns()
-                .iter()
-                .map(|column| column.clone().into())
-                .collect(),
-        )
+impl From<TransportChunk> for ArrowRecordBatch {
+    #[inline]
+    fn from(chunk: TransportChunk) -> Self {
+        chunk.batch
     }
 }
@@ -303,17 +279,14 @@ impl TransportChunk {
 impl TransportChunk {
     #[inline]
     pub fn id(&self) -> ChunkResult<ChunkId> {
-        if let Some(id) = self.schema.metadata.get(Self::CHUNK_METADATA_KEY_ID) {
+        if let Some(id) = self.metadata().get(Self::CHUNK_METADATA_KEY_ID) {
             let id = u128::from_str_radix(id, 16).map_err(|err| ChunkError::Malformed {
                 reason: format!("cannot deserialize chunk id: {err}"),
             })?;
             Ok(ChunkId::from_u128(id))
         } else {
             Err(crate::ChunkError::Malformed {
-                reason: format!(
-                    "chunk id missing from metadata ({:?})",
-                    self.schema.metadata
-                ),
+                reason: format!("chunk id missing from metadata ({:?})", self.metadata()),
             })
         }
     }
@@ -321,7 +294,7 @@ impl TransportChunk {
     #[inline]
     pub fn entity_path(&self) -> ChunkResult<EntityPath> {
         match self
-            .schema
+            .schema_ref()
             .metadata
             .get(Self::CHUNK_METADATA_KEY_ENTITY_PATH)
         {
             None =>
Err(crate::ChunkError::Malformed { reason: format!( "entity path missing from metadata ({:?})", - self.schema.metadata + self.schema_ref().metadata ), }), } @@ -337,24 +310,28 @@ impl TransportChunk { #[inline] pub fn heap_size_bytes(&self) -> Option { - self.schema - .metadata + self.metadata() .get(Self::CHUNK_METADATA_KEY_HEAP_SIZE_BYTES) .and_then(|s| s.parse::().ok()) } #[inline] pub fn schema(&self) -> ArrowSchemaRef { - self.schema.clone() + self.batch.schema() } #[inline] pub fn schema_ref(&self) -> &ArrowSchemaRef { - &self.schema + self.batch.schema_ref() + } + + #[inline] + pub fn fields(&self) -> &ArrowFields { + &self.schema_ref().fields } - pub fn insert_metadata(&mut self, key: String, value: String) { - Arc::make_mut(&mut self.schema).metadata.insert(key, value); + pub fn metadata(&self) -> &std::collections::HashMap { + &self.batch.schema_ref().metadata } /// Looks in the chunk metadata for the `IS_SORTED` marker. @@ -363,17 +340,10 @@ impl TransportChunk { /// This is fine, although wasteful. #[inline] pub fn is_sorted(&self) -> bool { - self.schema - .metadata + self.metadata() .contains_key(Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID) } - /// Access specific column - #[inline] - pub fn column(&self, index: usize) -> Option<&dyn Arrow2Array> { - self.data.get(index).map(|c| c.as_ref()) - } - /// Iterates all columns of the specified `kind`. /// /// See: @@ -384,67 +354,52 @@ impl TransportChunk { fn columns_of_kind<'a>( &'a self, kind: &'a str, - ) -> impl Iterator)> + 'a { - self.schema - .fields - .iter() - .enumerate() - .filter_map(|(i, field)| { - let actual_kind = field.metadata().get(Self::FIELD_METADATA_KEY_KIND); - (actual_kind.map(|s| s.as_str()) == Some(kind)) - .then(|| { - self.data - .columns() - .get(i) - .map(|column| (field.as_ref(), column)) - }) - .flatten() - }) - } - - #[inline] - pub fn fields_and_columns( - &self, - ) -> impl Iterator)> + '_ { - self.schema - .fields - .iter() - .enumerate() - .filter_map(|(i, field)| { - self.data - .columns() - .get(i) - .map(|column| (field.as_ref(), column)) - }) + ) -> impl Iterator + 'a { + self.fields().iter().enumerate().filter_map(|(i, field)| { + let actual_kind = field.metadata().get(Self::FIELD_METADATA_KEY_KIND); + (actual_kind.map(|s| s.as_str()) == Some(kind)) + .then(|| { + self.batch + .columns() + .get(i) + .map(|column| (field.as_ref(), column)) + }) + .flatten() + }) } #[inline] - pub fn columns(&self) -> Vec<&dyn Arrow2Array> { - self.data.iter().map(|c| c.as_ref()).collect() + pub fn fields_and_columns(&self) -> impl Iterator + '_ { + self.fields().iter().enumerate().filter_map(|(i, field)| { + self.batch + .columns() + .get(i) + .map(|column| (field.as_ref(), column)) + }) } /// Iterates all control columns present in this chunk. #[inline] - pub fn controls(&self) -> impl Iterator)> { + pub fn controls(&self) -> impl Iterator { self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_CONTROL) } /// Iterates all data columns present in this chunk. #[inline] - pub fn components(&self) -> impl Iterator)> { + pub fn components(&self) -> impl Iterator { self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_DATA) } /// Iterates all timeline columns present in this chunk. #[inline] - pub fn timelines(&self) -> impl Iterator)> { + pub fn timelines(&self) -> impl Iterator { self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_TIME) } /// How many columns in total? Includes control, time, and component columns. 
#[inline] pub fn num_columns(&self) -> usize { - self.data.columns().len() + self.batch.num_columns() } #[inline] @@ -464,7 +419,7 @@ impl TransportChunk { #[inline] pub fn num_rows(&self) -> usize { - self.data.len() + self.batch.num_rows() } } @@ -472,7 +427,7 @@ impl Chunk { /// Prepare the [`Chunk`] for transport. /// /// It is probably a good idea to sort the chunk first. - pub fn to_transport(&self) -> ChunkResult { + pub fn to_record_batch(&self) -> ChunkResult { self.sanity_check()?; re_tracing::profile_function!(format!( @@ -493,7 +448,7 @@ impl Chunk { let mut fields: Vec = vec![]; let mut metadata = std::collections::HashMap::default(); - let mut columns: Vec> = + let mut columns: Vec = Vec::with_capacity(1 /* row_ids */ + timelines.len() + components.len()); // Chunk-level metadata @@ -533,7 +488,7 @@ impl Chunk { }), ), ); - columns.push(into_arrow_ref(row_ids.clone()).into()); + columns.push(into_arrow_ref(row_ids.clone())); } // Timelines @@ -572,7 +527,7 @@ impl Chunk { for (field, times) in timelines { fields.push(field); - columns.push(times.into()); + columns.push(times); } } @@ -584,9 +539,11 @@ impl Chunk { .values() .flat_map(|per_desc| per_desc.iter()) .map(|(component_desc, list_array)| { + let list_array = ArrowListArray::from(list_array.clone()); + let field = ArrowField::new( component_desc.component_name.to_string(), - list_array.data_type().clone().into(), + list_array.data_type().clone(), true, ) .with_metadata({ @@ -597,9 +554,7 @@ impl Chunk { metadata }); - let data = list_array.clone().boxed(); - - (field, data) + (field, as_array_ref(list_array)) }) .collect_vec(); @@ -614,11 +569,19 @@ impl Chunk { let schema = ArrowSchema::new_with_metadata(fields, metadata); - Ok(TransportChunk::new(schema, Arrow2Chunk::new(columns))) + Ok(ArrowRecordBatch::try_new(schema.into(), columns)?) + } + + /// Prepare the [`Chunk`] for transport. + /// + /// It is probably a good idea to sort the chunk first. 
+ pub fn to_transport(&self) -> ChunkResult { + let record_batch = self.to_record_batch()?; + Ok(TransportChunk::from(record_batch)) } pub fn from_record_batch(batch: ArrowRecordBatch) -> ChunkResult { - Self::from_transport(&batch.into()) + Self::from_transport(&TransportChunk::from(batch)) } pub fn from_transport(transport: &TransportChunk) -> ChunkResult { @@ -647,7 +610,7 @@ impl Chunk { (field.name() == RowId::descriptor().component_name.as_str()).then_some(column) }) else { return Err(ChunkError::Malformed { - reason: format!("missing row_id column ({:?})", transport.schema), + reason: format!("missing row_id column ({:?})", transport.schema_ref()), }); }; @@ -671,9 +634,9 @@ impl Chunk { for (field, column) in transport.timelines() { // See also [`Timeline::datatype`] - let timeline = match column.data_type().to_logical_type() { - Arrow2Datatype::Int64 => Timeline::new_sequence(field.name().as_str()), - Arrow2Datatype::Timestamp(Arrow2TimeUnit::Nanosecond, None) => { + let timeline = match column.data_type() { + ArrowDatatype::Int64 => Timeline::new_sequence(field.name().as_str()), + ArrowDatatype::Timestamp(ArrowTimeUnit::Nanosecond, None) => { Timeline::new_temporal(field.name().as_str()) } _ => { @@ -717,7 +680,7 @@ impl Chunk { for (field, column) in transport.components() { let column = column - .downcast_array2_ref::>() + .downcast_array_ref::() .ok_or_else(|| ChunkError::Malformed { reason: format!( "The outer array in a chunked component batch must be a sparse list, got {:?}", @@ -728,7 +691,7 @@ impl Chunk { let component_desc = TransportChunk::component_descriptor_from_field(field); if components - .insert_descriptor_arrow2(component_desc, column.clone()) + .insert_descriptor(component_desc, column.clone()) .is_some() { return Err(ChunkError::Malformed { @@ -782,7 +745,7 @@ impl Chunk { Ok(re_log_types::ArrowMsg { chunk_id: re_tuid::Tuid::from_u128(self.id().as_u128()), timepoint_max: self.timepoint_max(), - batch: transport.try_into()?, + batch: transport.into(), on_release: None, }) } @@ -871,11 +834,7 @@ mod tests { for _ in 0..3 { let chunk_in_transport = chunk_before.to_transport()?; - #[cfg(feature = "arrow")] - let chunk_after = - Chunk::from_record_batch(chunk_in_transport.try_to_arrow_record_batch()?)?; - #[cfg(not(feature = "arrow"))] - let chunk_after = Chunk::from_transport(&chunk_in_transport)?; + let chunk_after = Chunk::from_record_batch(chunk_in_transport.clone().into())?; assert_eq!( chunk_in_transport.entity_path()?, @@ -926,12 +885,7 @@ mod tests { eprintln!("{chunk_in_transport}"); eprintln!("{chunk_after}"); - #[cfg(not(feature = "arrow"))] - { - // This will fail when round-tripping all the way to record-batch - // the below check should always pass regardless. 
- assert_eq!(chunk_before, chunk_after); - } + assert_eq!(chunk_before, chunk_after); assert!(chunk_before.are_equal_ignoring_extension_types(&chunk_after)); diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 0484798837cb0..3ff35d366dca3 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -4,6 +4,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Deref; use std::ops::DerefMut; +use arrow::datatypes::Field as ArrowField; use arrow2::{ array::ListArray as ArrowListArray, datatypes::{DataType as Arrow2Datatype, Field as Arrow2Field}, @@ -43,7 +44,7 @@ impl ColumnDescriptor { } #[inline] - pub fn datatype(&self) -> Arrow2Datatype { + pub fn arrow2_datatype(&self) -> Arrow2Datatype { match self { Self::Time(descr) => descr.datatype.clone(), Self::Component(descr) => descr.returned_datatype(), @@ -51,13 +52,21 @@ impl ColumnDescriptor { } #[inline] - pub fn to_arrow_field(&self) -> Arrow2Field { + pub fn to_arrow_field(&self) -> ArrowField { match self { Self::Time(descr) => descr.to_arrow_field(), Self::Component(descr) => descr.to_arrow_field(), } } + #[inline] + pub fn to_arrow2_field(&self) -> Arrow2Field { + match self { + Self::Time(descr) => descr.to_arrow2_field(), + Self::Component(descr) => descr.to_arrow2_field(), + } + } + #[inline] pub fn short_name(&self) -> String { match self { @@ -150,7 +159,12 @@ impl TimeColumnDescriptor { } #[inline] - pub fn to_arrow_field(&self) -> Arrow2Field { + pub fn to_arrow_field(&self) -> ArrowField { + self.to_arrow2_field().into() + } + + #[inline] + pub fn to_arrow2_field(&self) -> Arrow2Field { let Self { timeline, datatype } = self; let nullable = true; // Time column must be nullable since static data doesn't have a time. 
Arrow2Field::new(timeline.name().to_string(), datatype.clone(), nullable) @@ -351,7 +365,12 @@ impl ComponentColumnDescriptor { } #[inline] - pub fn to_arrow_field(&self) -> Arrow2Field { + pub fn to_arrow_field(&self) -> ArrowField { + self.to_arrow2_field().into() + } + + #[inline] + pub fn to_arrow2_field(&self) -> Arrow2Field { let entity_path = &self.entity_path; let descriptor = ComponentDescriptor { archetype_name: self.archetype_name, diff --git a/crates/store/re_dataframe/examples/query.rs b/crates/store/re_dataframe/examples/query.rs index f187319c21474..5d59f08da0f69 100644 --- a/crates/store/re_dataframe/examples/query.rs +++ b/crates/store/re_dataframe/examples/query.rs @@ -2,6 +2,7 @@ use itertools::Itertools; +use re_chunk::TransportChunk; use re_dataframe::{ ChunkStoreConfig, EntityPathFilter, QueryEngine, QueryExpression, ResolvedTimeRange, SparseFillStrategy, StoreKind, TimeInt, Timeline, @@ -70,7 +71,7 @@ fn main() -> anyhow::Result<()> { let query_handle = engine.query(query.clone()); // eprintln!("{:#?}", query_handle.selected_contents()); for batch in query_handle.into_batch_iter() { - eprintln!("{batch}"); + eprintln!("{}", TransportChunk::from(batch)); } } diff --git a/crates/store/re_dataframe/src/engine.rs b/crates/store/re_dataframe/src/engine.rs index 69a9ea6bf6ee4..6071881859381 100644 --- a/crates/store/re_dataframe/src/engine.rs +++ b/crates/store/re_dataframe/src/engine.rs @@ -1,6 +1,7 @@ use std::collections::BTreeMap; -use re_chunk::{EntityPath, TransportChunk}; +use arrow::array::RecordBatch as ArrowRecordBatch; +use re_chunk::EntityPath; use re_chunk_store::{ ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ColumnDescriptor, QueryExpression, }; @@ -15,11 +16,7 @@ use re_chunk_store::ComponentColumnDescriptor; // --- -// TODO(#3741): `arrow2` has no concept of a `RecordBatch`, so for now we just use our trustworthy -// `TransportChunk` type until we migrate to `arrow-rs`. -// `TransportChunk` maps 1:1 to `RecordBatch` so the switch (and the compatibility layer in the meantime) -// will be trivial. -pub type RecordBatch = TransportChunk; +pub type RecordBatch = ArrowRecordBatch; // TODO: remove this // --- Queries --- diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index b57579be1f048..243f899a0b7bc 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -6,14 +6,15 @@ use std::{ }, }; -use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; +use arrow::{ + array::RecordBatch as ArrowRecordBatch, buffer::ScalarBuffer as ArrowScalarBuffer, + datatypes::Fields as ArrowFields, datatypes::Schema as ArrowSchema, +}; use arrow2::{ array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, PrimitiveArray as Arrow2PrimitiveArray, }, - chunk::Chunk as Arrow2Chunk, - datatypes::Schema as Arrow2Schema, Either, }; use itertools::Itertools; @@ -107,7 +108,7 @@ struct QueryHandleState { /// The Arrow schema that corresponds to the `selected_contents`. /// /// All returned rows will have this schema. - arrow_schema: Arrow2Schema, + arrow_schema: ArrowSchema, /// All the [`Chunk`]s included in the view contents. /// @@ -191,13 +192,13 @@ impl QueryHandle { // 3. Compute the Arrow schema of the selected components. // // Every result returned using this `QueryHandle` will match this schema exactly. 
- let arrow_schema = Arrow2Schema { - fields: selected_contents + let arrow2_schema = ArrowSchema::new_with_metadata( + selected_contents .iter() .map(|(_, descr)| descr.to_arrow_field()) - .collect_vec(), - metadata: Default::default(), - }; + .collect::(), + Default::default(), + ); // 4. Perform the query and keep track of all the relevant chunks. let query = { @@ -256,7 +257,7 @@ impl QueryHandle { // Only way this could fail is if the number of rows did not match. #[allow(clippy::unwrap_used)] chunk - .add_component( + .add_component_arrow2( re_types_core::ComponentDescriptor { component_name: descr.component_name, archetype_name: descr.archetype_name, @@ -359,7 +360,7 @@ impl QueryHandle { selected_contents, selected_static_values, filtered_index, - arrow_schema, + arrow_schema: arrow2_schema, view_chunks, cur_row: AtomicU64::new(0), unique_index_values, @@ -669,7 +670,7 @@ impl QueryHandle { /// /// Columns that do not yield any data will still be present in the results, filled with null values. #[inline] - pub fn schema(&self) -> &Arrow2Schema { + pub fn schema(&self) -> &ArrowSchema { &self.init().arrow_schema } @@ -794,7 +795,7 @@ impl QueryHandle { #[inline] pub fn next_row(&self) -> Option> { self.engine - .with(|store, cache| self._next_row(store, cache)) + .with(|store, cache| self._next_row_arrow2(store, cache)) .map(|vec| vec.into_iter().map(|a| a.into()).collect()) } @@ -826,7 +827,7 @@ impl QueryHandle { #[inline] fn next_row_arrow2(&self) -> Option>> { self.engine - .with(|store, cache| self._next_row(store, cache)) + .with(|store, cache| self._next_row_arrow2(store, cache)) } /// Asynchronously returns the next row's worth of data. @@ -845,7 +846,7 @@ impl QueryHandle { /// } /// ``` #[cfg(not(target_arch = "wasm32"))] - pub fn next_row_async( + pub fn next_row_async_arrow2( &self, ) -> impl std::future::Future>>> where @@ -853,7 +854,7 @@ impl QueryHandle { { let res: Option> = self .engine - .try_with(|store, cache| self._next_row(store, cache)); + .try_with(|store, cache| self._next_row_arrow2(store, cache)); let engine = self.engine.clone(); std::future::poll_fn(move |cx| { @@ -883,7 +884,7 @@ impl QueryHandle { }) } - pub fn _next_row( + pub fn _next_row_arrow2( &self, store: &ChunkStore, cache: &QueryCache, @@ -1240,7 +1241,7 @@ impl QueryHandle { .map(|(view_idx, column)| match column { ColumnDescriptor::Time(descr) => { max_value_per_index.get(&descr.timeline()).map_or_else( - || arrow2::array::new_null_array(column.datatype(), 1), + || arrow2::array::new_null_array(column.arrow2_datatype(), 1), |(_time, time_sliced)| { descr.typ().make_arrow_array(time_sliced.clone()).into() }, @@ -1251,7 +1252,7 @@ impl QueryHandle { .get(*view_idx) .cloned() .flatten() - .unwrap_or_else(|| arrow2::array::new_null_array(column.datatype(), 1)), + .unwrap_or_else(|| arrow2::array::new_null_array(column.arrow2_datatype(), 1)), }) .collect_vec(); @@ -1268,10 +1269,8 @@ impl QueryHandle { /// See [`Self::next_row`] for more information. #[inline] pub fn next_row_batch(&self) -> Option { - Some(RecordBatch::new( - self.schema().clone(), - Arrow2Chunk::new(self.next_row_arrow2()?), - )) + let row = self.next_row()?; + ArrowRecordBatch::try_new(self.schema().clone().into(), row).ok() } #[inline] @@ -1280,13 +1279,14 @@ impl QueryHandle { where E: 'static + Send + Clone, { - let row = self.next_row_async().await?; + let row = self.next_row_async_arrow2().await?; // If we managed to get a row, then the state must be initialized already. 
#[allow(clippy::unwrap_used)] let schema = self.state.get().unwrap().arrow_schema.clone(); - Some(RecordBatch::new(schema, Arrow2Chunk::new(row))) + // TODO(#3741): remove the collect + ArrowRecordBatch::try_new(schema.into(), row.into_iter().map(|a| a.into()).collect()).ok() } } @@ -1402,9 +1402,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // temporal @@ -1424,9 +1424,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1458,9 +1458,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); Ok(()) } @@ -1491,9 +1491,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); Ok(()) } @@ -1530,9 +1530,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); Ok(()) } @@ -1572,9 +1572,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // sparse-filled @@ -1602,9 +1602,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1643,9 +1643,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // non-existing component @@ -1669,9 +1669,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // MyPoint @@ -1695,9 +1695,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // MyColor @@ -1721,9 +1721,9 @@ 
mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1763,9 +1763,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } { @@ -1800,9 +1800,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1838,9 +1838,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // only indices (+ duplication) @@ -1871,9 +1871,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // only components (+ duplication) @@ -1911,9 +1911,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // static @@ -1972,9 +1972,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -2041,9 +2041,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -2081,9 +2081,9 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // sparse-filled @@ -2105,13 +2105,13 @@ mod tests { query_handle.schema().clone(), &query_handle.into_batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); // TODO(#7650): Those null values for `MyColor` on 10 and 20 look completely insane, but then again // static clear semantics in general are pretty unhinged right now, especially when // ranges are involved. - // It's extremely niche, our time is better spent somewhere else right now. 
- assert_snapshot_fixed_width!(dataframe); + + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -2302,7 +2302,7 @@ mod tests { pub struct QueryHandleStream(pub QueryHandle); impl tokio_stream::Stream for QueryHandleStream { - type Item = TransportChunk; + type Item = ArrowRecordBatch; #[inline] fn poll_next( @@ -2347,9 +2347,12 @@ mod tests { .collect::>() .await, )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!("async_barebones_static", dataframe); + assert_snapshot_fixed_width!( + "async_barebones_static", + TransportChunk::from(dataframe) + ); Ok::<_, anyhow::Error>(()) } @@ -2378,9 +2381,12 @@ mod tests { .collect::>() .await, )?; - eprintln!("{dataframe}"); + eprintln!("{}", TransportChunk::from(dataframe.clone())); - assert_snapshot_fixed_width!("async_barebones_temporal", dataframe); + assert_snapshot_fixed_width!( + "async_barebones_temporal", + TransportChunk::from(dataframe) + ); Ok::<_, anyhow::Error>(()) } diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 62ee365c1a9ec..63471503527c2 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -1,43 +1,49 @@ //! Communications with an Rerun Data Platform gRPC server. -mod address; - -pub use address::{InvalidRedapAddress, RedapAddress}; -use re_arrow_util::Arrow2ArrayDowncastRef as _; -use re_chunk::external::arrow2; -use re_log_encoding::codec::wire::decoder::Decode; -use re_log_types::external::re_types_core::ComponentDescriptor; -use re_protos::remote_store::v0::CatalogFilter; -use re_types::blueprint::archetypes::{ContainerBlueprint, ViewportBlueprint}; -use re_types::blueprint::archetypes::{ViewBlueprint, ViewContents}; -use re_types::blueprint::components::{ContainerKind, RootContainer}; -use re_types::components::RecordingUri; -use re_types::external::uuid; -use re_types::{Archetype, Component}; +use std::{collections::HashMap, error::Error, sync::Arc}; + +use arrow::{ + array::{ + Array as ArrowArray, ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, + StringArray as ArrowStringArray, + }, + datatypes::{DataType as ArrowDataType, Field as ArrowField}, +}; use url::Url; -// ---------------------------------------------------------------------------- - -use std::sync::Arc; -use std::{collections::HashMap, error::Error}; - -use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField}; -use arrow2::array::Utf8Array as Arrow2Utf8Array; -use re_chunk::{ - Arrow2Array, Chunk, ChunkBuilder, ChunkId, EntityPath, RowId, Timeline, TransportChunk, -}; -use re_log_encoding::codec::CodecError; +use re_arrow_util::ArrowArrayDowncastRef as _; +use re_chunk::{Chunk, ChunkBuilder, ChunkId, EntityPath, RowId, Timeline, TransportChunk}; +use re_log_encoding::codec::{wire::decoder::Decode, CodecError}; use re_log_types::{ - ApplicationId, BlueprintActivationCommand, EntityPathFilter, LogMsg, SetStoreInfo, StoreId, - StoreInfo, StoreKind, StoreSource, Time, + external::re_types_core::ComponentDescriptor, ApplicationId, BlueprintActivationCommand, + EntityPathFilter, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time, +}; +use re_protos::{ + common::v0::RecordingId, + remote_store::v0::{ + storage_node_client::StorageNodeClient, CatalogFilter, FetchRecordingRequest, + QueryCatalogRequest, + }, }; -use re_protos::common::v0::RecordingId; -use re_protos::remote_store::v0::{ - storage_node_client::StorageNodeClient, 
FetchRecordingRequest, QueryCatalogRequest, +use re_types::{ + arrow_helpers::as_array_ref, + blueprint::{ + archetypes::{ContainerBlueprint, ViewBlueprint, ViewContents, ViewportBlueprint}, + components::{ContainerKind, RootContainer}, + }, + components::RecordingUri, + external::uuid, + Archetype, Component, }; // ---------------------------------------------------------------------------- +mod address; + +pub use address::{InvalidRedapAddress, RedapAddress}; + +// ---------------------------------------------------------------------------- + /// Wrapper with a nicer error message #[derive(Debug)] pub struct TonicStatusError(pub tonic::Status); @@ -281,7 +287,7 @@ pub fn store_info_from_catalog_chunk( reason: "no application_id field found".to_owned(), }))?; let app_id = data - .downcast_array2_ref::>() + .downcast_array_ref::() .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { reason: format!("application_id must be a utf8 array: {:?}", tc.schema_ref()), }))? @@ -294,7 +300,7 @@ pub fn store_info_from_catalog_chunk( reason: "no start_time field found".to_owned(), }))?; let start_time = data - .downcast_array2_ref::() + .downcast_array_ref::() .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { reason: format!("start_time must be an int64 array: {:?}", tc.schema_ref()), }))? @@ -391,8 +397,8 @@ async fn stream_catalog_async( // - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray // - conversion expects the input TransportChunk to have a ChunkId so we need to add that piece of metadata - let mut fields = Vec::new(); - let mut arrays = Vec::new(); + let mut fields: Vec = Vec::new(); + let mut columns: Vec = Vec::new(); // add the (row id) control field let (row_id_field, row_id_data) = input.controls().next().ok_or( StreamError::ChunkError(re_chunk::ChunkError::Malformed { @@ -408,12 +414,12 @@ async fn stream_catalog_async( ) .with_metadata(TransportChunk::field_metadata_control_column()), ); - arrays.push(row_id_data.clone()); + columns.push(row_id_data.clone()); // next add any timeline field for (field, data) in input.timelines() { fields.push(field.clone()); - arrays.push(data.clone()); + columns.push(data.clone()); } // now add all the 'data' fields - we slice each column array into individual arrays and then convert the whole lot into a ListArray @@ -428,41 +434,41 @@ async fn stream_catalog_async( ) .with_metadata(TransportChunk::field_metadata_data_column()); - let mut sliced: Vec> = Vec::new(); + let mut sliced: Vec = Vec::new(); for idx in 0..data.len() { - let mut array = data.clone(); - array.slice(idx, 1); - sliced.push(array); + sliced.push(data.clone().slice(idx, 1)); } let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::>(); #[allow(clippy::unwrap_used)] // we know we've given the right field type - let data_field_array: arrow2::array::ListArray = - re_arrow_util::arrow2_util::arrays_to_list_array( - data_field_inner.data_type().clone().into(), + let data_field_array: arrow::array::ListArray = + re_arrow_util::arrow_util::arrays_to_list_array( + data_field_inner.data_type().clone(), &data_arrays, ) .unwrap(); fields.push(data_field); - arrays.push(Box::new(data_field_array)); + columns.push(as_array_ref(data_field_array)); } let schema = { - let metadata = HashMap::from_iter([( - TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), - "catalog".to_owned(), - )]); + let metadata = HashMap::from_iter([ + ( + 
TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), + "catalog".to_owned(), + ), + ( + TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(), + ChunkId::new().to_string(), + ), + ]); arrow::datatypes::Schema::new_with_metadata(fields, metadata) }; - // modified and enriched TransportChunk - let mut tc = TransportChunk::new(schema, arrow2::chunk::Chunk::new(arrays)); - tc.insert_metadata( - TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(), - ChunkId::new().to_string(), - ); - let mut chunk = Chunk::from_transport(&tc)?; + let record_batch = ArrowRecordBatch::try_new(schema.into(), columns) + .map_err(re_chunk::ChunkError::from)?; + let mut chunk = Chunk::from_record_batch(record_batch)?; // finally, enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know) // and the recording id (that we have in the catalog data) @@ -477,20 +483,16 @@ async fn stream_catalog_async( "couldn't get port from {redap_endpoint}" )))?; - let recording_uri_arrays: Vec> = chunk + let recording_uri_arrays: Vec = chunk .iter_slices::("id".into()) .map(|id| { let rec_id = &id[0]; // each component batch is of length 1 i.e. single 'id' value let recording_uri = format!("rerun://{host}:{port}/recording/{rec_id}"); - let recording_uri_data = Arrow2Utf8Array::::from([Some(recording_uri)]); - - Ok::, StreamError>( - Box::new(recording_uri_data) as Box - ) + as_array_ref(ArrowStringArray::from(vec![recording_uri])) }) - .collect::, _>>()?; + .collect(); let recording_id_arrays = recording_uri_arrays .iter() @@ -499,8 +501,8 @@ async fn stream_catalog_async( let rec_id_field = ArrowField::new("item", ArrowDataType::Utf8, true); #[allow(clippy::unwrap_used)] // we know we've given the right field type - let uris = re_arrow_util::arrow2_util::arrays_to_list_array( - rec_id_field.data_type().clone().into(), + let uris = re_arrow_util::arrow_util::arrays_to_list_array( + rec_id_field.data_type().clone(), &recording_id_arrays, ) .unwrap(); diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs index 8df9ae1ca0e05..e8f3c83040be8 100644 --- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs @@ -13,7 +13,7 @@ fn decode( re_protos::common::v0::EncoderVersion::V0 => { let mut reader = std::io::Cursor::new(data); let batch = read_arrow_from_bytes(&mut reader)?; - Ok(batch.into()) + Ok(TransportChunk::from(batch)) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/encoder.rs b/crates/store/re_log_encoding/src/codec/wire/encoder.rs index 58c8a95dca4ca..aaf10c2882463 100644 --- a/crates/store/re_log_encoding/src/codec/wire/encoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/encoder.rs @@ -9,12 +9,10 @@ fn encode( version: re_protos::common::v0::EncoderVersion, chunk: &TransportChunk, ) -> Result, CodecError> { - let transport_chunk = - arrow::array::RecordBatch::try_from(chunk.clone()).map_err(CodecError::InvalidChunk)?; match version { re_protos::common::v0::EncoderVersion::V0 => { let mut data: Vec = Vec::new(); - write_arrow_to_bytes(&mut data, &transport_chunk)?; + write_arrow_to_bytes(&mut data, chunk)?; Ok(data) } } diff --git a/docs/snippets/all/reference/dataframe_query.rs b/docs/snippets/all/reference/dataframe_query.rs index 2513c46440dc5..3f1a54596c2d9 100644 --- a/docs/snippets/all/reference/dataframe_query.rs +++ b/docs/snippets/all/reference/dataframe_query.rs @@ -30,7 +30,7 @@ fn main() -> Result<(), Box> { let query_handle = 
engine.query(query.clone()); for row in query_handle.batch_iter().take(10) { // Each row is a `RecordBatch`, which can be easily passed around across different data ecosystems. - println!("{row}"); + println!("{}", rerun::log::TransportChunk::from(row)); } Ok(()) diff --git a/examples/rust/dataframe_query/src/main.rs b/examples/rust/dataframe_query/src/main.rs index d451c830da932..0ffc8f64d2252 100644 --- a/examples/rust/dataframe_query/src/main.rs +++ b/examples/rust/dataframe_query/src/main.rs @@ -7,6 +7,7 @@ use rerun::{ concatenate_record_batches, EntityPathFilter, QueryEngine, QueryExpression, SparseFillStrategy, Timeline, }, + log::TransportChunk, ChunkStoreConfig, StoreKind, VersionPolicy, }; @@ -68,8 +69,8 @@ fn main() -> Result<(), Box> { let query_handle = engine.query(query.clone()); let record_batches = query_handle.batch_iter().take(10).collect_vec(); - let table = concatenate_record_batches(query_handle.schema().clone(), &record_batches)?; - println!("{table}"); + let batch = concatenate_record_batches(query_handle.schema().clone(), &record_batches)?; + println!("{}", TransportChunk::from(batch)); } Ok(()) diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 0cc66e0e91a29..646e2b86b1d3f 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -60,7 +60,7 @@ web_viewer = [ [dependencies] re_arrow_util.workspace = true re_build_info.workspace = true -re_chunk = { workspace = true, features = ["arrow"] } +re_chunk.workspace = true re_chunk_store.workspace = true re_dataframe.workspace = true re_grpc_client = { workspace = true, optional = true } diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 79b8ba2f82d24..e2463f5d811e3 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -753,16 +753,10 @@ impl PyRecordingView { py_rerun_warn("RecordingView::select: tried to select static data, but no non-static contents generated an index value on this timeline. No results will be returned. 
Either include non-static data or consider using `select_static()` instead.")?; } - let schema = query_handle.schema(); - let fields: Vec = - schema.fields.iter().map(|f| f.clone().into()).collect(); - let metadata = schema.metadata.clone().into_iter().collect(); - let schema = arrow::datatypes::Schema::new_with_metadata(fields, metadata); + let schema = query_handle.schema().clone(); let reader = RecordBatchIterator::new( - query_handle - .into_batch_iter() - .map(|batch| batch.try_to_arrow_record_batch()), + query_handle.into_batch_iter().map(Ok), std::sync::Arc::new(schema), ); Ok(PyArrowType(Box::new(reader))) @@ -850,18 +844,10 @@ impl PyRecordingView { ))); } - let schema = query_handle.schema(); - let fields: Vec = - schema.fields.iter().map(|f| f.clone().into()).collect(); - let metadata = schema.metadata.clone().into_iter().collect(); - let schema = arrow::datatypes::Schema::new_with_metadata(fields, metadata); + let schema = query_handle.schema().clone(); - let reader = RecordBatchIterator::new( - query_handle - .into_batch_iter() - .map(|batch| batch.try_to_arrow_record_batch()), - std::sync::Arc::new(schema), - ); + let reader = + RecordBatchIterator::new(query_handle.into_batch_iter().map(Ok), schema.into()); Ok(PyArrowType(Box::new(reader))) } diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index bc408ba725703..9de1077266133 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -15,7 +15,7 @@ use pyo3::{ types::PyDict, Bound, PyResult, }; -use re_arrow_util::Arrow2ArrayDowncastRef as _; +use re_arrow_util::ArrowArrayDowncastRef as _; use re_chunk::{Chunk, TransportChunk}; use re_chunk_store::ChunkStore; use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector}; @@ -173,7 +173,7 @@ impl PyStorageNodeClient { .unwrap_or_else(|| ArrowSchema::empty().into()); Ok(RecordBatchIterator::new( - batches.into_iter().map(|tc| tc.try_to_arrow_record_batch()), + batches.into_iter().map(|tc| Ok(tc.into())), schema, )) }); @@ -236,7 +236,7 @@ impl PyStorageNodeClient { let record_batches: Vec> = transport_chunks .into_iter() - .map(|tc| tc.try_to_arrow_record_batch()) + .map(|tc| Ok(tc.into())) .collect(); // TODO(jleibs): surfacing this schema is awkward. This should be more explicit in @@ -321,7 +321,7 @@ impl PyStorageNodeClient { )); } - let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata); + let metadata_tc = TransportChunk::from(metadata); metadata_tc .encode() .map_err(|err| PyRuntimeError::new_err(err.to_string())) @@ -351,7 +351,7 @@ impl PyStorageNodeClient { .find(|(field, _data)| field.name() == "rerun_recording_id") .map(|(_field, data)| data) .ok_or(PyRuntimeError::new_err("No rerun_recording_id"))? - .downcast_array2_ref::>() + .downcast_array_ref::() .ok_or(PyRuntimeError::new_err("Recording Id is not a string"))? .value(0) .to_owned(); @@ -388,7 +388,7 @@ impl PyStorageNodeClient { )); } - let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata); + let metadata_tc = TransportChunk::from(metadata); let request = UpdateCatalogRequest { metadata: Some(