Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework python ROS2 (de)serialization using parsed ROS2 messages directly #415

Merged
merged 14 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binaries/runtime/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod channel;
mod python;
mod shared_lib;

#[allow(unused_variables)]
pub fn run_operator(
node_id: &NodeId,
operator_definition: OperatorDefinition,
Expand Down
21 changes: 11 additions & 10 deletions libraries/arrow-convert/src/from_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl TryFrom<&ArrowData> for u8 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::UInt8Type>()
.context("not a primitive array")?;
.context("not a primitive UInt8Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -48,7 +48,7 @@ impl TryFrom<&ArrowData> for u16 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::UInt16Type>()
.context("not a primitive array")?;
.context("not a primitive UInt16Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -57,7 +57,7 @@ impl TryFrom<&ArrowData> for u32 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::UInt32Type>()
.context("not a primitive array")?;
.context("not a primitive UInt32Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -66,7 +66,7 @@ impl TryFrom<&ArrowData> for u64 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::UInt64Type>()
.context("not a primitive array")?;
.context("not a primitive UInt64Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -75,7 +75,7 @@ impl TryFrom<&ArrowData> for i8 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::Int8Type>()
.context("not a primitive array")?;
.context("not a primitive Int8Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -84,7 +84,7 @@ impl TryFrom<&ArrowData> for i16 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::Int16Type>()
.context("not a primitive array")?;
.context("not a primitive Int16Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -93,7 +93,7 @@ impl TryFrom<&ArrowData> for i32 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::Int32Type>()
.context("not a primitive array")?;
.context("not a primitive Int32Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -102,7 +102,7 @@ impl TryFrom<&ArrowData> for i64 {
fn try_from(value: &ArrowData) -> Result<Self, Self::Error> {
let array = value
.as_primitive_opt::<arrow::datatypes::Int64Type>()
.context("not a primitive array")?;
.context("not a primitive Int64Type array")?;
extract_single_primitive(array)
}
}
Expand All @@ -127,8 +127,9 @@ impl<'a> TryFrom<&'a ArrowData> for &'a str {
impl<'a> TryFrom<&'a ArrowData> for &'a [u8] {
type Error = eyre::Report;
fn try_from(value: &'a ArrowData) -> Result<Self, Self::Error> {
let array: &PrimitiveArray<arrow::datatypes::UInt8Type> =
value.as_primitive_opt().wrap_err("not a primitive array")?;
let array: &PrimitiveArray<arrow::datatypes::UInt8Type> = value
.as_primitive_opt()
.wrap_err("not a primitive UInt8Type array")?;
if array.null_count() != 0 {
eyre::bail!("array has nulls");
}
Expand Down
27 changes: 15 additions & 12 deletions libraries/extensions/ros2-bridge/python/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{
borrow::Cow,
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};

use ::dora_ros2_bridge::{ros2_client, rustdds};
use arrow::{
array::ArrayData,
array::{make_array, ArrayData},
pyarrow::{FromPyArrow, ToPyArrow},
};
use dora_ros2_bridge_msg_gen::types::Message;
Expand All @@ -17,7 +18,7 @@ use pyo3::{
types::{PyDict, PyList, PyModule},
PyAny, PyObject, PyResult, Python,
};
use typed::{deserialize::TypedDeserializer, for_message, TypeInfo, TypedValue};
use typed::{deserialize::StructDeserializer, TypeInfo, TypedValue};

pub mod qos;
pub mod typed;
Expand Down Expand Up @@ -52,6 +53,7 @@ impl Ros2Context {
ament_prefix_path_parsed.split(':').map(Path::new).collect()
}
};

let packages = dora_ros2_bridge_msg_gen::get_packages(&paths)
.map_err(|err| eyre!(err))
.context("failed to parse ROS2 message types")?;
Expand Down Expand Up @@ -111,10 +113,11 @@ impl Ros2Node {
let topic = self
.node
.create_topic(&topic_name, message_type_name, &qos.into())?;
let type_info =
for_message(&self.messages, namespace_name, message_name).with_context(|| {
format!("failed to determine type info for message {namespace_name}/{message_name}")
})?;
let type_info = TypeInfo {
package_name: namespace_name.to_owned().into(),
message_name: message_name.to_owned().into(),
messages: self.messages.clone(),
};

Ok(Ros2Topic { topic, type_info })
}
Expand Down Expand Up @@ -143,7 +146,7 @@ impl Ros2Node {
.create_subscription(&topic.topic, qos.map(Into::into))?;
Ok(Ros2Subscription {
subscription: Some(subscription),
deserializer: TypedDeserializer::new(topic.type_info.clone()),
deserializer: StructDeserializer::new(Cow::Owned(topic.type_info.clone())),
})
}
}
Expand Down Expand Up @@ -175,14 +178,14 @@ impl From<Ros2NodeOptions> for ros2_client::NodeOptions {
#[non_exhaustive]
pub struct Ros2Topic {
topic: rustdds::Topic,
type_info: TypeInfo,
type_info: TypeInfo<'static>,
}

#[pyclass]
#[non_exhaustive]
pub struct Ros2Publisher {
publisher: ros2_client::Publisher<TypedValue<'static>>,
type_info: TypeInfo,
type_info: TypeInfo<'static>,
}

#[pymethods]
Expand All @@ -209,7 +212,7 @@ impl Ros2Publisher {
//// add type info to ensure correct serialization (e.g. struct types
//// and map types need to be serialized differently)
let typed_value = TypedValue {
value: &value,
value: &make_array(value),
type_info: &self.type_info,
};

Expand All @@ -224,7 +227,7 @@ impl Ros2Publisher {
#[pyclass]
#[non_exhaustive]
pub struct Ros2Subscription {
deserializer: TypedDeserializer,
deserializer: StructDeserializer<'static>,
subscription: Option<ros2_client::Subscription<ArrayData>>,
}

Expand Down Expand Up @@ -263,7 +266,7 @@ impl Ros2Subscription {
}

pub struct Ros2SubscriptionStream {
deserializer: TypedDeserializer,
deserializer: StructDeserializer<'static>,
subscription: ros2_client::Subscription<ArrayData>,
}

Expand Down
Loading
Loading