Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port TransportChunk::schema to arrow-rs #8687

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6009,6 +6009,7 @@ dependencies = [
name = "re_grpc_client"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"re_chunk",
"re_error",
"re_log",
Expand Down
33 changes: 4 additions & 29 deletions crates/store/re_chunk/src/arrow.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use arrow::{
array::{make_array, RecordBatch},
datatypes::{Field, Schema},
error::ArrowError,
};

Expand All @@ -13,28 +12,15 @@ impl TransportChunk {
/// but does incur overhead of generating an alternative representation of the arrow-
/// related rust structures that refer to those data buffers.
pub fn try_to_arrow_record_batch(&self) -> Result<RecordBatch, ArrowError> {
// NOTE(review): this listing is a diff rendered without its +/- markers, so replaced
// and replacement lines appear back to back. The notes below say which is which, as
// inferred from the "4 additions & 29 deletions" count for this file — confirm
// against the pull request before relying on them.

// Replaced lines (through the `Schema::new_with_metadata` call): the arrow-rs schema
// was rebuilt by hand — every field cloned and converted with `into()`, and the
// schema metadata cloned and re-collected.
let fields: Vec<Field> = self
.schema
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = self.schema.metadata.clone().into_iter().collect();

let schema = Schema::new_with_metadata(fields, metadata);

// Convert every arrow2 array into an arrow-rs array: export its data with
// `arrow2::array::to_data`, then wrap that data with `make_array`.
let columns: Vec<_> = self
// Replaced lines: the arrays were read from `self.data.columns()`.
.data
.columns()
.iter()
.map(|arr2_array| {
// Replacement lines: iterate `self.all_columns()` instead; the field half of each
// pair is ignored and only the array is used.
.all_columns()
.map(|(_field, arr2_array)| {
let data = arrow2::array::to_data(arr2_array.as_ref());
make_array(data)
})
.collect();

// Replaced line: wrapped the hand-built schema in an `Arc`.
RecordBatch::try_new(std::sync::Arc::new(schema), columns)
// Replacement line: pass the result of `self.schema()` straight through.
RecordBatch::try_new(self.schema(), columns)
}

/// Create a [`TransportChunk`] from an arrow-rs [`RecordBatch`].
Expand All @@ -43,17 +29,6 @@ impl TransportChunk {
/// but does incur overhead of generating an alternative representation of the arrow-
/// related rust structures that refer to those data buffers.
pub fn from_arrow_record_batch(batch: &RecordBatch) -> Self {
let fields: Vec<arrow2::datatypes::Field> = batch
.schema()
.fields
.iter()
.map(|f| f.clone().into())
.collect();

let metadata = batch.schema().metadata.clone().into_iter().collect();

let schema = arrow2::datatypes::Schema::from(fields).with_metadata(metadata);

let columns: Vec<_> = batch
.columns()
.iter()
Expand All @@ -62,6 +37,6 @@ impl TransportChunk {

let data = arrow2::chunk::Chunk::new(columns);

Self { schema, data }
Self::new(batch.schema(), data)
}
}
33 changes: 21 additions & 12 deletions crates/store/re_chunk/src/arrow2_util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arrow::datatypes::Schema as ArrowSchema;
use arrow2::{
array::{
Array as Arrow2Array, BooleanArray as Arrow2BooleanArray,
Expand Down Expand Up @@ -438,32 +439,40 @@ pub fn take_array<A: Arrow2Array + Clone, O: arrow2::types::Index>(

// ---

use arrow2::{chunk::Chunk as Arrow2Chunk, datatypes::Schema as Arrow2Schema};
use arrow2::chunk::Chunk as Arrow2Chunk;

/// Concatenate multiple [`TransportChunk`]s into one.
///
/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
///
/// One output column is built per field of `schema` by concatenating that column across
/// every batch in `batches`. When `batches` is empty the column loop is skipped, so the
/// returned chunk is built from an empty column list.
///
/// # Errors
///
/// In the replacement lines shown below, an error is returned if any batch's schema is not
/// equal to `schema`, if `batch.column(i)` yields `None` for any batch, or if
/// `concat_arrays` fails.
//
// NOTE(review): this listing is a diff rendered without its +/- markers, so replaced and
// replacement lines appear back to back. The inline notes say which is which, as inferred
// from the "21 additions & 12 deletions" count for this file — confirm against the pull
// request before relying on them.
pub fn concatenate_record_batches(
// Replaced line: the schema was taken as an arrow2 schema.
schema: Arrow2Schema,
// Replacement line: accept anything convertible into an arrow-rs schema.
schema: impl Into<ArrowSchema>,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
// Replaced line: only checked that the batches agree with each other, and failed the
// assertion (rather than returning an error) when they did not.
assert!(batches.iter().map(|batch| batch.schema_ref()).all_equal());
// Replacement lines: convert the argument once, then require every batch's schema to
// equal it, reporting a mismatch through the `anyhow` result.
let schema: ArrowSchema = schema.into();
anyhow::ensure!(
batches
.iter()
.all(|batch| batch.schema_ref().as_ref() == &schema),
"concatenate_record_batches: all batches must have the same schema"
);

// Replaced line: accumulator for the concatenated columns.
let mut arrays = Vec::new();
// Replacement line: the same accumulator under a new name; the name `arrays` is reused
// inside the loop for the per-field inputs.
let mut output_columns = Vec::new();

if !batches.is_empty() {
for (i, _field) in schema.fields.iter().enumerate() {
// Replaced lines (through `arrays.push(array);`): indexed `batch.data[i]` directly
// to gather the i-th array of every batch.
let array = concat_arrays(
&batches
.iter()
.map(|batch| &*batch.data[i] as &dyn Arrow2Array)
.collect_vec(),
)?;
arrays.push(array);
// Replacement lines: fetch the i-th column through `batch.column(i)`; collecting into
// an `Option` turns a single `None` into `None` for the whole field, which is then
// reported as an error.
let arrays: Option<Vec<_>> = batches.iter().map(|batch| batch.column(i)).collect();
let arrays = arrays.ok_or_else(|| {
anyhow::anyhow!("concatenate_record_batches: all batches must have the same schema")
})?;
let array = concat_arrays(&arrays)?;
output_columns.push(array);
}
}

// Replaced line: built the result from the old `arrays` accumulator.
Ok(TransportChunk::new(schema, Arrow2Chunk::new(arrays)))
// Replacement lines: build the result from `output_columns`.
Ok(TransportChunk::new(
schema,
Arrow2Chunk::new(output_columns),
))
}
Loading
Loading