Skip to content

Commit

Permalink
Merge branch 'dora-rs-deserialize-seed-2'
Browse files Browse the repository at this point in the history
  • Loading branch information
jhelovuo committed Apr 15, 2024
2 parents d59bfdd + 34196be commit a7906c7
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 13 deletions.
4 changes: 2 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::{Path, PathBuf};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
//use mio::Evented;
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use rustdds::{
dds::CreateResult,
no_key::{DeserializerAdapter, SerializerAdapter},
Expand Down Expand Up @@ -255,7 +255,7 @@ impl Context {
qos: Option<QosPolicies>,
) -> dds::CreateResult<Subscription<M>>
where
M: 'static + DeserializeOwned,
M: 'static,
{
let datareader = self
.get_ros_default_subscriber()
Expand Down
4 changes: 2 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{pin_mut, stream::FusedStream, FutureExt, Stream, StreamExt};
use async_channel::Receiver;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use rustdds::{
dds::{CreateError, CreateResult},
*,
Expand Down Expand Up @@ -1255,7 +1255,7 @@ impl Node {
/// * `topic` - Reference to topic created with `create_ros_topic`.
/// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use if
/// it's compatible with topics QOS. `None` indicates the use of Topics QOS.
pub fn create_subscription<D: DeserializeOwned + 'static>(
pub fn create_subscription<D: 'static>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
Expand Down
52 changes: 44 additions & 8 deletions src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::io;
use std::{io, marker::PhantomData};

use mio::{Evented, Poll, PollOpt, Ready, Token};
use futures::{
pin_mut,
stream::{FusedStream, StreamExt},
stream::{FusedStream, StreamExt}, Stream,
};
use rustdds::{
dds::{ReadError, ReadResult, WriteResult},
*,
serialization::CdrDeserializeSeedDecoder,
};
use serde::{de::DeserializeOwned, Serialize};

Expand Down Expand Up @@ -94,16 +95,49 @@ impl<M: Serialize> Publisher<M> {
///
/// Corresponds to a (simplified) [`DataReader`](rustdds::no_key::DataReader) in
/// DDS
pub struct Subscription<M: DeserializeOwned> {
pub struct Subscription<M> {
datareader: no_key::SimpleDataReaderCdr<M>,
}

impl<M: 'static + DeserializeOwned> Subscription<M> {
impl<M> Subscription<M>
where
M: 'static,
{
// These must be created from Node
pub(crate) fn new(datareader: no_key::SimpleDataReaderCdr<M>) -> Subscription<M> {
Subscription { datareader }
}

pub fn take_seed<'de, S>(&self, seed: S) -> ReadResult<Option<(M, MessageInfo)>>
where
S: serde::de::DeserializeSeed<'de, Value = M> + Clone,
M: 'static,
{
self.datareader.drain_read_notifications();
let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
let ds: Option<no_key::DeserializedCacheChange<M>> =
self.datareader.try_take_one_with(decoder)?;
Ok(ds.map(dcc_to_value_and_messageinfo))
}

// Returns an async Stream of messages with MessageInfo metadata
pub fn async_stream_seed<'a, 'de, S>(
&'a self,
seed: S,
) -> impl Stream<Item = ReadResult<(M, MessageInfo)>> + FusedStream + 'a
where
S: serde::de::DeserializeSeed<'de, Value = M> + Clone + 'a,
M: 'static,
{
let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
self
.datareader
.as_async_stream_with(decoder)
.map(|result| result.map(dcc_to_value_and_messageinfo))
}
}

impl<M: 'static + DeserializeOwned> Subscription<M> {
pub fn take(&self) -> ReadResult<Option<(M, MessageInfo)>> {
self.datareader.drain_read_notifications();
let ds: Option<no_key::DeserializedCacheChange<M>> = self.datareader.try_take_one()?;
Expand All @@ -130,7 +164,12 @@ impl<M: 'static + DeserializeOwned> Subscription<M> {
.as_async_stream()
.map(|result| result.map(dcc_to_value_and_messageinfo))
}
}

impl<M> Subscription<M>
where
M: 'static,
{
pub fn guid(&self) -> rustdds::GUID {
self.datareader.guid()
}
Expand Down Expand Up @@ -159,10 +198,7 @@ impl<M: 'static + DeserializeOwned> Subscription<M> {

// helper
#[inline]
fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo)
where
M: DeserializeOwned,
{
fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo) {
let mi = MessageInfo::from(&dcc);
(dcc.into_value(), mi)
}
Expand Down
26 changes: 25 additions & 1 deletion src/service/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,36 @@ impl<RW> ServiceDeserializerAdapter<RW> {

impl<RW: Wrapper> no_key::DeserializerAdapter<RW> for ServiceDeserializerAdapter<RW> {
type Error = ReadError;
type Decoded = RW;

fn supported_encodings() -> &'static [RepresentationIdentifier] {
&Self::REPR_IDS
}

fn from_bytes(input_bytes: &[u8], encoding: RepresentationIdentifier) -> ReadResult<RW> {
fn transform_decoded(decoded: Self::Decoded) -> RW {
decoded
}
}

impl<RW: Wrapper> no_key::DefaultDecoder<RW> for ServiceDeserializerAdapter<RW> {
type Decoder = WrapperDecoder;
const DECODER: Self::Decoder = WrapperDecoder;
}

#[derive(Clone)]
pub struct WrapperDecoder;

impl<RW> no_key::Decode<RW> for WrapperDecoder
where
RW: Wrapper,
{
type Error = ReadError;

fn decode_bytes(
self,
input_bytes: &[u8],
encoding: RepresentationIdentifier,
) -> Result<RW, Self::Error> {
Ok(RW::from_bytes_and_ri(input_bytes, encoding))
}
}
Expand Down

0 comments on commit a7906c7

Please sign in to comment.