Skip to content

Commit

Permalink
Port TransportChunk::schema to arrow-rs (#8687)
Browse files — browse the repository at this point in the history
* Part of #3741
  • Loading branch information
emilk authored Jan 14, 2025
1 parent d4f99a1 commit c07eb6e
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 211 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6009,6 +6009,7 @@ dependencies = [
name = "re_grpc_client"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"re_chunk",
"re_error",
"re_log",
Expand Down
33 changes: 4 additions & 29 deletions crates/store/re_chunk/src/arrow.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use arrow::{
array::{make_array, RecordBatch},
datatypes::{Field, Schema},
error::ArrowError,
};

Expand All @@ -13,28 +12,15 @@ impl TransportChunk {
/// but does incur overhead of generating an alternative representation of the arrow-
/// related rust structures that refer to those data buffers.
pub fn try_to_arrow_record_batch(&self) -> Result<RecordBatch, ArrowError> {
let fields: Vec<Field> = self
.schema
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = self.schema.metadata.clone().into_iter().collect();

let schema = Schema::new_with_metadata(fields, metadata);

let columns: Vec<_> = self
.data
.columns()
.iter()
.map(|arr2_array| {
.all_columns()
.map(|(_field, arr2_array)| {
let data = arrow2::array::to_data(arr2_array.as_ref());
make_array(data)
})
.collect();

RecordBatch::try_new(std::sync::Arc::new(schema), columns)
RecordBatch::try_new(self.schema(), columns)
}

/// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`].
Expand All @@ -43,17 +29,6 @@ impl TransportChunk {
/// but does incur overhead of generating an alternative representation of the arrow-
/// related rust structures that refer to those data buffers.
pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self {
let fields: Vec<arrow2::datatypes::Field> = batch
.schema()
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = batch.schema().metadata.clone().into_iter().collect();

let schema = arrow2::datatypes::Schema::from(fields).with_metadata(metadata);

let columns: Vec<_> = batch
.columns()
.iter()
Expand All @@ -62,6 +37,6 @@ impl TransportChunk {

let data = arrow2::chunk::Chunk::new(columns);

Self { schema, data }
Self::new(batch.schema(), data)
}
}
33 changes: 21 additions & 12 deletions crates/store/re_chunk/src/arrow2_util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arrow::datatypes::Schema as ArrowSchema;
use arrow2::{
array::{
Array as Arrow2Array, BooleanArray as Arrow2BooleanArray,
Expand Down Expand Up @@ -438,32 +439,40 @@ pub fn take_array<A: Arrow2Array + Clone, O: arrow2::types::Index>(

// ---

use arrow2::{chunk::Chunk as Arrow2Chunk, datatypes::Schema as Arrow2Schema};
use arrow2::chunk::Chunk as Arrow2Chunk;

/// Concatenate multiple [`TransportChunk`]s into one.
///
/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
pub fn concatenate_record_batches(
schema: Arrow2Schema,
schema: impl Into<ArrowSchema>,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
assert!(batches.iter().map(|batch| batch.schema_ref()).all_equal());
let schema: ArrowSchema = schema.into();
anyhow::ensure!(
batches
.iter()
.all(|batch| batch.schema_ref().as_ref() == &schema),
"concatenate_record_batches: all batches must have the same schema"
);

let mut arrays = Vec::new();
let mut output_columns = Vec::new();

if !batches.is_empty() {
for (i, _field) in schema.fields.iter().enumerate() {
let array = concat_arrays(
&batches
.iter()
.map(|batch| &*batch.data[i] as &dyn Arrow2Array)
.collect_vec(),
)?;
arrays.push(array);
let arrays: Option<Vec<_>> = batches.iter().map(|batch| batch.column(i)).collect();
let arrays = arrays.ok_or_else(|| {
anyhow::anyhow!("concatenate_record_batches: all batches must have the same schema")
})?;
let array = concat_arrays(&arrays)?;
output_columns.push(array);
}
}

Ok(TransportChunk::new(schema, Arrow2Chunk::new(arrays)))
Ok(TransportChunk::new(
schema,
Arrow2Chunk::new(output_columns),
))
}
Loading

0 comments on commit c07eb6e

Please sign in to comment.