Skip to content

Commit

Permalink
TransportChunk just wraps RecordBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Jan 15, 2025
1 parent c63de63 commit dee88f0
Show file tree
Hide file tree
Showing 18 changed files with 317 additions and 401 deletions.
3 changes: 0 additions & 3 deletions crates/store/re_chunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ serde = [
"re_types_core/serde",
]

## Enable conversion to and from arrow-rs types
arrow = ["arrow2/arrow"]


[dependencies]

Expand Down
40 changes: 0 additions & 40 deletions crates/store/re_chunk/src/arrow.rs

This file was deleted.

18 changes: 17 additions & 1 deletion crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,13 +956,29 @@ impl Chunk {
}
}

/// Unconditionally inserts an [`Arrow2ListArray`] as a component column.
/// Inserts an [`ArrowListArray`] as a component column, unconditionally.
///
/// Any pre-existing column registered under the same descriptor is removed
/// and replaced by the new one.
///
/// Returns an error if the chunk ends up malformed in any way -- see
/// [`Self::sanity_check`].
#[inline]
pub fn add_component(
    &mut self,
    component_desc: ComponentDescriptor,
    list_array: ArrowListArray,
) -> ChunkResult<()> {
    let components = &mut self.components;
    components.insert_descriptor(component_desc, list_array);

    // Re-validate the whole chunk after the mutation, so a bad insert
    // surfaces immediately rather than at read time.
    self.sanity_check()
}

/// Unconditionally inserts an [`Arrow2ListArray`] as a component column.
///
/// Removes and replaces the column if it already exists.
///
/// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
#[inline]
pub fn add_component_arrow2(
&mut self,
component_desc: ComponentDescriptor,
list_array: Arrow2ListArray<i32>,
Expand Down
34 changes: 8 additions & 26 deletions crates/store/re_chunk/src/concat_record_batches.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
use crate::TransportChunk;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema as ArrowSchema;
use arrow2::chunk::Chunk as Arrow2Chunk;

/// Concatenate multiple [`TransportChunk`]s into one.
///
/// This is a temporary method that we use while waiting to migrate towards `arrow-rs`.
/// * `arrow2` doesn't have a `RecordBatch` type, therefore we emulate that using our `TransportChunk`s.
/// * `arrow-rs` does have one, and it natively supports concatenation.
/// Concatenate multiple [`RecordBatch`]s into one.
// TODO: expose this via an extension trait on `RecordBatch` (we can't add
// inherent methods to a foreign `arrow-rs` type).
pub fn concatenate_record_batches(
schema: impl Into<ArrowSchema>,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
batches: &[RecordBatch],
) -> anyhow::Result<RecordBatch> {
let schema: ArrowSchema = schema.into();
anyhow::ensure!(
batches
Expand All @@ -20,21 +15,8 @@ pub fn concatenate_record_batches(
"concatenate_record_batches: all batches must have the same schema"
);

let mut output_columns = Vec::new();

if !batches.is_empty() {
for (i, _field) in schema.fields.iter().enumerate() {
let arrays: Option<Vec<_>> = batches.iter().map(|batch| batch.column(i)).collect();
let arrays = arrays.ok_or_else(|| {
anyhow::anyhow!("concatenate_record_batches: all batches must have the same schema")
})?;
let array = re_arrow_util::arrow2_util::concat_arrays(&arrays)?;
output_columns.push(array);
}
}
// TODO: the `is_sorted` metadata flag from the input batches likely no longer
// holds after concatenation — it should be recomputed or dropped here.

Ok(TransportChunk::new(
schema,
Arrow2Chunk::new(output_columns),
))
let record_batch = arrow::compute::concat_batches(&schema.into(), batches)?;
Ok(record_batch)
}
3 changes: 0 additions & 3 deletions crates/store/re_chunk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ mod transport;
#[cfg(not(target_arch = "wasm32"))]
mod batcher;

#[cfg(feature = "arrow")]
mod arrow;

pub use self::builder::{ChunkBuilder, TimeColumnBuilder};
pub use self::chunk::{
Chunk, ChunkComponents, ChunkError, ChunkResult, TimeColumn, TimeColumnError,
Expand Down
Loading

0 comments on commit dee88f0

Please sign in to comment.