Less use of TransportChunk
emilk committed Jan 16, 2025
1 parent 21f418a commit 6c95b56
Showing 2 changed files with 15 additions and 16 deletions.
16 changes: 9 additions & 7 deletions crates/store/re_log_encoding/src/codec/wire/encoder.rs
@@ -1,18 +1,20 @@
-use crate::codec::arrow::write_arrow_to_bytes;
-use crate::codec::CodecError;
-use re_chunk::TransportChunk;
 use re_protos::common::v0::RerunChunk;
 use re_protos::remote_store::v0::DataframePart;
 
+use arrow::array::RecordBatch as ArrowRecordBatch;
+
+use crate::codec::arrow::write_arrow_to_bytes;
+use crate::codec::CodecError;
+
 /// Encode a transport chunk into a byte stream.
 fn encode(
     version: re_protos::common::v0::EncoderVersion,
-    chunk: &TransportChunk,
+    batch: &ArrowRecordBatch,
 ) -> Result<Vec<u8>, CodecError> {
     match version {
         re_protos::common::v0::EncoderVersion::V0 => {
             let mut data: Vec<u8> = Vec::new();
-            write_arrow_to_bytes(&mut data, chunk)?;
+            write_arrow_to_bytes(&mut data, batch)?;
             Ok(data)
         }
     }
@@ -23,7 +25,7 @@ pub trait Encode<O> {
     fn encode(&self) -> Result<O, CodecError>;
 }
 
-impl Encode<DataframePart> for TransportChunk {
+impl Encode<DataframePart> for ArrowRecordBatch {
     fn encode(&self) -> Result<DataframePart, CodecError> {
         let payload = encode(re_protos::common::v0::EncoderVersion::V0, self)?;
         Ok(DataframePart {
@@ -33,7 +35,7 @@ impl Encode<DataframePart> for TransportChunk {
     }
 }
 
-impl Encode<RerunChunk> for TransportChunk {
+impl Encode<RerunChunk> for ArrowRecordBatch {
     fn encode(&self) -> Result<RerunChunk, CodecError> {
         let payload = encode(re_protos::common::v0::EncoderVersion::V0, self)?;
         Ok(RerunChunk {
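With this change, code that only needs the wire encoding can call `Encode` directly on an Arrow `RecordBatch` instead of first building a `TransportChunk`. A minimal sketch of the resulting call site; the `use` paths are assumptions about how the crate re-exports these items (the diff only shows the internal module layout under crates/store/re_log_encoding/src/codec/):

use arrow::array::RecordBatch as ArrowRecordBatch;
// Assumed re-export paths, not taken from this diff.
use re_log_encoding::codec::wire::encoder::Encode as _;
use re_log_encoding::codec::CodecError;
use re_protos::{common::v0::RerunChunk, remote_store::v0::DataframePart};

// The requested output type selects which `Encode<O>` impl runs; both wrap
// the same Arrow IPC bytes in a different protobuf message.
fn to_wire(batch: &ArrowRecordBatch) -> Result<(DataframePart, RerunChunk), CodecError> {
    let part: DataframePart = batch.encode()?;
    let chunk: RerunChunk = batch.encode()?;
    Ok((part, chunk))
}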
15 changes: 6 additions & 9 deletions rerun_py/src/remote.rs
@@ -1,4 +1,4 @@
-#![allow(unsafe_op_in_unsafe_fn)]
+#![allow(unsafe_op_in_unsafe_fn)] // False positive due to #[pyfunction] macro
 
 use std::collections::BTreeSet;
 
@@ -8,15 +8,16 @@ use arrow::{
     ffi_stream::ArrowArrayStreamReader,
     pyarrow::PyArrowType,
 };
-// False positive due to #[pyfunction] macro
 use pyo3::{
     exceptions::{PyRuntimeError, PyTypeError, PyValueError},
     prelude::*,
     types::PyDict,
     Bound, PyResult,
 };
+use tokio_stream::StreamExt;
+
 use re_arrow_util::ArrowArrayDowncastRef as _;
-use re_chunk::{Chunk, TransportChunk};
+use re_chunk::Chunk;
 use re_chunk_store::ChunkStore;
 use re_dataframe::{ChunkStoreHandle, QueryExpression, SparseFillStrategy, ViewContentsSelector};
 use re_grpc_client::TonicStatusError;
@@ -32,7 +33,6 @@ use re_protos::{
     TypeConversionError,
 };
 use re_sdk::{ApplicationId, ComponentName, StoreId, StoreKind, Time, Timeline};
-use tokio_stream::StreamExt;
 
 use crate::dataframe::{ComponentLike, PyRecording, PyRecordingHandle, PyRecordingView, PySchema};
 
@@ -321,8 +321,7 @@ impl PyStorageNodeClient {
                 ));
             }
 
-            let metadata_tc = TransportChunk::from(metadata);
-            metadata_tc
+            metadata
                 .encode()
                 .map_err(|err| PyRuntimeError::new_err(err.to_string()))
         })
@@ -388,11 +387,9 @@ impl PyStorageNodeClient {
                 ));
             }
 
-            let metadata_tc = TransportChunk::from(metadata);
-
             let request = UpdateCatalogRequest {
                 metadata: Some(
-                    metadata_tc
+                    metadata
                         .encode()
                         .map_err(|err| PyRuntimeError::new_err(err.to_string()))?,
                 ),
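For the catalog-update path changed above, the same pattern applies end to end: build the metadata record batch and encode it straight into the protobuf part, with no TransportChunk in between. A self-contained sketch under the same path assumptions as the earlier example; the one-column schema is made up for illustration:

use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch as ArrowRecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
// Assumed re-export path for the trait, as in the sketch above.
use re_log_encoding::codec::wire::encoder::Encode as _;
use re_protos::remote_store::v0::DataframePart;

fn main() {
    // Build a one-row metadata batch, similar to what the catalog-update path receives.
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
    let batch = ArrowRecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(vec!["my_recording"])) as ArrayRef],
    )
    .expect("schema and columns must line up");

    // No TransportChunk round-trip: encode the record batch directly into the
    // protobuf part that the catalog request carries.
    let part: Result<DataframePart, _> = batch.encode();
    assert!(part.is_ok());
}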
