Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to RustDDS with DeserializeSeed support #20

Merged
merged 5 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ security = [

[dependencies]

# rustdds = { path = "../RustDDS" } # dev setting
# rustdds = { git = "https://github.com/jhelovuo/RustDDS.git" }
rustdds = { version = "~0.9.2" } # release setting
# rustdds = { path = "../RustDDS" } # dev setting
rustdds = { git = "https://github.com/jhelovuo/RustDDS.git", branch = "deserialize-with-seed" }
# rustdds = { version = "~0.9.2" } # release setting

mio = "^0.6.23"
mio-extras = "2.0.6"
Expand Down
4 changes: 2 additions & 2 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::{Path, PathBuf};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
//use mio::Evented;
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use rustdds::{
dds::CreateResult,
no_key::{DeserializerAdapter, SerializerAdapter},
Expand Down Expand Up @@ -251,7 +251,7 @@ impl Context {
qos: Option<QosPolicies>,
) -> dds::CreateResult<Subscription<M>>
where
M: 'static + DeserializeOwned,
M: 'static,
{
let datareader = self
.get_ros_default_subscriber()
Expand Down
4 changes: 2 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::{pin_mut, stream::FusedStream, FutureExt, Stream, StreamExt};
use async_channel::Receiver;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use serde::{de::DeserializeOwned, Serialize};
use serde::Serialize;
use rustdds::{dds::CreateError, dds::CreateResult, *};

use crate::{
Expand Down Expand Up @@ -1146,7 +1146,7 @@ impl Node {
/// * `topic` - Reference to topic created with `create_ros_topic`.
/// * `qos` - Should take [QOS](../dds/qos/struct.QosPolicies.html) and use if
/// it's compatible with topics QOS. `None` indicates the use of Topics QOS.
pub fn create_subscription<D: DeserializeOwned + 'static>(
pub fn create_subscription<D: 'static>(
&mut self,
topic: &Topic,
qos: Option<QosPolicies>,
Expand Down
52 changes: 44 additions & 8 deletions src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::io;
use std::{io, marker::PhantomData};

use mio::{Evented, Poll, PollOpt, Ready, Token};
use futures::{
pin_mut,
stream::{FusedStream, StreamExt},
stream::{FusedStream, StreamExt}, Stream,
};
use rustdds::{
dds::{ReadError, ReadResult, WriteResult},
*,
serialization::CdrDeserializeSeedDecoder,
};
use serde::{de::DeserializeOwned, Serialize};

Expand Down Expand Up @@ -94,16 +95,49 @@ impl<M: Serialize> Publisher<M> {
///
/// Corresponds to a (simplified) [`DataReader`](rustdds::no_key::DataReader) in
/// DDS
pub struct Subscription<M: DeserializeOwned> {
pub struct Subscription<M> {
datareader: no_key::SimpleDataReaderCdr<M>,
}

impl<M: 'static + DeserializeOwned> Subscription<M> {
impl<M> Subscription<M>
where
M: 'static,
{
// These must be created from Node
pub(crate) fn new(datareader: no_key::SimpleDataReaderCdr<M>) -> Subscription<M> {
Subscription { datareader }
}

pub fn take_seed<'de, S>(&self, seed: S) -> ReadResult<Option<(M, MessageInfo)>>
where
S: serde::de::DeserializeSeed<'de, Value = M> + Clone,
M: 'static,
{
self.datareader.drain_read_notifications();
let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
let ds: Option<no_key::DeserializedCacheChange<M>> =
self.datareader.try_take_one_with(decoder)?;
Ok(ds.map(dcc_to_value_and_messageinfo))
}

// Returns an async Stream of messages with MessageInfo metadata
pub fn async_stream_seed<'a, 'de, S>(
&'a self,
seed: S,
) -> impl Stream<Item = ReadResult<(M, MessageInfo)>> + FusedStream + 'a
where
S: serde::de::DeserializeSeed<'de, Value = M> + Clone + 'a,
M: 'static,
{
let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
self
.datareader
.as_async_stream_with(decoder)
.map(|result| result.map(dcc_to_value_and_messageinfo))
}
}

impl<M: 'static + DeserializeOwned> Subscription<M> {
pub fn take(&self) -> ReadResult<Option<(M, MessageInfo)>> {
self.datareader.drain_read_notifications();
let ds: Option<no_key::DeserializedCacheChange<M>> = self.datareader.try_take_one()?;
Expand All @@ -130,7 +164,12 @@ impl<M: 'static + DeserializeOwned> Subscription<M> {
.as_async_stream()
.map(|result| result.map(dcc_to_value_and_messageinfo))
}
}

impl<M> Subscription<M>
where
M: 'static,
{
pub fn guid(&self) -> rustdds::GUID {
self.datareader.guid()
}
Expand Down Expand Up @@ -159,10 +198,7 @@ impl<M: 'static + DeserializeOwned> Subscription<M> {

// helper
#[inline]
fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo)
where
M: DeserializeOwned,
{
fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo) {
let mi = MessageInfo::from(&dcc);
(dcc.into_value(), mi)
}
Expand Down
26 changes: 25 additions & 1 deletion src/service/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,36 @@ impl<RW> ServiceDeserializerAdapter<RW> {

impl<RW: Wrapper> no_key::DeserializerAdapter<RW> for ServiceDeserializerAdapter<RW> {
type Error = ReadError;
type Decoded = RW;

fn supported_encodings() -> &'static [RepresentationIdentifier] {
&Self::REPR_IDS
}

fn from_bytes(input_bytes: &[u8], encoding: RepresentationIdentifier) -> ReadResult<RW> {
fn transform_decoded(decoded: Self::Decoded) -> RW {
decoded
}
}

impl<RW: Wrapper> no_key::DefaultDecoder<RW> for ServiceDeserializerAdapter<RW> {
type Decoder = WrapperDecoder;
const DECODER: Self::Decoder = WrapperDecoder;
}

#[derive(Clone)]
pub struct WrapperDecoder;

impl<RW> no_key::Decode<RW> for WrapperDecoder
where
RW: Wrapper,
{
type Error = ReadError;

fn decode_bytes(
self,
input_bytes: &[u8],
encoding: RepresentationIdentifier,
) -> Result<RW, Self::Error> {
Ok(RW::from_bytes_and_ri(input_bytes, encoding))
}
}
Expand Down