Skip to content

Commit

Permalink
Switch back to using arrow2 for IPC serialization
Browse files Browse the repository at this point in the history
Until the fix for apache/arrow-rs#6803
has been released
  • Loading branch information
emilk committed Jan 13, 2025
1 parent a2fc981 commit 5eaf3b4
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6089,6 +6089,7 @@ dependencies = [
"lz4_flex",
"mimalloc",
"parking_lot",
"re_arrow2",
"re_build_info",
"re_chunk",
"re_log",
Expand Down
1 change: 1 addition & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ disallowed-names = []

# https://rust-lang.github.io/rust-clippy/master/index.html#disallowed_types
disallowed-types = [
{ path = "arrow::ipc::writer::StreamWriter", reason = "Wait until https://github.com/apache/arrow-rs/pull/6805 has been released" },
{ path = "egui::Checkbox", reason = "Use `re_checkbox` from `re_ui::UiEx`" },
{ path = "ring::digest::SHA1_FOR_LEGACY_USE_ONLY", reason = "SHA1 is cryptographically broken" },
{ path = "std::sync::Condvar", reason = "Use parking_lot instead" },
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ re_tracing.workspace = true

# External:
arrow = { workspace = true, features = ["ipc"] }
arrow2.workspace = true
parking_lot.workspace = true
thiserror.workspace = true

Expand Down
26 changes: 23 additions & 3 deletions crates/store/re_log_encoding/src/codec/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,30 @@ pub(crate) fn write_arrow_to_bytes<W: std::io::Write>(
writer: &mut W,
batch: &ArrowRecordBatch,
) -> Result<(), CodecError> {
let mut sw = ipc::writer::StreamWriter::try_new(writer, batch.schema_ref())
// TODO(#3741): switch to arrow1 once the fix for https://github.com/apache/arrow-rs/issues/6803 has been released
// let mut sw = ipc::writer::StreamWriter::try_new(writer, batch.schema_ref())
// .map_err(CodecError::ArrowSerialization)?;
// sw.write(batch).map_err(CodecError::ArrowSerialization)?;
// sw.finish().map_err(CodecError::ArrowSerialization)?;

let schema = arrow2::datatypes::Schema::from(batch.schema());
let chunk = arrow2::chunk::Chunk::new(
batch
.columns()
.iter()
.map(|c| -> Box<dyn arrow2::array::Array> { c.clone().into() })
.collect(),
);

let mut writer = arrow2::io::ipc::write::StreamWriter::new(writer, Default::default());
writer
.start(&schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(batch).map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;
writer
.write(&chunk, None)
.map_err(CodecError::ArrowSerialization)?;
writer.finish().map_err(CodecError::ArrowSerialization)?;

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod wire;
#[derive(Debug, thiserror::Error)]
pub enum CodecError {
#[error("Arrow IPC serialization error: {0}")]
ArrowSerialization(::arrow::error::ArrowError),
ArrowSerialization(::arrow2::error::Error),

#[error("Invalid Chunk: {0}")]
InvalidChunk(::arrow::error::ArrowError),
Expand Down
48 changes: 36 additions & 12 deletions crates/store/re_log_types/src/arrow_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,48 @@ impl serde::Serialize for ArrowMsg {
S: serde::Serializer,
{
re_tracing::profile_scope!("ArrowMsg::serialize");

use arrow::ipc::writer::StreamWriter;
use serde::ser::SerializeTuple;

let mut buf = Vec::<u8>::new();
let mut writer = StreamWriter::try_new(&mut buf, self.batch.schema_ref())
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
writer
.write(&self.batch)
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
writer
.finish()
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
let mut ipc_bytes = Vec::<u8>::new();

// TODO(emilk): switch to arrow1 once the fix for https://github.com/apache/arrow-rs/issues/6803 has been released
// use arrow::ipc::writer::StreamWriter;
// let mut writer = StreamWriter::try_new(&mut ipc_bytes, self.batch.schema_ref())
// .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
// writer
// .write(&self.batch)
// .map_err(|err| serde::ser::Error::custom(err.to_string()))?;
// writer
// .finish()
// .map_err(|err| serde::ser::Error::custom(err.to_string()))?;

{
let schema = arrow2::datatypes::Schema::from(self.batch.schema());
let chunk = arrow2::chunk::Chunk::new(
self.batch
.columns()
.iter()
.map(|c| -> Box<dyn arrow2::array::Array> { c.clone().into() })
.collect(),
);

let mut writer =
arrow2::io::ipc::write::StreamWriter::new(&mut ipc_bytes, Default::default());
writer
.start(&schema, None)
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
writer
.write(&chunk, None)
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
writer
.finish()
.map_err(|err| serde::ser::Error::custom(err.to_string()))?;
}

let mut inner = serializer.serialize_tuple(3)?;
inner.serialize_element(&self.chunk_id)?;
inner.serialize_element(&self.timepoint_max)?;
inner.serialize_element(&serde_bytes::ByteBuf::from(buf))?;
inner.serialize_element(&serde_bytes::ByteBuf::from(ipc_bytes))?;
inner.end()
}
}
Expand Down

0 comments on commit 5eaf3b4

Please sign in to comment.