diff --git a/Cargo.lock b/Cargo.lock index 75faee5fd61a..250bf25c9fdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5921,6 +5921,7 @@ dependencies = [ "re_arrow_util", "re_chunk", "re_chunk_store", + "re_format_arrow", "re_log", "re_log_encoding", "re_log_types", @@ -7322,6 +7323,7 @@ name = "rerun" version = "0.22.0-alpha.1+dev" dependencies = [ "anyhow", + "arrow", "clap", "document-features", "env_logger", @@ -7342,6 +7344,7 @@ dependencies = [ "re_entity_db", "re_error", "re_format", + "re_format_arrow", "re_log", "re_log_encoding", "re_log_types", diff --git a/crates/store/re_chunk/Cargo.toml b/crates/store/re_chunk/Cargo.toml index d3e285ca2a53..1907fa355e48 100644 --- a/crates/store/re_chunk/Cargo.toml +++ b/crates/store/re_chunk/Cargo.toml @@ -30,9 +30,6 @@ serde = [ "re_types_core/serde", ] -## Enable conversion to and from arrow-rs types -arrow = ["arrow2/arrow"] - [dependencies] diff --git a/crates/store/re_chunk/src/arrow.rs b/crates/store/re_chunk/src/arrow.rs deleted file mode 100644 index 4802c2170484..000000000000 --- a/crates/store/re_chunk/src/arrow.rs +++ /dev/null @@ -1,40 +0,0 @@ -use arrow::{ - array::{make_array, RecordBatch}, - error::ArrowError, -}; - -use crate::TransportChunk; - -impl TransportChunk { - /// Create an arrow-rs [`RecordBatch`] containing the data from this [`TransportChunk`]. - /// - /// This is a "fairly" cheap operation, as it does not copy the underlying arrow data, - /// but does incur overhead of generating an alternative representation of the arrow- - /// related rust structures that refer to those data buffers. - pub fn try_to_arrow_record_batch(&self) -> Result { - let columns: Vec<_> = self - .columns() - .iter() - .map(|arr2_array| make_array(arrow2::array::to_data(*arr2_array))) - .collect(); - - RecordBatch::try_new(self.schema(), columns) - } - - /// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`]. - /// - /// This is a "fairly" cheap operation, as it does not copy the underlying arrow data, - /// but does incur overhead of generating an alternative representation of the arrow- - /// related rust structures that refer to those data buffers. - pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self { - let columns: Vec<_> = batch - .columns() - .iter() - .map(|array| arrow2::array::from_data(&array.to_data())) - .collect(); - - let data = arrow2::chunk::Chunk::new(columns); - - Self::new(batch.schema(), data) - } -} diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 72827a1b81e5..31cdc5fe5fdc 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -956,13 +956,29 @@ impl Chunk { } } - /// Unconditionally inserts an [`Arrow2ListArray`] as a component column. + /// Unconditionally inserts an [`ArrowListArray`] as a component column. /// /// Removes and replaces the column if it already exists. /// /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`]. #[inline] pub fn add_component( + &mut self, + component_desc: ComponentDescriptor, + list_array: ArrowListArray, + ) -> ChunkResult<()> { + self.components + .insert_descriptor(component_desc, list_array); + self.sanity_check() + } + + /// Unconditionally inserts an [`Arrow2ListArray`] as a component column. + /// + /// Removes and replaces the column if it already exists. + /// + /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`]. 
+ #[inline] + pub fn add_component_arrow2( &mut self, component_desc: ComponentDescriptor, list_array: Arrow2ListArray, diff --git a/crates/store/re_chunk/src/concat_record_batches.rs b/crates/store/re_chunk/src/concat_record_batches.rs deleted file mode 100644 index beae93c6a655..000000000000 --- a/crates/store/re_chunk/src/concat_record_batches.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::TransportChunk; - -use arrow::datatypes::Schema as ArrowSchema; -use arrow2::chunk::Chunk as Arrow2Chunk; - -/// Concatenate multiple [`TransportChunk`]s into one. -/// -/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`. -/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s. -/// * `arrow-rs` does have one, and it natively supports concatenation. -pub fn concatenate_record_batches( - schema: impl Into, - batches: &[TransportChunk], -) -> anyhow::Result { - let schema: ArrowSchema = schema.into(); - anyhow::ensure!( - batches - .iter() - .all(|batch| batch.schema_ref().as_ref() == &schema), - "concatenate_record_batches: all batches must have the same schema" - ); - - let mut output_columns = Vec::new(); - - if !batches.is_empty() { - for (i, _field) in schema.fields.iter().enumerate() { - let arrays: Option> = batches.iter().map(|batch| batch.column(i)).collect(); - let arrays = arrays.ok_or_else(|| { - anyhow::anyhow!("concatenate_record_batches: all batches must have the same schema") - })?; - let array = re_arrow_util::arrow2_util::concat_arrays(&arrays)?; - output_columns.push(array); - } - } - - Ok(TransportChunk::new( - schema, - Arrow2Chunk::new(output_columns), - )) -} diff --git a/crates/store/re_chunk/src/lib.rs b/crates/store/re_chunk/src/lib.rs index 30ff3c5820da..c02fbbdbf75b 100644 --- a/crates/store/re_chunk/src/lib.rs +++ b/crates/store/re_chunk/src/lib.rs @@ -6,7 +6,6 @@ mod builder; mod chunk; -pub mod concat_record_batches; mod helpers; mod id; mod iter; @@ -21,9 +20,6 @@ mod transport; #[cfg(not(target_arch = "wasm32"))] mod batcher; -#[cfg(feature = "arrow")] -mod arrow; - pub use self::builder::{ChunkBuilder, TimeColumnBuilder}; pub use self::chunk::{ Chunk, ChunkComponents, ChunkError, ChunkResult, TimeColumn, TimeColumnError, diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index d157c8b52f98..90ea338e21d9 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -1,26 +1,23 @@ -use std::sync::Arc; - use arrow::{ array::{ - ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, StructArray as ArrowStructArray, + Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray, + RecordBatch as ArrowRecordBatch, StructArray as ArrowStructArray, + }, + datatypes::{ + DataType as ArrowDatatype, Field as ArrowField, Fields as ArrowFields, + Schema as ArrowSchema, TimeUnit as ArrowTimeUnit, }, - datatypes::{Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}, -}; -use arrow2::{ - array::{Array as Arrow2Array, ListArray as Arrow2ListArray}, - chunk::Chunk as Arrow2Chunk, - datatypes::{DataType as Arrow2Datatype, TimeUnit as Arrow2TimeUnit}, }; use itertools::Itertools; use nohash_hasher::IntMap; use tap::Tap as _; -use re_arrow_util::{ - arrow_util::into_arrow_ref, Arrow2ArrayDowncastRef as _, ArrowArrayDowncastRef as _, -}; +use re_arrow_util::{arrow_util::into_arrow_ref, ArrowArrayDowncastRef as _}; use re_byte_size::SizeBytes as _; use re_log_types::{EntityPath, Timeline}; 
-use re_types_core::{Component as _, ComponentDescriptor, Loggable as _}; +use re_types_core::{ + arrow_helpers::as_array_ref, Component as _, ComponentDescriptor, Loggable as _, +}; use crate::{chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, RowId, TimeColumn}; @@ -30,7 +27,10 @@ pub type ArrowMetadata = std::collections::HashMap; /// A [`Chunk`] that is ready for transport. Obtained by calling [`Chunk::to_transport`]. /// -/// Implemented as an Arrow dataframe: a schema and a batch. +/// It contains a schema with a matching number of columns, all of the same length. +/// +/// This is just a wrapper around an [`ArrowRecordBatch`], with some helper functions on top. +/// It can be converted to and from [`ArrowRecordBatch`] without overhead. /// /// Use the `Display` implementation to dump the chunk as a nicely formatted table. /// @@ -39,75 +39,46 @@ pub type ArrowMetadata = std::collections::HashMap; /// This means we have to be very careful when checking the validity of the data: slipping corrupt /// data into the store could silently break all the index search logic (e.g. think of a chunk /// claiming to be sorted while it is in fact not). +// TODO(emilk): remove this, and replace it with a trait extension type for `ArrowRecordBatch`. #[derive(Debug, Clone)] pub struct TransportChunk { - /// The schema of the dataframe, and all chunk-level and field-level metadata. - /// - /// Take a look at the `TransportChunk::CHUNK_METADATA_*` and `TransportChunk::FIELD_METADATA_*` - /// constants for more information about available metadata. - schema: ArrowSchemaRef, - - /// All the control, time and component data. - data: Arrow2Chunk>, + batch: ArrowRecordBatch, } impl std::fmt::Display for TransportChunk { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // TODO(#3741): simplify code when we have migrated to arrow-rs - re_format_arrow::format_dataframe( - &self.schema.metadata.clone().into_iter().collect(), - &self.schema.fields, - &self - .data - .iter() - .map(|list_array| ArrowArrayRef::from(list_array.clone())) - .collect_vec(), - f.width(), - ) - .fmt(f) + re_format_arrow::format_record_batch_with_width(self, f.width()).fmt(f) } } -impl TransportChunk { - pub fn new( - schema: impl Into, - columns: impl Into>>, - ) -> Self { - Self { - schema: schema.into(), - data: columns.into(), - } +impl AsRef for TransportChunk { + #[inline] + fn as_ref(&self) -> &ArrowRecordBatch { + &self.batch + } +} + +impl std::ops::Deref for TransportChunk { + type Target = ArrowRecordBatch; + + #[inline] + fn deref(&self) -> &ArrowRecordBatch { + &self.batch } } impl From for TransportChunk { + #[inline] fn from(batch: ArrowRecordBatch) -> Self { - Self::new( - batch.schema(), - Arrow2Chunk::new( - batch - .columns() - .iter() - .map(|column| column.clone().into()) - .collect(), - ), - ) + Self { batch } } } -impl TryFrom for ArrowRecordBatch { - type Error = arrow::error::ArrowError; - - fn try_from(chunk: TransportChunk) -> Result { - let TransportChunk { schema, data } = chunk; - Self::try_new( - schema, - data.columns() - .iter() - .map(|column| column.clone().into()) - .collect(), - ) +impl From for ArrowRecordBatch { + #[inline] + fn from(chunk: TransportChunk) -> Self { + chunk.batch } } @@ -303,17 +274,14 @@ impl TransportChunk { impl TransportChunk { #[inline] pub fn id(&self) -> ChunkResult { - if let Some(id) = self.schema.metadata.get(Self::CHUNK_METADATA_KEY_ID) { + if let Some(id) = self.metadata().get(Self::CHUNK_METADATA_KEY_ID) { let id = 
u128::from_str_radix(id, 16).map_err(|err| ChunkError::Malformed { reason: format!("cannot deserialize chunk id: {err}"), })?; Ok(ChunkId::from_u128(id)) } else { Err(crate::ChunkError::Malformed { - reason: format!( - "chunk id missing from metadata ({:?})", - self.schema.metadata - ), + reason: format!("chunk id missing from metadata ({:?})", self.metadata()), }) } } @@ -321,7 +289,7 @@ impl TransportChunk { #[inline] pub fn entity_path(&self) -> ChunkResult { match self - .schema + .schema_ref() .metadata .get(Self::CHUNK_METADATA_KEY_ENTITY_PATH) { @@ -329,32 +297,29 @@ impl TransportChunk { None => Err(crate::ChunkError::Malformed { reason: format!( "entity path missing from metadata ({:?})", - self.schema.metadata + self.schema_ref().metadata ), }), } } + /// The size in bytes of the data, once loaded in memory, in chunk-level. + /// + /// This is stored in the metadata. Returns `None` if that key is not set. #[inline] pub fn heap_size_bytes(&self) -> Option { - self.schema - .metadata + self.metadata() .get(Self::CHUNK_METADATA_KEY_HEAP_SIZE_BYTES) .and_then(|s| s.parse::().ok()) } #[inline] - pub fn schema(&self) -> ArrowSchemaRef { - self.schema.clone() + pub fn fields(&self) -> &ArrowFields { + &self.schema_ref().fields } - #[inline] - pub fn schema_ref(&self) -> &ArrowSchemaRef { - &self.schema - } - - pub fn insert_metadata(&mut self, key: String, value: String) { - Arc::make_mut(&mut self.schema).metadata.insert(key, value); + pub fn metadata(&self) -> &std::collections::HashMap { + &self.batch.schema_ref().metadata } /// Looks in the chunk metadata for the `IS_SORTED` marker. @@ -363,17 +328,10 @@ impl TransportChunk { /// This is fine, although wasteful. #[inline] pub fn is_sorted(&self) -> bool { - self.schema - .metadata + self.metadata() .contains_key(Self::CHUNK_METADATA_MARKER_IS_SORTED_BY_ROW_ID) } - /// Access specific column - #[inline] - pub fn column(&self, index: usize) -> Option<&dyn Arrow2Array> { - self.data.get(index).map(|c| c.as_ref()) - } - /// Iterates all columns of the specified `kind`. /// /// See: @@ -384,69 +342,48 @@ impl TransportChunk { fn columns_of_kind<'a>( &'a self, kind: &'a str, - ) -> impl Iterator)> + 'a { - self.schema - .fields - .iter() - .enumerate() - .filter_map(|(i, field)| { - let actual_kind = field.metadata().get(Self::FIELD_METADATA_KEY_KIND); - (actual_kind.map(|s| s.as_str()) == Some(kind)) - .then(|| { - self.data - .columns() - .get(i) - .map(|column| (field.as_ref(), column)) - }) - .flatten() - }) - } - - #[inline] - pub fn fields_and_columns( - &self, - ) -> impl Iterator)> + '_ { - self.schema - .fields - .iter() - .enumerate() - .filter_map(|(i, field)| { - self.data - .columns() - .get(i) - .map(|column| (field.as_ref(), column)) - }) + ) -> impl Iterator + 'a { + self.fields().iter().enumerate().filter_map(|(i, field)| { + let actual_kind = field.metadata().get(Self::FIELD_METADATA_KEY_KIND); + (actual_kind.map(|s| s.as_str()) == Some(kind)) + .then(|| { + self.batch + .columns() + .get(i) + .map(|column| (field.as_ref(), column)) + }) + .flatten() + }) } #[inline] - pub fn columns(&self) -> Vec<&dyn Arrow2Array> { - self.data.iter().map(|c| c.as_ref()).collect() + pub fn fields_and_columns(&self) -> impl Iterator + '_ { + self.fields().iter().enumerate().filter_map(|(i, field)| { + self.batch + .columns() + .get(i) + .map(|column| (field.as_ref(), column)) + }) } /// Iterates all control columns present in this chunk. 
#[inline] - pub fn controls(&self) -> impl Iterator)> { + pub fn controls(&self) -> impl Iterator { self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_CONTROL) } /// Iterates all data columns present in this chunk. #[inline] - pub fn components(&self) -> impl Iterator)> { + pub fn components(&self) -> impl Iterator { self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_DATA) } /// Iterates all timeline columns present in this chunk. #[inline] - pub fn timelines(&self) -> impl Iterator)> { + pub fn timelines(&self) -> impl Iterator { self.columns_of_kind(Self::FIELD_METADATA_VALUE_KIND_TIME) } - /// How many columns in total? Includes control, time, and component columns. - #[inline] - pub fn num_columns(&self) -> usize { - self.data.columns().len() - } - #[inline] pub fn num_controls(&self) -> usize { self.controls().count() @@ -461,18 +398,13 @@ impl TransportChunk { pub fn num_components(&self) -> usize { self.components().count() } - - #[inline] - pub fn num_rows(&self) -> usize { - self.data.len() - } } impl Chunk { /// Prepare the [`Chunk`] for transport. /// /// It is probably a good idea to sort the chunk first. - pub fn to_transport(&self) -> ChunkResult { + pub fn to_record_batch(&self) -> ChunkResult { self.sanity_check()?; re_tracing::profile_function!(format!( @@ -493,7 +425,7 @@ impl Chunk { let mut fields: Vec = vec![]; let mut metadata = std::collections::HashMap::default(); - let mut columns: Vec> = + let mut columns: Vec = Vec::with_capacity(1 /* row_ids */ + timelines.len() + components.len()); // Chunk-level metadata @@ -533,7 +465,7 @@ impl Chunk { }), ), ); - columns.push(into_arrow_ref(row_ids.clone()).into()); + columns.push(into_arrow_ref(row_ids.clone())); } // Timelines @@ -572,7 +504,7 @@ impl Chunk { for (field, times) in timelines { fields.push(field); - columns.push(times.into()); + columns.push(times); } } @@ -584,9 +516,11 @@ impl Chunk { .values() .flat_map(|per_desc| per_desc.iter()) .map(|(component_desc, list_array)| { + let list_array = ArrowListArray::from(list_array.clone()); + let field = ArrowField::new( component_desc.component_name.to_string(), - list_array.data_type().clone().into(), + list_array.data_type().clone(), true, ) .with_metadata({ @@ -597,9 +531,7 @@ impl Chunk { metadata }); - let data = list_array.clone().boxed(); - - (field, data) + (field, as_array_ref(list_array)) }) .collect_vec(); @@ -614,11 +546,19 @@ impl Chunk { let schema = ArrowSchema::new_with_metadata(fields, metadata); - Ok(TransportChunk::new(schema, Arrow2Chunk::new(columns))) + Ok(ArrowRecordBatch::try_new(schema.into(), columns)?) + } + + /// Prepare the [`Chunk`] for transport. + /// + /// It is probably a good idea to sort the chunk first. 
+ pub fn to_transport(&self) -> ChunkResult { + let record_batch = self.to_record_batch()?; + Ok(TransportChunk::from(record_batch)) } pub fn from_record_batch(batch: ArrowRecordBatch) -> ChunkResult { - Self::from_transport(&batch.into()) + Self::from_transport(&TransportChunk::from(batch)) } pub fn from_transport(transport: &TransportChunk) -> ChunkResult { @@ -647,7 +587,7 @@ impl Chunk { (field.name() == RowId::descriptor().component_name.as_str()).then_some(column) }) else { return Err(ChunkError::Malformed { - reason: format!("missing row_id column ({:?})", transport.schema), + reason: format!("missing row_id column ({:?})", transport.schema_ref()), }); }; @@ -671,9 +611,9 @@ impl Chunk { for (field, column) in transport.timelines() { // See also [`Timeline::datatype`] - let timeline = match column.data_type().to_logical_type() { - Arrow2Datatype::Int64 => Timeline::new_sequence(field.name().as_str()), - Arrow2Datatype::Timestamp(Arrow2TimeUnit::Nanosecond, None) => { + let timeline = match column.data_type() { + ArrowDatatype::Int64 => Timeline::new_sequence(field.name().as_str()), + ArrowDatatype::Timestamp(ArrowTimeUnit::Nanosecond, None) => { Timeline::new_temporal(field.name().as_str()) } _ => { @@ -717,7 +657,7 @@ impl Chunk { for (field, column) in transport.components() { let column = column - .downcast_array2_ref::>() + .downcast_array_ref::() .ok_or_else(|| ChunkError::Malformed { reason: format!( "The outer array in a chunked component batch must be a sparse list, got {:?}", @@ -728,7 +668,7 @@ impl Chunk { let component_desc = TransportChunk::component_descriptor_from_field(field); if components - .insert_descriptor_arrow2(component_desc, column.clone()) + .insert_descriptor(component_desc, column.clone()) .is_some() { return Err(ChunkError::Malformed { @@ -782,7 +722,7 @@ impl Chunk { Ok(re_log_types::ArrowMsg { chunk_id: re_tuid::Tuid::from_u128(self.id().as_u128()), timepoint_max: self.timepoint_max(), - batch: transport.try_into()?, + batch: transport.into(), on_release: None, }) } @@ -871,11 +811,7 @@ mod tests { for _ in 0..3 { let chunk_in_transport = chunk_before.to_transport()?; - #[cfg(feature = "arrow")] - let chunk_after = - Chunk::from_record_batch(chunk_in_transport.try_to_arrow_record_batch()?)?; - #[cfg(not(feature = "arrow"))] - let chunk_after = Chunk::from_transport(&chunk_in_transport)?; + let chunk_after = Chunk::from_record_batch(chunk_in_transport.clone().into())?; assert_eq!( chunk_in_transport.entity_path()?, @@ -926,12 +862,7 @@ mod tests { eprintln!("{chunk_in_transport}"); eprintln!("{chunk_after}"); - #[cfg(not(feature = "arrow"))] - { - // This will fail when round-tripping all the way to record-batch - // the below check should always pass regardless. 
- assert_eq!(chunk_before, chunk_after); - } + assert_eq!(chunk_before, chunk_after); assert!(chunk_before.are_equal_ignoring_extension_types(&chunk_after)); diff --git a/crates/store/re_chunk_store/src/dataframe.rs b/crates/store/re_chunk_store/src/dataframe.rs index 0484798837cb..3ff35d366dca 100644 --- a/crates/store/re_chunk_store/src/dataframe.rs +++ b/crates/store/re_chunk_store/src/dataframe.rs @@ -4,6 +4,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::ops::Deref; use std::ops::DerefMut; +use arrow::datatypes::Field as ArrowField; use arrow2::{ array::ListArray as ArrowListArray, datatypes::{DataType as Arrow2Datatype, Field as Arrow2Field}, @@ -43,7 +44,7 @@ impl ColumnDescriptor { } #[inline] - pub fn datatype(&self) -> Arrow2Datatype { + pub fn arrow2_datatype(&self) -> Arrow2Datatype { match self { Self::Time(descr) => descr.datatype.clone(), Self::Component(descr) => descr.returned_datatype(), @@ -51,13 +52,21 @@ impl ColumnDescriptor { } #[inline] - pub fn to_arrow_field(&self) -> Arrow2Field { + pub fn to_arrow_field(&self) -> ArrowField { match self { Self::Time(descr) => descr.to_arrow_field(), Self::Component(descr) => descr.to_arrow_field(), } } + #[inline] + pub fn to_arrow2_field(&self) -> Arrow2Field { + match self { + Self::Time(descr) => descr.to_arrow2_field(), + Self::Component(descr) => descr.to_arrow2_field(), + } + } + #[inline] pub fn short_name(&self) -> String { match self { @@ -150,7 +159,12 @@ impl TimeColumnDescriptor { } #[inline] - pub fn to_arrow_field(&self) -> Arrow2Field { + pub fn to_arrow_field(&self) -> ArrowField { + self.to_arrow2_field().into() + } + + #[inline] + pub fn to_arrow2_field(&self) -> Arrow2Field { let Self { timeline, datatype } = self; let nullable = true; // Time column must be nullable since static data doesn't have a time. 
Arrow2Field::new(timeline.name().to_string(), datatype.clone(), nullable) @@ -351,7 +365,12 @@ impl ComponentColumnDescriptor { } #[inline] - pub fn to_arrow_field(&self) -> Arrow2Field { + pub fn to_arrow_field(&self) -> ArrowField { + self.to_arrow2_field().into() + } + + #[inline] + pub fn to_arrow2_field(&self) -> Arrow2Field { let entity_path = &self.entity_path; let descriptor = ComponentDescriptor { archetype_name: self.archetype_name, diff --git a/crates/store/re_dataframe/Cargo.toml b/crates/store/re_dataframe/Cargo.toml index cd62b928e2b0..8574d7690baf 100644 --- a/crates/store/re_dataframe/Cargo.toml +++ b/crates/store/re_dataframe/Cargo.toml @@ -29,6 +29,7 @@ default = [] re_arrow_util.workspace = true re_chunk.workspace = true re_chunk_store.workspace = true +re_format_arrow.workspace = true re_log.workspace = true re_log_encoding.workspace = true re_log_types.workspace = true diff --git a/crates/store/re_dataframe/examples/query.rs b/crates/store/re_dataframe/examples/query.rs index f187319c2147..9589452ed691 100644 --- a/crates/store/re_dataframe/examples/query.rs +++ b/crates/store/re_dataframe/examples/query.rs @@ -6,6 +6,7 @@ use re_dataframe::{ ChunkStoreConfig, EntityPathFilter, QueryEngine, QueryExpression, ResolvedTimeRange, SparseFillStrategy, StoreKind, TimeInt, Timeline, }; +use re_format_arrow::format_record_batch; use re_log_encoding::VersionPolicy; fn main() -> anyhow::Result<()> { @@ -70,7 +71,7 @@ fn main() -> anyhow::Result<()> { let query_handle = engine.query(query.clone()); // eprintln!("{:#?}", query_handle.selected_contents()); for batch in query_handle.into_batch_iter() { - eprintln!("{batch}"); + eprintln!("{}", format_record_batch(&batch)); } } diff --git a/crates/store/re_dataframe/src/engine.rs b/crates/store/re_dataframe/src/engine.rs index 69a9ea6bf6ee..71d15b35506a 100644 --- a/crates/store/re_dataframe/src/engine.rs +++ b/crates/store/re_dataframe/src/engine.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use re_chunk::{EntityPath, TransportChunk}; +use re_chunk::EntityPath; use re_chunk_store::{ ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ColumnDescriptor, QueryExpression, }; @@ -13,14 +13,6 @@ use crate::QueryHandle; #[allow(unused_imports)] use re_chunk_store::ComponentColumnDescriptor; -// --- - -// TODO(#3741): `arrow2` has no concept of a `RecordBatch`, so for now we just use our trustworthy -// `TransportChunk` type until we migrate to `arrow-rs`. -// `TransportChunk` maps 1:1 to `RecordBatch` so the switch (and the compatibility layer in the meantime) -// will be trivial. -pub type RecordBatch = TransportChunk; - // --- Queries --- /// A handle to our user-facing query engine. 
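With the `re_dataframe::RecordBatch` alias removed above, query results are plain `arrow::array::RecordBatch`es and are printed via `re_format_arrow`, as the changed examples below show. A minimal caller-side sketch of that pattern (illustrative only, not part of the patch; `print_batches` is a hypothetical helper, while `RecordBatch` and `format_record_batch` are the APIs used elsewhere in this diff):

    use arrow::array::RecordBatch;
    use re_format_arrow::format_record_batch;

    // Print each query-result batch as a nicely formatted table.
    // `QueryHandle::batch_iter()` (changed later in this diff) yields such batches.
    fn print_batches(batches: impl Iterator<Item = RecordBatch>) {
        for batch in batches {
            // The returned table implements `Display` and is sized to fit the terminal.
            println!("{}", format_record_batch(&batch));
        }
    }
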
diff --git a/crates/store/re_dataframe/src/lib.rs b/crates/store/re_dataframe/src/lib.rs index 58e2d4c4dddf..7453cf74d66b 100644 --- a/crates/store/re_dataframe/src/lib.rs +++ b/crates/store/re_dataframe/src/lib.rs @@ -3,15 +3,11 @@ mod engine; mod query; -pub use self::engine::{QueryEngine, RecordBatch}; +pub use self::engine::QueryEngine; pub use self::query::QueryHandle; #[doc(no_inline)] -pub use self::external::arrow2::chunk::Chunk as Arrow2Chunk; -#[doc(no_inline)] -pub use self::external::re_chunk::{ - concat_record_batches::concatenate_record_batches, TransportChunk, -}; +pub use self::external::re_chunk::TransportChunk; #[doc(no_inline)] pub use self::external::re_chunk_store::{ ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange, diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index b57579be1f04..e3b3313fa6d2 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -6,19 +6,21 @@ use std::{ }, }; -use arrow::buffer::ScalarBuffer as ArrowScalarBuffer; +use arrow::{ + array::RecordBatch as ArrowRecordBatch, buffer::ScalarBuffer as ArrowScalarBuffer, + datatypes::Fields as ArrowFields, datatypes::Schema as ArrowSchema, + datatypes::SchemaRef as ArrowSchemaRef, +}; use arrow2::{ array::{ Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, PrimitiveArray as Arrow2PrimitiveArray, }, - chunk::Chunk as Arrow2Chunk, - datatypes::Schema as Arrow2Schema, Either, }; use itertools::Itertools; - use nohash_hasher::{IntMap, IntSet}; + use re_arrow_util::Arrow2ArrayDowncastRef as _; use re_chunk::{ external::arrow::array::ArrayRef, Chunk, ComponentName, EntityPath, RangeQuery, RowId, TimeInt, @@ -33,8 +35,6 @@ use re_log_types::ResolvedTimeRange; use re_query::{QueryCache, StorageEngineLike}; use re_types_core::{components::ClearIsRecursive, ComponentDescriptor}; -use crate::RecordBatch; - // --- // TODO(cmc): (no specific order) (should we make issues for these?) @@ -107,7 +107,7 @@ struct QueryHandleState { /// The Arrow schema that corresponds to the `selected_contents`. /// /// All returned rows will have this schema. - arrow_schema: Arrow2Schema, + arrow_schema: ArrowSchemaRef, /// All the [`Chunk`]s included in the view contents. /// @@ -191,13 +191,13 @@ impl QueryHandle { // 3. Compute the Arrow schema of the selected components. // // Every result returned using this `QueryHandle` will match this schema exactly. - let arrow_schema = Arrow2Schema { - fields: selected_contents + let arrow_schema = ArrowSchemaRef::from(ArrowSchema::new_with_metadata( + selected_contents .iter() .map(|(_, descr)| descr.to_arrow_field()) - .collect_vec(), - metadata: Default::default(), - }; + .collect::(), + Default::default(), + )); // 4. Perform the query and keep track of all the relevant chunks. let query = { @@ -256,7 +256,7 @@ impl QueryHandle { // Only way this could fail is if the number of rows did not match. #[allow(clippy::unwrap_used)] chunk - .add_component( + .add_component_arrow2( re_types_core::ComponentDescriptor { component_name: descr.component_name, archetype_name: descr.archetype_name, @@ -669,7 +669,7 @@ impl QueryHandle { /// /// Columns that do not yield any data will still be present in the results, filled with null values. 
#[inline] - pub fn schema(&self) -> &Arrow2Schema { + pub fn schema(&self) -> &ArrowSchemaRef { &self.init().arrow_schema } @@ -794,7 +794,7 @@ impl QueryHandle { #[inline] pub fn next_row(&self) -> Option> { self.engine - .with(|store, cache| self._next_row(store, cache)) + .with(|store, cache| self._next_row_arrow2(store, cache)) .map(|vec| vec.into_iter().map(|a| a.into()).collect()) } @@ -826,7 +826,7 @@ impl QueryHandle { #[inline] fn next_row_arrow2(&self) -> Option>> { self.engine - .with(|store, cache| self._next_row(store, cache)) + .with(|store, cache| self._next_row_arrow2(store, cache)) } /// Asynchronously returns the next row's worth of data. @@ -845,7 +845,7 @@ impl QueryHandle { /// } /// ``` #[cfg(not(target_arch = "wasm32"))] - pub fn next_row_async( + pub fn next_row_async_arrow2( &self, ) -> impl std::future::Future>>> where @@ -853,7 +853,7 @@ impl QueryHandle { { let res: Option> = self .engine - .try_with(|store, cache| self._next_row(store, cache)); + .try_with(|store, cache| self._next_row_arrow2(store, cache)); let engine = self.engine.clone(); std::future::poll_fn(move |cx| { @@ -883,7 +883,7 @@ impl QueryHandle { }) } - pub fn _next_row( + pub fn _next_row_arrow2( &self, store: &ChunkStore, cache: &QueryCache, @@ -1240,7 +1240,7 @@ impl QueryHandle { .map(|(view_idx, column)| match column { ColumnDescriptor::Time(descr) => { max_value_per_index.get(&descr.timeline()).map_or_else( - || arrow2::array::new_null_array(column.datatype(), 1), + || arrow2::array::new_null_array(column.arrow2_datatype(), 1), |(_time, time_sliced)| { descr.typ().make_arrow_array(time_sliced.clone()).into() }, @@ -1251,7 +1251,7 @@ impl QueryHandle { .get(*view_idx) .cloned() .flatten() - .unwrap_or_else(|| arrow2::array::new_null_array(column.datatype(), 1)), + .unwrap_or_else(|| arrow2::array::new_null_array(column.arrow2_datatype(), 1)), }) .collect_vec(); @@ -1260,33 +1260,32 @@ impl QueryHandle { Some(selected_arrays) } - /// Calls [`Self::next_row`] and wraps the result in a [`RecordBatch`]. + /// Calls [`Self::next_row`] and wraps the result in a [`ArrowRecordBatch`]. /// - /// Only use this if you absolutely need a [`RecordBatch`] as this adds a lot of allocation - /// overhead. + /// Only use this if you absolutely need a [`RecordBatch`] as this adds a + /// some overhead for schema validation. /// /// See [`Self::next_row`] for more information. #[inline] - pub fn next_row_batch(&self) -> Option { - Some(RecordBatch::new( - self.schema().clone(), - Arrow2Chunk::new(self.next_row_arrow2()?), - )) + pub fn next_row_batch(&self) -> Option { + let row = self.next_row()?; + ArrowRecordBatch::try_new(self.schema().clone(), row).ok() } #[inline] #[cfg(not(target_arch = "wasm32"))] - pub async fn next_row_batch_async(&self) -> Option + pub async fn next_row_batch_async(&self) -> Option where E: 'static + Send + Clone, { - let row = self.next_row_async().await?; + let row = self.next_row_async_arrow2().await?; // If we managed to get a row, then the state must be initialized already. #[allow(clippy::unwrap_used)] let schema = self.state.get().unwrap().arrow_schema.clone(); - Some(RecordBatch::new(schema, Arrow2Chunk::new(row))) + // TODO(#3741): remove the collect + ArrowRecordBatch::try_new(schema, row.into_iter().map(|a| a.into()).collect()).ok() } } @@ -1305,13 +1304,13 @@ impl QueryHandle { /// Returns an iterator backed by [`Self::next_row_batch`]. 
#[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work - pub fn batch_iter(&self) -> impl Iterator + '_ { + pub fn batch_iter(&self) -> impl Iterator + '_ { std::iter::from_fn(move || self.next_row_batch()) } /// Returns an iterator backed by [`Self::next_row_batch`]. #[allow(clippy::should_implement_trait)] // we need an anonymous closure, this won't work - pub fn into_batch_iter(self) -> impl Iterator { + pub fn into_batch_iter(self) -> impl Iterator { std::iter::from_fn(move || self.next_row_batch()) } } @@ -1323,13 +1322,12 @@ impl QueryHandle { mod tests { use std::sync::Arc; - use re_chunk::{ - concat_record_batches::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, - TransportChunk, - }; + use arrow::compute::concat_batches; + use re_chunk::{Chunk, ChunkId, RowId, TimePoint, TransportChunk}; use re_chunk_store::{ ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ResolvedTimeRange, TimeInt, }; + use re_format_arrow::format_record_batch; use re_log_types::{ build_frame_nr, build_log_time, example_components::{MyColor, MyLabel, MyPoint}, @@ -1398,13 +1396,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // temporal @@ -1420,13 +1418,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1454,13 +1452,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); Ok(()) } @@ -1487,13 +1485,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); Ok(()) } @@ -1526,13 +1524,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - 
query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); Ok(()) } @@ -1568,13 +1566,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // sparse-filled @@ -1598,13 +1596,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1639,13 +1637,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // non-existing component @@ -1665,13 +1663,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // MyPoint @@ -1691,13 +1689,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // MyColor @@ -1717,13 +1715,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + 
query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1759,13 +1757,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } { @@ -1796,13 +1794,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -1834,13 +1832,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // only indices (+ duplication) @@ -1867,13 +1865,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // only components (+ duplication) @@ -1907,13 +1905,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // static @@ -1968,13 +1966,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + 
eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -2037,13 +2035,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -2077,13 +2075,13 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!(dataframe); + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } // sparse-filled @@ -2101,17 +2099,17 @@ mod tests { query_engine.query(query.clone()).into_iter().count() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), - &query_handle.into_batch_iter().collect_vec(), + let dataframe = concat_batches( + query_handle.schema(), + &query_handle.batch_iter().collect_vec(), )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); // TODO(#7650): Those null values for `MyColor` on 10 and 20 look completely insane, but then again // static clear semantics in general are pretty unhinged right now, especially when // ranges are involved. - // It's extremely niche, our time is better spent somewhere else right now. 
- assert_snapshot_fixed_width!(dataframe); + + assert_snapshot_fixed_width!(TransportChunk::from(dataframe)); } Ok(()) @@ -2149,12 +2147,12 @@ mod tests { for i in 0..expected_rows.len() { query_handle.seek_to_row(i); - let expected = concatenate_record_batches( - query_handle.schema().clone(), + let expected = concat_batches( + query_handle.schema(), &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), )?; - let got = concatenate_record_batches( - query_handle.schema().clone(), + let got = concat_batches( + query_handle.schema(), &query_handle.batch_iter().take(3).collect_vec(), )?; @@ -2190,12 +2188,12 @@ mod tests { for i in 0..expected_rows.len() { query_handle.seek_to_row(i); - let expected = concatenate_record_batches( - query_handle.schema().clone(), + let expected = concat_batches( + query_handle.schema(), &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), )?; - let got = concatenate_record_batches( - query_handle.schema().clone(), + let got = concat_batches( + query_handle.schema(), &query_handle.batch_iter().take(3).collect_vec(), )?; @@ -2234,12 +2232,12 @@ mod tests { for i in 0..expected_rows.len() { query_handle.seek_to_row(i); - let expected = concatenate_record_batches( - query_handle.schema().clone(), + let expected = concat_batches( + query_handle.schema(), &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), )?; - let got = concatenate_record_batches( - query_handle.schema().clone(), + let got = concat_batches( + query_handle.schema(), &query_handle.batch_iter().take(3).collect_vec(), )?; @@ -2272,12 +2270,12 @@ mod tests { for i in 0..expected_rows.len() { query_handle.seek_to_row(i); - let expected = concatenate_record_batches( - query_handle.schema().clone(), + let expected = concat_batches( + query_handle.schema(), &expected_rows.iter().skip(i).take(3).cloned().collect_vec(), )?; - let got = concatenate_record_batches( - query_handle.schema().clone(), + let got = concat_batches( + query_handle.schema(), &query_handle.batch_iter().take(3).collect_vec(), )?; @@ -2302,7 +2300,7 @@ mod tests { pub struct QueryHandleStream(pub QueryHandle); impl tokio_stream::Stream for QueryHandleStream { - type Item = TransportChunk; + type Item = ArrowRecordBatch; #[inline] fn poll_next( @@ -2341,15 +2339,18 @@ mod tests { .len() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), + let dataframe = concat_batches( + query_handle.schema(), &QueryHandleStream(query_engine.query(query.clone())) .collect::>() .await, )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!("async_barebones_static", dataframe); + assert_snapshot_fixed_width!( + "async_barebones_static", + TransportChunk::from(dataframe) + ); Ok::<_, anyhow::Error>(()) } @@ -2372,15 +2373,18 @@ mod tests { .len() as u64, query_handle.num_rows() ); - let dataframe = concatenate_record_batches( - query_handle.schema().clone(), + let dataframe = concat_batches( + query_handle.schema(), &QueryHandleStream(query_engine.query(query.clone())) .collect::>() .await, )?; - eprintln!("{dataframe}"); + eprintln!("{}", format_record_batch(&dataframe.clone())); - assert_snapshot_fixed_width!("async_barebones_temporal", dataframe); + assert_snapshot_fixed_width!( + "async_barebones_temporal", + TransportChunk::from(dataframe) + ); Ok::<_, anyhow::Error>(()) } diff --git a/crates/store/re_format_arrow/src/lib.rs b/crates/store/re_format_arrow/src/lib.rs index a796f6d91e5a..f9b80c626275 
100644 --- a/crates/store/re_format_arrow/src/lib.rs +++ b/crates/store/re_format_arrow/src/lib.rs @@ -212,7 +212,25 @@ fn trim_name(name: &str) -> &str { .trim_start_matches("rerun.") } -pub fn format_dataframe( +/// Nicely format this record batch in a way that fits the terminal. +pub fn format_record_batch(batch: &arrow::array::RecordBatch) -> Table { + format_record_batch_with_width(batch, None) +} + +/// Nicely format this record batch, either with the given fixed width, or with the terminal width (`None`). +pub fn format_record_batch_with_width( + batch: &arrow::array::RecordBatch, + width: Option, +) -> Table { + format_dataframe( + &batch.schema_ref().metadata.clone().into_iter().collect(), // HashMap -> BTreeMap + &batch.schema_ref().fields, + batch.columns(), + width, + ) +} + +fn format_dataframe( metadata: &Metadata, fields: &Fields, columns: &[ArrayRef], diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 62ee365c1a9e..63471503527c 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -1,43 +1,49 @@ //! Communications with an Rerun Data Platform gRPC server. -mod address; - -pub use address::{InvalidRedapAddress, RedapAddress}; -use re_arrow_util::Arrow2ArrayDowncastRef as _; -use re_chunk::external::arrow2; -use re_log_encoding::codec::wire::decoder::Decode; -use re_log_types::external::re_types_core::ComponentDescriptor; -use re_protos::remote_store::v0::CatalogFilter; -use re_types::blueprint::archetypes::{ContainerBlueprint, ViewportBlueprint}; -use re_types::blueprint::archetypes::{ViewBlueprint, ViewContents}; -use re_types::blueprint::components::{ContainerKind, RootContainer}; -use re_types::components::RecordingUri; -use re_types::external::uuid; -use re_types::{Archetype, Component}; +use std::{collections::HashMap, error::Error, sync::Arc}; + +use arrow::{ + array::{ + Array as ArrowArray, ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch, + StringArray as ArrowStringArray, + }, + datatypes::{DataType as ArrowDataType, Field as ArrowField}, +}; use url::Url; -// ---------------------------------------------------------------------------- - -use std::sync::Arc; -use std::{collections::HashMap, error::Error}; - -use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField}; -use arrow2::array::Utf8Array as Arrow2Utf8Array; -use re_chunk::{ - Arrow2Array, Chunk, ChunkBuilder, ChunkId, EntityPath, RowId, Timeline, TransportChunk, -}; -use re_log_encoding::codec::CodecError; +use re_arrow_util::ArrowArrayDowncastRef as _; +use re_chunk::{Chunk, ChunkBuilder, ChunkId, EntityPath, RowId, Timeline, TransportChunk}; +use re_log_encoding::codec::{wire::decoder::Decode, CodecError}; use re_log_types::{ - ApplicationId, BlueprintActivationCommand, EntityPathFilter, LogMsg, SetStoreInfo, StoreId, - StoreInfo, StoreKind, StoreSource, Time, + external::re_types_core::ComponentDescriptor, ApplicationId, BlueprintActivationCommand, + EntityPathFilter, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time, +}; +use re_protos::{ + common::v0::RecordingId, + remote_store::v0::{ + storage_node_client::StorageNodeClient, CatalogFilter, FetchRecordingRequest, + QueryCatalogRequest, + }, }; -use re_protos::common::v0::RecordingId; -use re_protos::remote_store::v0::{ - storage_node_client::StorageNodeClient, FetchRecordingRequest, QueryCatalogRequest, +use re_types::{ + arrow_helpers::as_array_ref, + blueprint::{ + archetypes::{ContainerBlueprint, ViewBlueprint, 
ViewContents, ViewportBlueprint}, + components::{ContainerKind, RootContainer}, + }, + components::RecordingUri, + external::uuid, + Archetype, Component, }; // ---------------------------------------------------------------------------- +mod address; + +pub use address::{InvalidRedapAddress, RedapAddress}; + +// ---------------------------------------------------------------------------- + /// Wrapper with a nicer error message #[derive(Debug)] pub struct TonicStatusError(pub tonic::Status); @@ -281,7 +287,7 @@ pub fn store_info_from_catalog_chunk( reason: "no application_id field found".to_owned(), }))?; let app_id = data - .downcast_array2_ref::>() + .downcast_array_ref::() .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { reason: format!("application_id must be a utf8 array: {:?}", tc.schema_ref()), }))? @@ -294,7 +300,7 @@ pub fn store_info_from_catalog_chunk( reason: "no start_time field found".to_owned(), }))?; let start_time = data - .downcast_array2_ref::() + .downcast_array_ref::() .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { reason: format!("start_time must be an int64 array: {:?}", tc.schema_ref()), }))? @@ -391,8 +397,8 @@ async fn stream_catalog_async( // - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray // - conversion expects the input TransportChunk to have a ChunkId so we need to add that piece of metadata - let mut fields = Vec::new(); - let mut arrays = Vec::new(); + let mut fields: Vec = Vec::new(); + let mut columns: Vec = Vec::new(); // add the (row id) control field let (row_id_field, row_id_data) = input.controls().next().ok_or( StreamError::ChunkError(re_chunk::ChunkError::Malformed { @@ -408,12 +414,12 @@ async fn stream_catalog_async( ) .with_metadata(TransportChunk::field_metadata_control_column()), ); - arrays.push(row_id_data.clone()); + columns.push(row_id_data.clone()); // next add any timeline field for (field, data) in input.timelines() { fields.push(field.clone()); - arrays.push(data.clone()); + columns.push(data.clone()); } // now add all the 'data' fields - we slice each column array into individual arrays and then convert the whole lot into a ListArray @@ -428,41 +434,41 @@ async fn stream_catalog_async( ) .with_metadata(TransportChunk::field_metadata_data_column()); - let mut sliced: Vec> = Vec::new(); + let mut sliced: Vec = Vec::new(); for idx in 0..data.len() { - let mut array = data.clone(); - array.slice(idx, 1); - sliced.push(array); + sliced.push(data.clone().slice(idx, 1)); } let data_arrays = sliced.iter().map(|e| Some(e.as_ref())).collect::>(); #[allow(clippy::unwrap_used)] // we know we've given the right field type - let data_field_array: arrow2::array::ListArray = - re_arrow_util::arrow2_util::arrays_to_list_array( - data_field_inner.data_type().clone().into(), + let data_field_array: arrow::array::ListArray = + re_arrow_util::arrow_util::arrays_to_list_array( + data_field_inner.data_type().clone(), &data_arrays, ) .unwrap(); fields.push(data_field); - arrays.push(Box::new(data_field_array)); + columns.push(as_array_ref(data_field_array)); } let schema = { - let metadata = HashMap::from_iter([( - TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), - "catalog".to_owned(), - )]); + let metadata = HashMap::from_iter([ + ( + TransportChunk::CHUNK_METADATA_KEY_ENTITY_PATH.to_owned(), + "catalog".to_owned(), + ), + ( + TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(), + ChunkId::new().to_string(), + ), + ]); 
arrow::datatypes::Schema::new_with_metadata(fields, metadata) }; - // modified and enriched TransportChunk - let mut tc = TransportChunk::new(schema, arrow2::chunk::Chunk::new(arrays)); - tc.insert_metadata( - TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(), - ChunkId::new().to_string(), - ); - let mut chunk = Chunk::from_transport(&tc)?; + let record_batch = ArrowRecordBatch::try_new(schema.into(), columns) + .map_err(re_chunk::ChunkError::from)?; + let mut chunk = Chunk::from_record_batch(record_batch)?; // finally, enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know) // and the recording id (that we have in the catalog data) @@ -477,20 +483,16 @@ async fn stream_catalog_async( "couldn't get port from {redap_endpoint}" )))?; - let recording_uri_arrays: Vec> = chunk + let recording_uri_arrays: Vec = chunk .iter_slices::("id".into()) .map(|id| { let rec_id = &id[0]; // each component batch is of length 1 i.e. single 'id' value let recording_uri = format!("rerun://{host}:{port}/recording/{rec_id}"); - let recording_uri_data = Arrow2Utf8Array::::from([Some(recording_uri)]); - - Ok::, StreamError>( - Box::new(recording_uri_data) as Box - ) + as_array_ref(ArrowStringArray::from(vec![recording_uri])) }) - .collect::, _>>()?; + .collect(); let recording_id_arrays = recording_uri_arrays .iter() @@ -499,8 +501,8 @@ async fn stream_catalog_async( let rec_id_field = ArrowField::new("item", ArrowDataType::Utf8, true); #[allow(clippy::unwrap_used)] // we know we've given the right field type - let uris = re_arrow_util::arrow2_util::arrays_to_list_array( - rec_id_field.data_type().clone().into(), + let uris = re_arrow_util::arrow_util::arrays_to_list_array( + rec_id_field.data_type().clone(), &recording_id_arrays, ) .unwrap(); diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs index 8df9ae1ca0e0..e8f3c83040be 100644 --- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs @@ -13,7 +13,7 @@ fn decode( re_protos::common::v0::EncoderVersion::V0 => { let mut reader = std::io::Cursor::new(data); let batch = read_arrow_from_bytes(&mut reader)?; - Ok(batch.into()) + Ok(TransportChunk::from(batch)) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/encoder.rs b/crates/store/re_log_encoding/src/codec/wire/encoder.rs index 58c8a95dca4c..1c8b657fda50 100644 --- a/crates/store/re_log_encoding/src/codec/wire/encoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/encoder.rs @@ -1,20 +1,20 @@ -use crate::codec::arrow::write_arrow_to_bytes; -use crate::codec::CodecError; -use re_chunk::TransportChunk; use re_protos::common::v0::RerunChunk; use re_protos::remote_store::v0::DataframePart; +use arrow::array::RecordBatch as ArrowRecordBatch; + +use crate::codec::arrow::write_arrow_to_bytes; +use crate::codec::CodecError; + /// Encode a transport chunk into a byte stream. 
fn encode( version: re_protos::common::v0::EncoderVersion, - chunk: &TransportChunk, + batch: &ArrowRecordBatch, ) -> Result, CodecError> { - let transport_chunk = - arrow::array::RecordBatch::try_from(chunk.clone()).map_err(CodecError::InvalidChunk)?; match version { re_protos::common::v0::EncoderVersion::V0 => { let mut data: Vec = Vec::new(); - write_arrow_to_bytes(&mut data, &transport_chunk)?; + write_arrow_to_bytes(&mut data, batch)?; Ok(data) } } @@ -25,7 +25,7 @@ pub trait Encode { fn encode(&self) -> Result; } -impl Encode for TransportChunk { +impl Encode for ArrowRecordBatch { fn encode(&self) -> Result { let payload = encode(re_protos::common::v0::EncoderVersion::V0, self)?; Ok(DataframePart { @@ -35,7 +35,7 @@ impl Encode for TransportChunk { } } -impl Encode for TransportChunk { +impl Encode for ArrowRecordBatch { fn encode(&self) -> Result { let payload = encode(re_protos::common::v0::EncoderVersion::V0, self)?; Ok(RerunChunk { diff --git a/crates/store/re_log_types/src/arrow_msg.rs b/crates/store/re_log_types/src/arrow_msg.rs index 6a02c767f5f6..77259a7f9fe5 100644 --- a/crates/store/re_log_types/src/arrow_msg.rs +++ b/crates/store/re_log_types/src/arrow_msg.rs @@ -239,7 +239,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg { })?; if chunks.is_empty() { - return Err(serde::de::Error::custom("No Arrow2Chunk found in stream")); + return Err(serde::de::Error::custom("No chunks found in stream")); } if chunks.len() > 1 { return Err(serde::de::Error::custom(format!( diff --git a/crates/top/rerun/Cargo.toml b/crates/top/rerun/Cargo.toml index 5b4ee234beb2..1a3884cc097c 100644 --- a/crates/top/rerun/Cargo.toml +++ b/crates/top/rerun/Cargo.toml @@ -130,6 +130,7 @@ re_chunk.workspace = true re_crash_handler.workspace = true re_entity_db.workspace = true re_error.workspace = true +re_format_arrow.workspace = true re_format.workspace = true re_log_encoding.workspace = true re_log_types.workspace = true @@ -140,6 +141,7 @@ re_tracing.workspace = true re_video.workspace = true anyhow.workspace = true +arrow.workspace = true document-features.workspace = true itertools.workspace = true similar-asserts.workspace = true diff --git a/crates/top/rerun/src/lib.rs b/crates/top/rerun/src/lib.rs index cdfe26a44eb2..9b299c1e0ed6 100644 --- a/crates/top/rerun/src/lib.rs +++ b/crates/top/rerun/src/lib.rs @@ -166,11 +166,13 @@ pub const EXTERNAL_DATA_LOADER_INCOMPATIBLE_EXIT_CODE: i32 = 66; /// Re-exports of other crates. pub mod external { pub use anyhow; + pub use arrow; pub use ::re_build_info; pub use ::re_entity_db; pub use ::re_entity_db::external::*; pub use ::re_format; + pub use ::re_format_arrow; #[cfg(feature = "run")] pub use re_data_source; diff --git a/docs/snippets/all/reference/dataframe_query.rs b/docs/snippets/all/reference/dataframe_query.rs index 2513c46440dc..cbc56c1f2b34 100644 --- a/docs/snippets/all/reference/dataframe_query.rs +++ b/docs/snippets/all/reference/dataframe_query.rs @@ -2,6 +2,7 @@ use rerun::{ dataframe::{QueryEngine, QueryExpression, SparseFillStrategy, Timeline}, + external::re_format_arrow::format_record_batch, ChunkStoreConfig, VersionPolicy, }; @@ -30,7 +31,7 @@ fn main() -> Result<(), Box> { let query_handle = engine.query(query.clone()); for row in query_handle.batch_iter().take(10) { // Each row is a `RecordBatch`, which can be easily passed around across different data ecosystems. 
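The encoder and decoder changes above now operate on an `ArrowRecordBatch` directly instead of converting a `TransportChunk` first. Assuming `write_arrow_to_bytes` and `read_arrow_from_bytes` wrap the Arrow IPC stream format (an assumption, not confirmed by this diff), a standalone round trip with plain arrow-rs looks roughly like this:

use std::sync::Arc;

use arrow::{
    array::{Int64Array, RecordBatch},
    datatypes::{DataType, Field, Schema},
    error::ArrowError,
    ipc::{reader::StreamReader, writer::StreamWriter},
};

/// Encode a single batch into IPC stream bytes (roughly what an
/// `Encode for ArrowRecordBatch` impl could do under the hood).
fn encode(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
    let mut data: Vec<u8> = Vec::new();
    let mut writer = StreamWriter::try_new(&mut data, batch.schema().as_ref())?;
    writer.write(batch)?;
    writer.finish()?;
    drop(writer); // release the borrow on `data`
    Ok(data)
}

/// Decode the first batch back out of the byte stream.
fn decode(data: &[u8]) -> Result<RecordBatch, ArrowError> {
    let mut reader = StreamReader::try_new(std::io::Cursor::new(data), None)?;
    reader
        .next()
        .ok_or_else(|| ArrowError::IpcError("empty stream".to_owned()))?
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3]))])?;

    let bytes = encode(&batch)?;
    let roundtripped = decode(&bytes)?;
    assert_eq!(batch, roundtripped);
    Ok(())
}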
- println!("{row}"); + println!("{}", format_record_batch(&row)); } Ok(()) diff --git a/examples/rust/dataframe_query/src/main.rs b/examples/rust/dataframe_query/src/main.rs index d451c830da93..2c27b18d58cb 100644 --- a/examples/rust/dataframe_query/src/main.rs +++ b/examples/rust/dataframe_query/src/main.rs @@ -3,10 +3,9 @@ use itertools::Itertools; use rerun::{ - dataframe::{ - concatenate_record_batches, EntityPathFilter, QueryEngine, QueryExpression, - SparseFillStrategy, Timeline, - }, + dataframe::{EntityPathFilter, QueryEngine, QueryExpression, SparseFillStrategy, Timeline}, + external::arrow, + external::re_format_arrow::format_record_batch, ChunkStoreConfig, StoreKind, VersionPolicy, }; @@ -68,8 +67,8 @@ fn main() -> Result<(), Box> { let query_handle = engine.query(query.clone()); let record_batches = query_handle.batch_iter().take(10).collect_vec(); - let table = concatenate_record_batches(query_handle.schema().clone(), &record_batches)?; - println!("{table}"); + let batch = arrow::compute::concat_batches(query_handle.schema(), &record_batches)?; + println!("{}", format_record_batch(&batch)); } Ok(()) diff --git a/rerun_py/Cargo.toml b/rerun_py/Cargo.toml index 0cc66e0e91a2..646e2b86b1d3 100644 --- a/rerun_py/Cargo.toml +++ b/rerun_py/Cargo.toml @@ -60,7 +60,7 @@ web_viewer = [ [dependencies] re_arrow_util.workspace = true re_build_info.workspace = true -re_chunk = { workspace = true, features = ["arrow"] } +re_chunk.workspace = true re_chunk_store.workspace = true re_dataframe.workspace = true re_grpc_client = { workspace = true, optional = true } diff --git a/rerun_py/src/dataframe.rs b/rerun_py/src/dataframe.rs index 79b8ba2f82d2..01fd441f1fd3 100644 --- a/rerun_py/src/dataframe.rs +++ b/rerun_py/src/dataframe.rs @@ -753,18 +753,10 @@ impl PyRecordingView { py_rerun_warn("RecordingView::select: tried to select static data, but no non-static contents generated an index value on this timeline. No results will be returned. 
Either include non-static data or consider using `select_static()` instead.")?; } - let schema = query_handle.schema(); - let fields: Vec = - schema.fields.iter().map(|f| f.clone().into()).collect(); - let metadata = schema.metadata.clone().into_iter().collect(); - let schema = arrow::datatypes::Schema::new_with_metadata(fields, metadata); - - let reader = RecordBatchIterator::new( - query_handle - .into_batch_iter() - .map(|batch| batch.try_to_arrow_record_batch()), - std::sync::Arc::new(schema), - ); + let schema = query_handle.schema().clone(); + + let reader = + RecordBatchIterator::new(query_handle.into_batch_iter().map(Ok), schema); Ok(PyArrowType(Box::new(reader))) } #[cfg(feature = "remote")] @@ -850,18 +842,10 @@ impl PyRecordingView { ))); } - let schema = query_handle.schema(); - let fields: Vec = - schema.fields.iter().map(|f| f.clone().into()).collect(); - let metadata = schema.metadata.clone().into_iter().collect(); - let schema = arrow::datatypes::Schema::new_with_metadata(fields, metadata); - - let reader = RecordBatchIterator::new( - query_handle - .into_batch_iter() - .map(|batch| batch.try_to_arrow_record_batch()), - std::sync::Arc::new(schema), - ); + let schema = query_handle.schema().clone(); + + let reader = + RecordBatchIterator::new(query_handle.into_batch_iter().map(Ok), schema); Ok(PyArrowType(Box::new(reader))) } diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index bc408ba72570..5959a385b6a9 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -1,4 +1,4 @@ -#![allow(unsafe_op_in_unsafe_fn)] +#![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pyfunction] macro use std::collections::BTreeSet; @@ -8,15 +8,16 @@ use arrow::{ ffi_stream::ArrowArrayStreamReader, pyarrow::PyArrowType, }; -// False positive due to #[pyfunction] macro use pyo3::{ exceptions::{PyRuntimeError, PyTypeError, PyValueError}, prelude::*, types::PyDict, Bound, PyResult, }; -use re_arrow_util::Arrow2ArrayDowncastRef as _; -use re_chunk::{Chunk, TransportChunk}; +use tokio_stream::StreamExt; + +use re_arrow_util::ArrowArrayDowncastRef as _; +use re_chunk::Chunk; use re_chunk_store::ChunkStore; use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector}; use re_grpc_client::TonicStatusError; @@ -32,7 +33,6 @@ use re_protos::{ TypeConversionError, }; use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline}; -use tokio_stream::StreamExt; use crate::dataframe::{ComponentLike, PyRecording, PyRecordingHandle, PyRecordingView, PySchema}; @@ -173,7 +173,7 @@ impl PyStorageNodeClient { .unwrap_or_else(|| ArrowSchema::empty().into()); Ok(RecordBatchIterator::new( - batches.into_iter().map(|tc| tc.try_to_arrow_record_batch()), + batches.into_iter().map(|tc| Ok(tc.into())), schema, )) }); @@ -236,7 +236,7 @@ impl PyStorageNodeClient { let record_batches: Vec> = transport_chunks .into_iter() - .map(|tc| tc.try_to_arrow_record_batch()) + .map(|tc| Ok(tc.into())) .collect(); // TODO(jleibs): surfacing this schema is awkward. This should be more explicit in @@ -321,8 +321,7 @@ impl PyStorageNodeClient { )); } - let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata); - metadata_tc + metadata .encode() .map_err(|err| PyRuntimeError::new_err(err.to_string())) }) @@ -351,7 +350,7 @@ impl PyStorageNodeClient { .find(|(field, _data)| field.name() == "rerun_recording_id") .map(|(_field, data)| data) .ok_or(PyRuntimeError::new_err("No rerun_recording_id"))? 
- .downcast_array2_ref::>() + .downcast_array_ref::() .ok_or(PyRuntimeError::new_err("Recording Id is not a string"))? .value(0) .to_owned(); @@ -388,11 +387,9 @@ impl PyStorageNodeClient { )); } - let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata); - let request = UpdateCatalogRequest { metadata: Some( - metadata_tc + metadata .encode() .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, ),
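The remaining hunks replace the removed arrow2 helpers with stock arrow-rs facilities: `arrow::compute::concat_batches` stands in for `concatenate_record_batches`, and `RecordBatchIterator` turns a sequence of batches plus a shared schema into a `RecordBatchReader` that the Python bindings can hand to pyarrow without rebuilding the schema field by field. A self-contained sketch of both, with made-up field names and data:

use std::sync::Arc;

use arrow::{
    array::{Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader},
    compute::concat_batches,
    datatypes::{DataType, Field, Schema},
    error::ArrowError,
};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1_i64, 2]))],
    )?;
    let batch2 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![3_i64]))],
    )?;

    // What the dataframe_query example now does instead of the removed
    // `concatenate_record_batches`: every batch must share the same schema.
    let combined = concat_batches(&schema, &[batch1.clone(), batch2.clone()])?;
    assert_eq!(combined.num_rows(), 3);

    // What the Python bindings now do: wrap the batch iterator and the shared
    // schema into a reader, instead of converting each TransportChunk by hand.
    let reader = RecordBatchIterator::new([batch1, batch2].into_iter().map(Ok), schema.clone());
    assert_eq!(reader.schema(), schema);

    let mut total = 0;
    for batch in reader {
        total += batch?.num_rows();
    }
    assert_eq!(total, 3);

    Ok(())
}

Concatenation only succeeds when all batches share one schema, which is why the query handle's schema is passed through unchanged in both the snippet and the example above.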