Port ArrowMsg to using arrow::RecordBatch (#8669)
### Related
* Part of #3741

### Details
I'd like to replace our `TransportChunk` with arrow's `RecordBatch`.

This is a good step in that direction.
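
As a rough sketch, the conversion surface this commit adds looks like the following (a minimal illustration built on the `From`/`TryFrom` impls introduced in `crates/store/re_chunk/src/transport.rs` below; the `roundtrip` helper itself is hypothetical):

use arrow::array::RecordBatch as ArrowRecordBatch;
use re_chunk::TransportChunk;

// Round-trip between the legacy arrow2-backed `TransportChunk` and
// arrow's `RecordBatch` (hypothetical helper, for illustration only):
fn roundtrip(batch: ArrowRecordBatch) -> Result<ArrowRecordBatch, arrow::error::ArrowError> {
    // arrow -> TransportChunk is infallible...
    let chunk: TransportChunk = batch.into();
    // ...while TransportChunk -> arrow re-validates schema and columns.
    ArrowRecordBatch::try_from(chunk)
}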

---------

Co-authored-by: Jeremy Leibs <[email protected]>
emilk and jleibs authored Jan 14, 2025
1 parent 63012cd commit 3a8c22f
Showing 35 changed files with 640 additions and 329 deletions.
23 changes: 20 additions & 3 deletions Cargo.lock
@@ -365,6 +365,7 @@ dependencies = [
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-ord",
"arrow-row",
"arrow-schema",
@@ -457,6 +458,20 @@ dependencies = [
"serde",
]

[[package]]
name = "arrow-ipc"
version = "53.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ed91bdeaff5a1c00d28d8f73466bcb64d32bbd7093b5a30156b4b9f4dba3eee"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"flatbuffers",
]

[[package]]
name = "arrow-ord"
version = "53.2.0"
@@ -3885,7 +3900,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]

[[package]]
@@ -5203,7 +5218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.5.0",
"heck 0.4.1",
"itertools 0.13.0",
"log",
"multimap",
@@ -5569,7 +5584,7 @@ dependencies = [
[[package]]
name = "re_arrow2"
version = "0.18.1"
source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#573b5bafd071d09698353d8f0de8d31f3fa59017"
source = "git+https://github.com/rerun-io/re_arrow2.git?branch=main#e8576708a1b41b493980ecb995e808aefcfa1fbc"
dependencies = [
"ahash",
"arrow-array",
@@ -6057,6 +6072,7 @@ dependencies = [
name = "re_log_encoding"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"criterion",
"ehttp",
"js-sys",
@@ -6074,6 +6090,7 @@ dependencies = [
"re_types",
"rmp-serde",
"serde_test",
"similar-asserts",
"thiserror 1.0.65",
"wasm-bindgen",
"wasm-bindgen-futures",
8 changes: 5 additions & 3 deletions Cargo.toml
@@ -555,10 +555,12 @@ significant_drop_tightening = "allow" # An update of parking_lot made this trigger

[patch.crates-io]
# Try to avoid patching crates! It prevents us from publishing the crates on crates.io.
# If you do patch always prefer to patch to a commit on the trunk of the upstream repo.
# If you do patch always prefer to patch to the trunk branch of the upstream repo (i.e. `main`, `master`, …).
# If that is not possible, patch to a branch that has a PR open on the upstream repo.
# As a last resport, patch with a commit to our own repository.
# ALWAYS document what PR the commit hash is part of, or when it was merged into the upstream trunk.
# As a last resort, patch to a branch on our own repository.
#
# Prefer patching with `branch` over `rev` and let `Cargo.lock` handle the commit hash.
# That makes it easy to update with `cargo update -p $CRATE`.

ecolor = { git = "https://github.com/emilk/egui.git", rev = "f0d7c74e838b8e8920a22e7515990fbe057ec218" } # egui master 2025-01-08
eframe = { git = "https://github.com/emilk/egui.git", rev = "f0d7c74e838b8e8920a22e7515990fbe057ec218" } # egui master 2025-01-08
3 changes: 3 additions & 0 deletions clippy.toml
@@ -54,6 +54,8 @@ disallowed-methods = [
{ path = "arrow::compute::filter", reason = "Use `re_chunk::arrow_util::filter_array` instead" },
{ path = "arrow::compute::take", reason = "Use `re_chunk::arrow_util::take_array` instead" },

{ path = "arrow::datatypes::Schema::new", reason = "Use `arrow::datatypes::Schema::new_with_metadata` instead. There is usually some metadata you want to preserve." },

# Specify both `arrow2` and `re_arrow2` -- clippy gets lost in all the package renaming happening.
{ path = "arrow2::compute::concatenate::concatenate", reason = "Use `re_chunk::arrow2_util::concat_arrays` instead, which has proper early outs" },
{ path = "arrow2::compute::filter::filter", reason = "Use `re_chunk::arrow2_util::filter_array` instead, which has proper early outs" },
@@ -72,6 +74,7 @@ disallowed-names = []

# https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_types
disallowed-types = [
{ path = "arrow::ipc::writer::StreamWriter", reason = "Wait until https://github.com/apache/arrow-rs/pull/6805 has been released" },
{ path = "egui::Checkbox", reason = "Use `re_checkbox` from `re_ui::UiEx" },
{ path = "ring::digest::SHA1_FOR_LEGACY_USE_ONLY", reason = "SHA1 is cryptographically broken" },
{ path = "std::sync::Condvar", reason = "Use parking_lot instead" },
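The new `Schema::new` lint above exists because arrow's plain constructor starts from an empty metadata map, so chunk-level metadata is silently dropped. A minimal sketch of the difference (field name and metadata key are made up for illustration):

use std::collections::HashMap;
use arrow::datatypes::{DataType, Field, Schema};

fn preserve_metadata() {
    let fields = vec![Field::new("positions", DataType::Float32, false)];
    let metadata: HashMap<String, String> =
        [("rerun.chunk_id".to_owned(), "1234".to_owned())].into();

    // Disallowed by the lint: `Schema::new(fields)` would leave `metadata` behind.
    // Preferred: thread the metadata through explicitly.
    let schema = Schema::new_with_metadata(fields, metadata);
    assert_eq!(schema.metadata().len(), 1);
}

The `re_chunk/src/arrow.rs` hunk right below applies exactly this substitution.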
2 changes: 1 addition & 1 deletion crates/store/re_chunk/src/arrow.rs
@@ -22,7 +22,7 @@ impl TransportChunk {

let metadata = self.schema.metadata.clone().into_iter().collect();

let schema = Schema::new(fields).with_metadata(metadata);
let schema = Schema::new_with_metadata(fields, metadata);

let columns: Vec<_> = self
.data
7 changes: 2 additions & 5 deletions crates/store/re_chunk/src/arrow2_util.rs
@@ -449,7 +449,7 @@ pub fn concatenate_record_batches(
schema: Arrow2Schema,
batches: &[TransportChunk],
) -> anyhow::Result<TransportChunk> {
assert!(batches.iter().map(|batch| &batch.schema).all_equal());
assert!(batches.iter().map(|batch| batch.schema_ref()).all_equal());

let mut arrays = Vec::new();

@@ -465,8 +465,5 @@
}
}

Ok(TransportChunk {
schema,
data: Arrow2Chunk::new(arrays),
})
Ok(TransportChunk::new(schema, Arrow2Chunk::new(arrays)))
}
4 changes: 2 additions & 2 deletions crates/store/re_chunk/src/batcher.rs
@@ -52,10 +52,10 @@ pub struct BatcherHooks {

/// Callback to be run when an Arrow Chunk goes out of scope.
///
/// See [`re_log_types::ArrowChunkReleaseCallback`] for more information.
/// See [`re_log_types::ArrowRecordBatchReleaseCallback`] for more information.
//
// TODO(#6412): probably don't need this anymore.
pub on_release: Option<re_log_types::ArrowChunkReleaseCallback>,
pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}

impl BatcherHooks {
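For context, a sketch of what wiring up the renamed callback might look like. This assumes `ArrowRecordBatchReleaseCallback` keeps the closure conversion its predecessor had, and that `BatcherHooks` still exposes a `NONE` constant; both are assumptions, not confirmed by this diff:

use re_chunk::BatcherHooks;

fn hooks_with_release_logging() -> BatcherHooks {
    BatcherHooks {
        // Assumed: the callback type is `From`-convertible from a closure
        // taking an `arrow::array::RecordBatch`.
        on_release: Some(
            (|batch: arrow::array::RecordBatch| {
                eprintln!("released a batch with {} rows", batch.num_rows());
            })
            .into(),
        ),
        ..BatcherHooks::NONE
    }
}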
80 changes: 60 additions & 20 deletions crates/store/re_chunk/src/transport.rs
@@ -1,5 +1,6 @@
use arrow::array::{
Array as ArrowArray, ArrayRef as ArrowArrayRef, StructArray as ArrowStructArray,
Array as ArrowArray, ArrayRef as ArrowArrayRef, RecordBatch as ArrowRecordBatch,
StructArray as ArrowStructArray,
};
use arrow2::{
array::{Array as Arrow2Array, ListArray},
@@ -69,6 +70,48 @@ impl std::fmt::Display for TransportChunk {
}
}

impl TransportChunk {
pub fn new(
schema: impl Into<Arrow2Schema>,
columns: impl Into<Arrow2Chunk<Box<dyn Arrow2Array>>>,
) -> Self {
Self {
schema: schema.into(),
data: columns.into(),
}
}
}

impl From<ArrowRecordBatch> for TransportChunk {
fn from(batch: ArrowRecordBatch) -> Self {
Self::new(
batch.schema(),
Arrow2Chunk::new(
batch
.columns()
.iter()
.map(|column| column.clone().into())
.collect(),
),
)
}
}

impl TryFrom<TransportChunk> for ArrowRecordBatch {
type Error = arrow::error::ArrowError;

fn try_from(chunk: TransportChunk) -> Result<Self, Self::Error> {
let TransportChunk { schema, data } = chunk;
Self::try_new(
schema.into(),
data.columns()
.iter()
.map(|column| column.clone().into())
.collect(),
)
}
}

// TODO(#6572): Relying on Arrow's native schema metadata feature is bound to fail, we need to
// switch to something more powerful asap.
impl TransportChunk {
@@ -301,6 +344,11 @@ impl TransportChunk {
.and_then(|s| s.parse::<u64>().ok())
}

#[inline]
pub fn schema_ref(&self) -> &Arrow2Schema {
&self.schema
}

/// Looks in the chunk metadata for the `IS_SORTED` marker.
///
/// It is possible that a chunk is sorted but didn't set that marker.
@@ -541,10 +589,11 @@ impl Chunk {
}
}

Ok(TransportChunk {
schema,
data: Arrow2Chunk::new(columns),
})
Ok(TransportChunk::new(schema, Arrow2Chunk::new(columns)))
}

pub fn from_record_batch(batch: ArrowRecordBatch) -> ChunkResult<Self> {
Self::from_transport(&batch.into())
}

pub fn from_transport(transport: &TransportChunk) -> ChunkResult<Self> {
@@ -694,15 +743,11 @@ impl Chunk {
let re_log_types::ArrowMsg {
chunk_id: _,
timepoint_max: _,
schema,
chunk,
batch,
on_release: _,
} = msg;

Self::from_transport(&TransportChunk {
schema: schema.clone(),
data: chunk.clone(),
})
Self::from_record_batch(batch.clone())
}

#[inline]
@@ -714,8 +759,7 @@
Ok(re_log_types::ArrowMsg {
chunk_id: re_tuid::Tuid::from_u128(self.id().as_u128()),
timepoint_max: self.timepoint_max(),
schema: transport.schema,
chunk: transport.data,
batch: transport.try_into()?,
on_release: None,
})
}
@@ -805,14 +849,10 @@ mod tests {
for _ in 0..3 {
let chunk_in_transport = chunk_before.to_transport()?;
#[cfg(feature = "arrow")]
let chunk_after = {
let chunk_in_record_batch = chunk_in_transport.try_to_arrow_record_batch()?;
let chunk_roundtrip =
TransportChunk::from_arrow_record_batch(&chunk_in_record_batch);
Chunk::from_transport(&chunk_roundtrip)?
};
let chunk_after =
Chunk::from_record_batch(chunk_in_transport.try_to_arrow_record_batch()?)?;
#[cfg(not(feature = "arrow"))]
let chunk_after = { Chunk::from_transport(&chunk_in_transport)? };
let chunk_after = Chunk::from_transport(&chunk_in_transport)?;

assert_eq!(
chunk_in_transport.entity_path()?,
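Taken together, the hunks above collapse the old two-field pairing of `schema` and `chunk` into a single record batch. A sketch of the resulting decode path (method names as they appear in the diff; the `re_chunk::ChunkResult` path is assumed):

use arrow::array::RecordBatch as ArrowRecordBatch;
use re_chunk::Chunk;

// Hypothetical caller: previously one assembled a `TransportChunk` by hand
// from `msg.schema` and `msg.chunk`; now the record batch goes in directly.
fn decode(batch: ArrowRecordBatch) -> re_chunk::ChunkResult<Chunk> {
    Chunk::from_record_batch(batch)
}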
16 changes: 3 additions & 13 deletions crates/store/re_chunk_store/src/store.rs
@@ -6,7 +6,7 @@ use arrow::datatypes::DataType as ArrowDataType;
use arrow2::datatypes::DataType as Arrow2DataType;
use nohash_hasher::IntMap;

use re_chunk::{Chunk, ChunkId, RowId, TransportChunk};
use re_chunk::{Chunk, ChunkId, RowId};
use re_log_types::{EntityPath, StoreId, StoreInfo, TimeInt, Timeline};
use re_types_core::{ComponentDescriptor, ComponentName};

@@ -734,12 +734,7 @@ impl ChunkStore {
anyhow::bail!("unknown store ID: {store_id}");
};

let transport = TransportChunk {
schema: msg.schema.clone(),
data: msg.chunk.clone(),
};

let chunk = Chunk::from_transport(&transport)
let chunk = Chunk::from_arrow_msg(&msg)
.with_context(|| format!("couldn't decode chunk {path_to_rrd:?}"))?;

store
@@ -787,12 +782,7 @@ impl ChunkStore {
anyhow::bail!("unknown store ID: {store_id}");
};

let transport = TransportChunk {
schema: msg.schema.clone(),
data: msg.chunk.clone(),
};

let chunk = Chunk::from_transport(&transport)
let chunk = Chunk::from_arrow_msg(&msg)
.with_context(|| "couldn't decode chunk".to_owned())?;

store
13 changes: 5 additions & 8 deletions crates/store/re_dataframe/src/query.rs
@@ -1268,10 +1268,10 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// See [`Self::next_row`] for more information.
#[inline]
pub fn next_row_batch(&self) -> Option<RecordBatch> {
Some(RecordBatch {
schema: self.schema().clone(),
data: Arrow2Chunk::new(self.next_row_arrow2()?),
})
Some(RecordBatch::new(
self.schema().clone(),
Arrow2Chunk::new(self.next_row_arrow2()?),
))
}

#[inline]
@@ -1286,10 +1286,7 @@
#[allow(clippy::unwrap_used)]
let schema = self.state.get().unwrap().arrow_schema.clone();

Some(RecordBatch {
schema,
data: Arrow2Chunk::new(row),
})
Some(RecordBatch::new(schema, Arrow2Chunk::new(row)))
}
}

9 changes: 3 additions & 6 deletions crates/store/re_grpc_client/src/lib.rs
@@ -283,7 +283,7 @@ pub fn store_info_from_catalog_chunk(
.as_any()
.downcast_ref::<Arrow2Utf8Array<i32>>()
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!("application_id must be a utf8 array: {:?}", tc.schema),
reason: format!("application_id must be a utf8 array: {:?}", tc.schema_ref()),
}))?
.value(0);

Expand All @@ -297,7 +297,7 @@ pub fn store_info_from_catalog_chunk(
.as_any()
.downcast_ref::<arrow2::array::Int64Array>()
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!("start_time must be an int64 array: {:?}", tc.schema),
reason: format!("start_time must be an int64 array: {:?}", tc.schema_ref()),
}))?
.value(0);

@@ -456,10 +456,7 @@ async fn stream_catalog_async(
);

// modified and enriched TransportChunk
let mut tc = TransportChunk {
schema,
data: arrow2::chunk::Chunk::new(arrays),
};
let mut tc = TransportChunk::new(schema, arrow2::chunk::Chunk::new(arrays));

tc.schema.metadata.insert(
TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(),
2 changes: 2 additions & 0 deletions crates/store/re_log_encoding/Cargo.toml
@@ -51,6 +51,7 @@ re_smart_channel.workspace = true
re_tracing.workspace = true

# External:
arrow = { workspace = true, features = ["ipc"] }
arrow2.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
@@ -74,6 +75,7 @@ re_types.workspace = true
criterion.workspace = true
mimalloc.workspace = true
serde_test.workspace = true
similar-asserts.workspace = true

[lib]
bench = false