Skip to content

Commit

Permalink
Port row_ids to arrow1 (#8657)
Browse files Browse the repository at this point in the history
### Related
* Part of #3741
  • Loading branch information
emilk authored Jan 13, 2025
1 parent bdb2742 commit d07a374
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 92 deletions.
7 changes: 6 additions & 1 deletion crates/store/re_chunk/src/arrow_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use itertools::Itertools;

// ---

/// Moves `array` into a freshly allocated [`std::sync::Arc`] and returns it
/// as an [`ArrayRef`].
///
/// Convenience helper for call sites that hold a concrete array value and
/// need the reference-counted, type-erased handle instead.
#[inline]
pub fn into_arrow_ref(array: impl Array + 'static) -> ArrayRef {
    // The type annotation performs the coercion from the concrete array type
    // to the `ArrayRef` handle.
    let shared: ArrayRef = std::sync::Arc::new(array);
    shared
}

/// Returns true if the given `list_array` is semantically empty.
///
/// Semantic emptiness is defined as either one of these:
Expand Down Expand Up @@ -285,7 +290,7 @@ where
);

if indices.len() == array.len() {
let indices = indices.values().as_ref();
let indices = indices.values();

let starts_at_zero = || indices[0] == O::Native::ZERO;
let is_consecutive = || {
Expand Down
74 changes: 33 additions & 41 deletions crates/store/re_chunk/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::sync::atomic::{AtomicU64, Ordering};

use ahash::HashMap;
use arrow::{
array::{Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray},
array::{
Array as ArrowArray, ArrayRef as ArrowArrayRef, ListArray as ArrowListArray,
StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array,
},
buffer::ScalarBuffer as ArrowScalarBuffer,
};
use arrow2::{
array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
StructArray as Arrow2StructArray,
},
array::{Array as Arrow2Array, ListArray as Arrow2ListArray},
Either,
};
use itertools::{izip, Itertools};
Expand All @@ -34,7 +34,10 @@ pub enum ChunkError {
Malformed { reason: String },

#[error(transparent)]
Arrow(#[from] arrow2::error::Error),
Arrow(#[from] arrow::error::ArrowError),

#[error(transparent)]
Arrow2(#[from] arrow2::error::Error),

#[error("{kind} index out of bounds: {index} (len={len})")]
IndexOutOfBounds {
Expand Down Expand Up @@ -224,7 +227,7 @@ pub struct Chunk {
pub(crate) is_sorted: bool,

/// The respective [`RowId`]s for each row of data.
pub(crate) row_ids: Arrow2StructArray,
pub(crate) row_ids: ArrowStructArray,

/// The time columns.
///
Expand Down Expand Up @@ -351,12 +354,6 @@ impl Chunk {
components,
} = self;

let row_ids_no_extension = arrow2::array::StructArray::new(
row_ids.data_type().to_logical_type().clone(),
row_ids.values().to_vec(),
row_ids.validity().cloned(),
);

let components_no_extension: IntMap<_, _> = components
.values()
.flat_map(|per_desc| {
Expand Down Expand Up @@ -388,16 +385,10 @@ impl Chunk {
})
.collect();

let other_row_ids_no_extension = arrow2::array::StructArray::new(
other.row_ids.data_type().to_logical_type().clone(),
other.row_ids.values().to_vec(),
other.row_ids.validity().cloned(),
);

*id == other.id
&& *entity_path == other.entity_path
&& *is_sorted == other.is_sorted
&& row_ids_no_extension == other_row_ids_no_extension
&& row_ids == &other.row_ids
&& *timelines == other.timelines
&& components_no_extension == other_components_no_extension
}
Expand Down Expand Up @@ -437,11 +428,11 @@ impl Chunk {
.collect_vec();

#[allow(clippy::unwrap_used)]
let row_ids = <RowId as Loggable>::to_arrow2(&row_ids)
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
Expand Down Expand Up @@ -493,11 +484,11 @@ impl Chunk {
.collect_vec();

#[allow(clippy::unwrap_used)]
let row_ids = <RowId as Loggable>::to_arrow2(&row_ids)
let row_ids = <RowId as Loggable>::to_arrow(&row_ids)
// Unwrap: native RowIds cannot fail to serialize.
.unwrap()
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
// Unwrap: RowId schema is known in advance to be a struct array -- always.
.unwrap()
.clone();
Expand Down Expand Up @@ -826,7 +817,7 @@ impl Chunk {
id: ChunkId,
entity_path: EntityPath,
is_sorted: Option<bool>,
row_ids: Arrow2StructArray,
row_ids: ArrowStructArray,
timelines: IntMap<Timeline, TimeColumn>,
components: ChunkComponents,
) -> ChunkResult<Self> {
Expand Down Expand Up @@ -866,13 +857,13 @@ impl Chunk {
) -> ChunkResult<Self> {
re_tracing::profile_function!();
let row_ids = row_ids
.to_arrow2()
.to_arrow()
// NOTE: impossible, but better safe than sorry.
.map_err(|err| ChunkError::Malformed {
reason: format!("RowIds failed to serialize: {err}"),
})?
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
// NOTE: impossible, but better safe than sorry.
.ok_or_else(|| ChunkError::Malformed {
reason: "RowIds failed to downcast".to_owned(),
Expand Down Expand Up @@ -923,7 +914,7 @@ impl Chunk {
id: ChunkId,
entity_path: EntityPath,
is_sorted: Option<bool>,
row_ids: Arrow2StructArray,
row_ids: ArrowStructArray,
components: ChunkComponents,
) -> ChunkResult<Self> {
Self::new(
Expand All @@ -943,7 +934,11 @@ impl Chunk {
entity_path,
heap_size_bytes: Default::default(),
is_sorted: true,
row_ids: Arrow2StructArray::new_empty(RowId::arrow2_datatype()),
row_ids: arrow::array::StructBuilder::from_fields(
re_types_core::tuid_arrow_fields(),
0,
)
.finish(),
timelines: Default::default(),
components: Default::default(),
}
Expand Down Expand Up @@ -1203,27 +1198,24 @@ impl Chunk {
}

#[inline]
pub fn row_ids_array(&self) -> &Arrow2StructArray {
pub fn row_ids_array(&self) -> &ArrowStructArray {
&self.row_ids
}

/// Returns the [`RowId`]s in their raw-est form: a tuple of (times, counters) arrays.
#[inline]
pub fn row_ids_raw(&self) -> (&Arrow2PrimitiveArray<u64>, &Arrow2PrimitiveArray<u64>) {
let [times, counters] = self.row_ids.values() else {
pub fn row_ids_raw(&self) -> (&ArrowUInt64Array, &ArrowUInt64Array) {
let [times, counters] = self.row_ids.columns() else {
panic!("RowIds are corrupt -- this should be impossible (sanity checked)");
};

#[allow(clippy::unwrap_used)]
let times = times
.as_any()
.downcast_ref::<Arrow2PrimitiveArray<u64>>()
.unwrap(); // sanity checked
let times = times.as_any().downcast_ref::<ArrowUInt64Array>().unwrap(); // sanity checked

#[allow(clippy::unwrap_used)]
let counters = counters
.as_any()
.downcast_ref::<Arrow2PrimitiveArray<u64>>()
.downcast_ref::<ArrowUInt64Array>()
.unwrap(); // sanity checked

(times, counters)
Expand All @@ -1235,7 +1227,7 @@ impl Chunk {
#[inline]
pub fn row_ids(&self) -> impl Iterator<Item = RowId> + '_ {
let (times, counters) = self.row_ids_raw();
izip!(times.values().as_ref(), counters.values().as_slice())
izip!(times.values(), counters.values())
.map(|(&time, &counter)| RowId::from_u128((time as u128) << 64 | (counter as u128)))
}

Expand Down Expand Up @@ -1277,7 +1269,7 @@ impl Chunk {
}

let (times, counters) = self.row_ids_raw();
let (times, counters) = (times.values().as_ref(), counters.values().as_slice());
let (times, counters) = (times.values(), counters.values());

#[allow(clippy::unwrap_used)] // checked above
let (index_min, index_max) = if self.is_sorted() {
Expand Down Expand Up @@ -1558,11 +1550,11 @@ impl Chunk {

// Row IDs
{
if *row_ids.data_type().to_logical_type() != RowId::arrow2_datatype() {
if *row_ids.data_type() != RowId::arrow_datatype() {
return Err(ChunkError::Malformed {
reason: format!(
"RowId data has the wrong datatype: expected {:?} but got {:?} instead",
RowId::arrow2_datatype(),
RowId::arrow_datatype(),
*row_ids.data_type(),
),
});
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_chunk/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,8 @@ impl Iterator for ChunkIndicesIter {

let row_id = {
let (times, incs) = self.chunk.row_ids_raw();
let times = times.values().as_slice();
let incs = incs.values().as_slice();
let times = times.values();
let incs = incs.values();

let time = *times.get(i)?;
let inc = *incs.get(i)?;
Expand Down
12 changes: 6 additions & 6 deletions crates/store/re_chunk/src/merge.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use arrow::array::StructArray as ArrowStructArray;
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use arrow2::array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, StructArray as Arrow2StructArray,
};
use arrow2::array::{Array as Arrow2Array, ListArray as Arrow2ListArray};
use itertools::{izip, Itertools};
use nohash_hasher::IntMap;

use crate::{
arrow2_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult, TimeColumn,
arrow2_util, arrow_util, chunk::ChunkComponents, Chunk, ChunkError, ChunkId, ChunkResult,
TimeColumn,
};

// ---
Expand Down Expand Up @@ -48,12 +48,12 @@ impl Chunk {
let row_ids = {
re_tracing::profile_scope!("row_ids");

let row_ids = arrow2_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
let row_ids = arrow_util::concat_arrays(&[&cl.row_ids, &cr.row_ids])?;
#[allow(clippy::unwrap_used)]
// concatenating 2 RowId arrays must yield another RowId array
row_ids
.as_any()
.downcast_ref::<Arrow2StructArray>()
.downcast_ref::<ArrowStructArray>()
.unwrap()
.clone()
};
Expand Down
23 changes: 11 additions & 12 deletions crates/store/re_chunk/src/shuffle.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use std::sync::Arc;

use arrow::{
array::{Array as _, StructArray as ArrowStructArray, UInt64Array as ArrowUInt64Array},
buffer::ScalarBuffer as ArrowScalarBuffer,
};
use arrow2::{
array::{
Array as Arrow2Array, ListArray as Arrow2ListArray, PrimitiveArray as Arrow2PrimitiveArray,
StructArray,
},
array::{Array as Arrow2Array, ListArray as Arrow2ListArray},
offset::Offsets as ArrowOffsets,
};
use itertools::Itertools as _;
Expand Down Expand Up @@ -213,14 +215,11 @@ impl Chunk {
sorted_counters[to] = counters[from];
}

let times = Arrow2PrimitiveArray::<u64>::from_vec(sorted_times).boxed();
let counters = Arrow2PrimitiveArray::<u64>::from_vec(sorted_counters).boxed();
let times = Arc::new(ArrowUInt64Array::from(sorted_times));
let counters = Arc::new(ArrowUInt64Array::from(sorted_counters));

self.row_ids = StructArray::new(
self.row_ids.data_type().clone(),
vec![times, counters],
None,
);
self.row_ids =
ArrowStructArray::new(self.row_ids.fields().clone(), vec![times, counters], None);
}

let Self {
Expand Down
23 changes: 14 additions & 9 deletions crates/store/re_chunk/src/slice.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use arrow2::array::{
Array as Arrow2Array, BooleanArray as Arrow2BooleanArray, ListArray as Arrow2ListArray,
StructArray as Arrow2StructArray,
};

use itertools::Itertools;
Expand Down Expand Up @@ -38,8 +37,8 @@ impl Chunk {
let row_id_inc = (row_id_128 & (!0 >> 64)) as u64;

let (times, incs) = self.row_ids_raw();
let times = times.values().as_slice();
let incs = incs.values().as_slice();
let times = times.values();
let incs = incs.values();

let mut index = times.partition_point(|&time| time < row_id_time_ns);
while index < incs.len() && incs[index] < row_id_inc {
Expand Down Expand Up @@ -102,7 +101,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: row_ids.clone().sliced(index, len),
row_ids: row_ids.clone().slice(index, len),
timelines: timelines
.iter()
.map(|(timeline, time_column)| (*timeline, time_column.row_sliced(index, len)))
Expand Down Expand Up @@ -369,7 +368,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow2_util::filter_array(row_ids, &validity_filter),
row_ids: arrow_util::filter_array(row_ids, &validity_filter.clone().into()),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter)))
Expand Down Expand Up @@ -450,7 +449,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted: true,
row_ids: Arrow2StructArray::new_empty(row_ids.data_type().clone()),
row_ids: arrow::array::StructBuilder::from_fields(row_ids.fields().clone(), 0).finish(),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.emptied()))
Expand Down Expand Up @@ -546,7 +545,10 @@ impl Chunk {
entity_path: self.entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted: self.is_sorted,
row_ids: arrow2_util::take_array(&self.row_ids, &indices),
row_ids: arrow_util::take_array(
&self.row_ids,
&arrow::array::Int32Array::from(indices.clone()),
),
timelines: self
.timelines
.iter()
Expand Down Expand Up @@ -619,7 +621,7 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow2_util::filter_array(row_ids, filter),
row_ids: arrow_util::filter_array(row_ids, &filter.clone().into()),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.filtered(filter)))
Expand Down Expand Up @@ -699,7 +701,10 @@ impl Chunk {
entity_path: entity_path.clone(),
heap_size_bytes: Default::default(),
is_sorted,
row_ids: arrow2_util::take_array(row_ids, indices),
row_ids: arrow_util::take_array(
row_ids,
&arrow::array::Int32Array::from(indices.clone()),
),
timelines: timelines
.iter()
.map(|(&timeline, time_column)| (timeline, time_column.taken(indices)))
Expand Down
Loading

0 comments on commit d07a374

Please sign in to comment.