Skip to content

Commit

Permalink
Port PendingRow to arrow-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Jan 8, 2025
1 parent 8d8638a commit 288552d
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 22 deletions.
7 changes: 3 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5183,7 +5183,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.5.0",
"heck 0.4.1",
"itertools 0.13.0",
"log",
"multimap",
Expand Down Expand Up @@ -5542,9 +5542,8 @@ dependencies = [

[[package]]
name = "re_arrow2"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f046c5679b0f305d610f80d93fd51ad702cfc077bbe16d9553a1660a2505160"
version = "0.18.1"
source = "git+https://github.com/rerun-io/re_arrow2.git?branch=emilk/more-arrow-compatibility#0e4b3dd7cd73426b1209ebe0323087452a7c8b91"
dependencies = [
"ahash",
"arrow-array",
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -587,3 +587,5 @@ egui_commonmark = { git = "https://github.com/rerun-io/egui_commonmark.git", bra
# walkers = { git = "https://github.com/rerun-io/walkers", rev = "8939cceb3fa49ca8648ee16fe1d8432f5ab0bdcc" } # https://github.com/podusowski/walkers/pull/222

# dav1d = { path = "/home/cmc/dev/rerun-io/rav1d", package = "re_rav1d", version = "0.1.1" }

re_arrow2 = { git = "https://github.com/rerun-io/re_arrow2.git", branch = "emilk/more-arrow-compatibility" } # TODO : point to main branmch
31 changes: 16 additions & 15 deletions crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ use std::{
time::{Duration, Instant},
};

use arrow2::array::{Array as Arrow2Array, PrimitiveArray as Arrow2PrimitiveArray};
use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow2::array::PrimitiveArray as Arrow2PrimitiveArray;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline};
use re_types_core::ComponentDescriptor;

use crate::{arrow2_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
use crate::{arrow_util, chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

// ---

Expand Down Expand Up @@ -679,15 +680,12 @@ pub struct PendingRow {
/// The component data.
///
/// Each array is a single component, i.e. _not_ a list array.
pub components: IntMap<ComponentDescriptor, Box<dyn Arrow2Array>>,
pub components: IntMap<ComponentDescriptor, ArrayRef>,
}

impl PendingRow {
#[inline]
pub fn new(
timepoint: TimePoint,
components: IntMap<ComponentDescriptor, Box<dyn Arrow2Array>>,
) -> Self {
pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self {
Self {
row_id: RowId::new(),
timepoint,
Expand Down Expand Up @@ -734,9 +732,9 @@ impl PendingRow {

let mut per_name = ChunkComponents::default();
for (component_desc, array) in components {
let list_array = arrow2_util::arrays_to_list_array_opt(&[Some(&*array as _)]);
let list_array = arrow_util::arrays_to_list_array_opt(&[Some(&*array as _)]);
if let Some(list_array) = list_array {
per_name.insert_descriptor(component_desc, list_array);
per_name.insert_descriptor_arrow1(component_desc, list_array);
}
}

Expand Down Expand Up @@ -826,7 +824,7 @@ impl PendingRow {

// Create all the logical list arrays that we're going to need, accounting for the
// possibility of sparse components in the data.
let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn Arrow2Array>>> =
let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn ArrowArray>>> =
IntMap::default();
for row in &rows {
for component_desc in row.components.keys() {
Expand Down Expand Up @@ -870,9 +868,12 @@ impl PendingRow {
for (component_desc, arrays) in std::mem::take(&mut components)
{
let list_array =
arrow2_util::arrays_to_list_array_opt(&arrays);
arrow_util::arrays_to_list_array_opt(&arrays);
if let Some(list_array) = list_array {
per_name.insert_descriptor(component_desc, list_array);
per_name.insert_descriptor_arrow1(
component_desc,
list_array,
);
}
}
per_name
Expand All @@ -898,7 +899,7 @@ impl PendingRow {
arrays.push(
row_components
.get(component_desc)
.map(|array| &**array as &dyn Arrow2Array),
.map(|array| &**array as &dyn ArrowArray),
);
}
}
Expand All @@ -915,9 +916,9 @@ impl PendingRow {
{
let mut per_name = ChunkComponents::default();
for (component_desc, arrays) in components {
let list_array = arrow2_util::arrays_to_list_array_opt(&arrays);
let list_array = arrow_util::arrays_to_list_array_opt(&arrays);
if let Some(list_array) = list_array {
per_name.insert_descriptor(component_desc, list_array);
per_name.insert_descriptor_arrow1(component_desc, list_array);
}
}
per_name
Expand Down
17 changes: 17 additions & 0 deletions crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::atomic::{AtomicU64, Ordering};

use ahash::HashMap;
use arrow::array::ListArray as ArrowListArray;
use arrow2::{
array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
Expand Down Expand Up @@ -60,6 +61,22 @@ pub struct ChunkComponents(
);

impl ChunkComponents {
/// Like `Self::insert`, but automatically infers the [`ComponentName`] layer.
#[inline]
pub fn insert_descriptor_arrow1(
&mut self,
component_desc: ComponentDescriptor,
list_array: ArrowListArray,
) -> Option<ArrowListArray> {
// TODO(cmc): revert me
let component_desc = component_desc.untagged();
self.0
.entry(component_desc.component_name)
.or_default()
.insert(component_desc, list_array.into())
.map(|la| la.into())
}

/// Like `Self::insert`, but automatically infers the [`ComponentName`] layer.
#[inline]
pub fn insert_descriptor(
Expand Down
2 changes: 1 addition & 1 deletion crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ impl RecordingStream {
.into_iter()
.map(|comp_batch| {
comp_batch
.to_arrow2()
.to_arrow()
.map(|array| (comp_batch.descriptor().into_owned(), array))
})
.collect();
Expand Down
2 changes: 1 addition & 1 deletion crates/top/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ fn rr_recording_stream_log_impl(
let component_type = component_type_registry.get(*component_type)?;
let datatype = component_type.datatype.clone();
let values = unsafe { arrow_array_from_c_ffi(array, datatype) }?;
components.insert(component_type.descriptor.clone(), values);
components.insert(component_type.descriptor.clone(), values.into());
}
}

Expand Down
2 changes: 1 addition & 1 deletion rerun_py/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn build_row_from_components(
let component_descr = descriptor_to_rust(&component_descr)?;
let (list_array, _field) = array_to_rust(&array, &component_descr)?;

components.insert(component_descr, list_array);
components.insert(component_descr, list_array.into());
}

Ok(PendingRow {
Expand Down

0 comments on commit 288552d

Please sign in to comment.