From 08f607bd04b84f9a77b50f29d053a85faa8b3545 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Mon, 18 Dec 2023 18:12:41 +0100 Subject: [PATCH] Add support for subscribe with seed-based deserialization --- src/context.rs | 4 ++-- src/node.rs | 4 ++-- src/pubsub.rs | 50 +++++++++++++++++++++++++++++++++++------ src/service/wrappers.rs | 26 ++++++++++++++++++++- 4 files changed, 72 insertions(+), 12 deletions(-) diff --git a/src/context.rs b/src/context.rs index f022952f..b7a77619 100644 --- a/src/context.rs +++ b/src/context.rs @@ -9,7 +9,7 @@ use std::path::{Path, PathBuf}; #[allow(unused_imports)] use log::{debug, error, info, trace, warn}; //use mio::Evented; -use serde::{de::DeserializeOwned, Serialize}; +use serde::Serialize; use rustdds::{ dds::CreateResult, no_key::{DeserializerAdapter, SerializerAdapter}, @@ -251,7 +251,7 @@ impl Context { qos: Option, ) -> dds::CreateResult> where - M: 'static + DeserializeOwned, + M: 'static, { let datareader = self .get_ros_default_subscriber() diff --git a/src/node.rs b/src/node.rs index 901e32a0..609b1adf 100644 --- a/src/node.rs +++ b/src/node.rs @@ -7,7 +7,7 @@ use futures::{pin_mut, stream::FusedStream, FutureExt, Stream, StreamExt}; use async_channel::Receiver; #[allow(unused_imports)] use log::{debug, error, info, trace, warn}; -use serde::{de::DeserializeOwned, Serialize}; +use serde::Serialize; use rustdds::{dds::CreateError, dds::CreateResult, *}; use crate::{ @@ -1146,7 +1146,7 @@ impl Node { /// * `topic` - Reference to topic created with `create_ros_topic`. /// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use if /// it's compatible with topics QOS. `None` indicates the use of Topics QOS. - pub fn create_subscription( + pub fn create_subscription( &mut self, topic: &Topic, qos: Option, diff --git a/src/pubsub.rs b/src/pubsub.rs index 22623d68..e9c608db 100644 --- a/src/pubsub.rs +++ b/src/pubsub.rs @@ -3,11 +3,12 @@ use std::io; use mio::{Evented, Poll, PollOpt, Ready, Token}; use futures::{ pin_mut, - stream::{FusedStream, StreamExt}, + stream::{FusedStream, StreamExt}, Stream, }; use rustdds::{ dds::{ReadError, ReadResult, WriteResult}, *, + serialization::CdrDeserializeSeedDecoder, }; use serde::{de::DeserializeOwned, Serialize}; @@ -94,16 +95,49 @@ impl Publisher { /// /// Corresponds to a (simplified) [`DataReader`](rustdds::no_key::DataReader) in /// DDS -pub struct Subscription { +pub struct Subscription { datareader: no_key::SimpleDataReaderCdr, } -impl Subscription { +impl Subscription +where + M: 'static, +{ // These must be created from Node pub(crate) fn new(datareader: no_key::SimpleDataReaderCdr) -> Subscription { Subscription { datareader } } + pub fn take_seed<'de, S>(&self, seed: S) -> ReadResult> + where + S: serde::de::DeserializeSeed<'de, Value = M>, + M: 'static, + { + self.datareader.drain_read_notifications(); + let decoder = CdrDeserializeSeedDecoder::from(seed); + let ds: Option> = + self.datareader.try_take_one_with(decoder)?; + Ok(ds.map(dcc_to_value_and_messageinfo)) + } + + // Returns an async Stream of messages with MessageInfo metadata + pub fn async_stream_seed<'a, 'de, S>( + &'a self, + seed: S, + ) -> impl Stream> + FusedStream + 'a + where + S: serde::de::DeserializeSeed<'de, Value = M> + Clone + 'a, + M: 'static, + { + let decoder = CdrDeserializeSeedDecoder::from(seed); + self + .datareader + .as_async_stream_with(decoder) + .map(|result| result.map(dcc_to_value_and_messageinfo)) + } +} + +impl Subscription { pub fn take(&self) -> ReadResult> { self.datareader.drain_read_notifications(); let ds: Option> = self.datareader.try_take_one()?; @@ -130,7 +164,12 @@ impl Subscription { .as_async_stream() .map(|result| result.map(dcc_to_value_and_messageinfo)) } +} +impl Subscription +where + M: 'static, +{ pub fn guid(&self) -> rustdds::GUID { self.datareader.guid() } @@ -159,10 +198,7 @@ impl Subscription { // helper #[inline] -fn dcc_to_value_and_messageinfo(dcc: no_key::DeserializedCacheChange) -> (M, MessageInfo) -where - M: DeserializeOwned, -{ +fn dcc_to_value_and_messageinfo(dcc: no_key::DeserializedCacheChange) -> (M, MessageInfo) { let mi = MessageInfo::from(&dcc); (dcc.into_value(), mi) } diff --git a/src/service/wrappers.rs b/src/service/wrappers.rs index 4d9f56ca..4d71135d 100644 --- a/src/service/wrappers.rs +++ b/src/service/wrappers.rs @@ -334,12 +334,36 @@ impl ServiceDeserializerAdapter { impl no_key::DeserializerAdapter for ServiceDeserializerAdapter { type Error = ReadError; + type Deserialized = RW; fn supported_encodings() -> &'static [RepresentationIdentifier] { &Self::REPR_IDS } - fn from_bytes(input_bytes: &[u8], encoding: RepresentationIdentifier) -> ReadResult { + fn transform_deserialized(deserialized: Self::Deserialized) -> RW { + deserialized + } +} + +impl no_key::DefaultDecoder for ServiceDeserializerAdapter { + type Decoder = WrapperDecoder; + const DECODER: Self::Decoder = WrapperDecoder; +} + +#[derive(Clone)] +pub struct WrapperDecoder; + +impl no_key::Decode for WrapperDecoder +where + RW: Wrapper, +{ + type Error = ReadError; + + fn decode_bytes( + self, + input_bytes: &[u8], + encoding: RepresentationIdentifier, + ) -> Result { Ok(RW::from_bytes_and_ri(input_bytes, encoding)) } }