Skip to content

Commit

Permalink
Simplify publisher type by embedding channel parameter in Bus trait
Browse files Browse the repository at this point in the history
  • Loading branch information
hdoordt committed Apr 25, 2023
1 parent ea9a335 commit c58be6f
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 60 deletions.
33 changes: 21 additions & 12 deletions src/chan/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,15 @@ impl DirectChannel {
}

/// Create a new [Publisher] that allows for publishing on the [DirectBus]
pub fn publisher<B: DirectBus>(&self) -> Publisher<Self, B> {
pub fn publisher<B: DirectBus<Chan = DirectChannel>>(&self) -> Publisher<B> {
debug!("Created publisher for direct bus {}", type_name::<B>());
Publisher {
chan: self.clone(),
_marker: PhantomData,
}
}
}

impl<'p, B> Publisher<DirectChannel, B>
impl<'p, B> Publisher<B>
where
B: DirectBus,
B::PublishPayload: Deserialize<'p> + Serialize,
Expand Down Expand Up @@ -159,7 +158,7 @@ pub mod tests {
});

let channel = DirectChannel::new(&connection).await.unwrap();
let publisher: Publisher<_, FrameBus> = channel.publisher();
let publisher: Publisher<FrameBus> = channel.publisher();

publisher
.publish(
Expand All @@ -181,16 +180,11 @@ pub mod tests {
#[macro_export]
macro_rules! direct_bus {
($doc:literal, $bus:ident, $publish_payload:ty, $args:ty, $queue:expr) => {
$crate::bus!($doc, $bus, $publish_payload);
$crate::bus!($doc, $bus);

impl $crate::DirectBus for $bus {
type Args = $args;
$crate::bus_impl!($bus, $crate::DirectChannel, $publish_payload);

fn queue(args: Self::Args) -> String {
#[allow(clippy::redundant_closure_call)]
($queue)(args)
}
}
$crate::direct_bus_impl!($bus, $args, $queue);
};
(doc = $doc:literal, bus = $bus:ident, publish = $publish_payload:ty, args = $args:ty, queue = $queue:expr) => {
$crate::direct_bus!($doc, $bus, $publish_payload, $args, $queue);
Expand All @@ -202,3 +196,18 @@ macro_rules! direct_bus {
$crate::direct_bus!($bus, $publish_payload, $args, $queue);
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! direct_bus_impl {
($bus:ident, $args:ty, $queue:expr) => {
impl $crate::DirectBus for $bus {
type Args = $args;

fn queue(args: Self::Args) -> String {
#[allow(clippy::redundant_closure_call)]
($queue)(args)
}
}
};
}
52 changes: 39 additions & 13 deletions src/chan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub use topic::*;

/// A Bus. Base trait for several other buses
pub trait Bus: Unpin {
/// The [Channel] this bus is associated with.
type Chan: Channel;
/// The type of payload of the messages that are published on or consumed from it.
type PublishPayload;
}
Expand Down Expand Up @@ -72,14 +74,13 @@ where
/// of the [Bus::PublishPayload] type. [Publisher]s take care
/// of serializing the payloads before publishing.
#[derive(Clone)]
pub struct Publisher<C, B> {
chan: C,
_marker: PhantomData<B>,
pub struct Publisher<B: Bus> {
chan: B::Chan,
}

impl<C, B> Publisher<C, B>
impl<B> Publisher<B>
where
C: Channel,
B: Bus,
{
async fn publish_with_properties<'p, P>(
&self,
Expand Down Expand Up @@ -110,9 +111,12 @@ pub use tests::*;

#[cfg(test)]
mod tests {
use async_trait::async_trait;
use lapin::BasicProperties;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::bus;
use crate::{bus, bus_impl, Channel, Never};

pub const RABBIT_MQ_URL: &str = "amqp://tg:secret@localhost:5673";

Expand All @@ -121,22 +125,44 @@ mod tests {
pub message: String,
}

bus!("A frame bus", FrameBus, FramePayload);
#[async_trait]
impl Channel for Never {
async fn publish_with_properties(
&self,
_payload_bytes: &[u8],
_routing_key: &str,
_properties: BasicProperties,
_correlation_uuid: Uuid,
_reply_uuid: Option<Uuid>,
) -> crate::Result<()> {
unreachable!()
}
}

bus!("A frame bus", FrameBus);
bus_impl!(FrameBus, Never, FramePayload);
}

#[doc(hidden)]
#[macro_export]
macro_rules! bus {
($doc:literal, $name:ident, $publish_payload:ty) => {
($doc:literal, $bus:ident) => {
#[doc = $doc]
#[derive(Clone, Copy, Debug)]
pub enum $name {}
pub enum $bus {}
};
($bus:ident) => {
$crate::bus!("", $bus);
};
}

impl $crate::Bus for $name {
#[doc(hidden)]
#[macro_export]
macro_rules! bus_impl {
($bus:ident, $chan:ty, $publish_payload:ty) => {
impl $crate::Bus for $bus {
type Chan = $chan;
type PublishPayload = $publish_payload;
}
};
($name:ident, $publish_payload:ty) => {
$crate::bus!("", $name, $publish_payload);
};
}
40 changes: 25 additions & 15 deletions src/chan/rpc/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub trait RpcCommBus: Unpin {
}

impl<B: RpcCommBus> Bus for B {
type Chan = RpcChannel;
type PublishPayload = B::InitialPayload;
}

Expand All @@ -46,7 +47,7 @@ impl<B: RpcCommBus> DirectBus for B {
}
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Deserialize, Serialize, Clone)]
#[doc(hidden)]
pub enum Never {}

Expand All @@ -57,6 +58,7 @@ pub struct BackReply<B> {
}

impl<B: RpcCommBus> Bus for BackReply<B> {
type Chan = RpcChannel;
type PublishPayload = B::BackPayload;
}

Expand All @@ -79,6 +81,7 @@ pub struct ForthReply<B> {
}

impl<B: RpcCommBus> Bus for ForthReply<B> {
type Chan = RpcChannel;
type PublishPayload = B::ForthPayload;
}

Expand Down Expand Up @@ -120,16 +123,15 @@ impl RpcChannel {
}

/// Setup a new [Publisher] associated wit the [RpcCommBus].
pub fn comm_publisher<B: RpcCommBus>(&self) -> Publisher<Self, B> {
pub fn comm_publisher<B: RpcCommBus>(&self) -> Publisher<B> {
debug!("Created comm publisher for RPC bus {}", type_name::<B>());
Publisher {
chan: self.clone(),
_marker: PhantomData,
}
}
}

impl<'r, 'p, B> Publisher<RpcChannel, B>
impl<'r, 'p, B> Publisher<B>
where
B: RpcCommBus,
B::InitialPayload: Deserialize<'r> + Serialize,
Expand Down Expand Up @@ -280,17 +282,7 @@ macro_rules! rpc_comm_bus {
#[derive(Clone, Copy, Debug)]
pub enum $bus {}

impl $crate::RpcCommBus for $bus {
type InitialPayload = $initial_payload;
type BackPayload = $back_payload;
type ForthPayload = $forth_payload;
type Args = $args;

fn queue(args: Self::Args) -> String {
#[allow(clippy::redundant_closure_call)]
($queue)(args)
}
}
$crate::rpc_comm_bus_impl!($bus, $initial_payload, $back_payload, $forth_payload, $args, $queue);
};
(doc = $doc:literal, bus = $bus:ident, initial = $initial_payload:ty, back = $back_payload:ty, forth = $forth_payload:ty, args = $args:ty, queue = $queue:expr) => {
$crate::rpc_comm_bus!(
Expand Down Expand Up @@ -325,3 +317,21 @@ macro_rules! rpc_comm_bus {
);
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! rpc_comm_bus_impl {
($bus:ty, $back_payload:ty, $initial_payload:ty, $forth_payload:ty, $args:ty, $queue:expr) => {
impl $crate::RpcCommBus for $bus {
type InitialPayload = $initial_payload;
type BackPayload = $back_payload;
type ForthPayload = $forth_payload;
type Args = $args;

fn queue(args: Self::Args) -> String {
#[allow(clippy::redundant_closure_call)]
($queue)(args)
}
}
};
}
32 changes: 22 additions & 10 deletions src/chan/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct Reply<B> {
}

impl<B: RpcBus> Bus for Reply<B> {
type Chan = RpcChannel;
type PublishPayload = B::ReplyPayload;
}

Expand Down Expand Up @@ -177,11 +178,10 @@ impl RpcChannel {
}

/// Create a new [Publisher] that allows for publishing on the [RpcBus]
pub fn publisher<B: RpcBus>(&self) -> Publisher<Self, B> {
pub fn publisher<B: RpcBus<Chan = Self>>(&self) -> Publisher<B> {
debug!("Created publisher for RPC bus {}", type_name::<B>());
Publisher {
chan: self.clone(),
_marker: PhantomData,
}
}
}
Expand Down Expand Up @@ -214,9 +214,9 @@ impl Channel for RpcChannel {
}
}

impl<'r, 'p, B> Publisher<RpcChannel, B>
impl<'r, 'p, B> Publisher<B>
where
B: RpcBus,
B: RpcBus<Chan = RpcChannel>,
B::PublishPayload: Deserialize<'p> + Serialize,
B::ReplyPayload: Deserialize<'r> + Serialize,
{
Expand Down Expand Up @@ -398,7 +398,7 @@ mod tests {
});

let channel = RpcChannel::new(&connection).await.unwrap();
let publisher: Publisher<_, FrameBus> = channel.publisher();
let publisher: Publisher<FrameBus> = channel.publisher();

let mut rx = publisher
.publish_recv_many(
Expand Down Expand Up @@ -437,7 +437,7 @@ mod tests {
});

let channel = RpcChannel::new(&connection).await.unwrap();
let publisher: Publisher<_, FrameBus> = channel.publisher();
let publisher: Publisher<FrameBus> = channel.publisher();

let fut = publisher
.publish_recv_one(
Expand All @@ -459,11 +459,13 @@ mod tests {
/// Declare a new [RpcBus].
macro_rules! rpc_bus {
($doc:literal, $bus:ident, $publish_payload:ty, $reply_payload:ty, $args:ty, $queue:expr) => {
$crate::direct_bus!($doc, $bus, $publish_payload, $args, $queue);
$crate::bus!($doc, $bus);

impl $crate::RpcBus for $bus {
type ReplyPayload = $reply_payload;
}
$crate::bus_impl!($bus, $crate::RpcChannel, $publish_payload);

$crate::direct_bus_impl!($bus, $args, $queue);

$crate::rpc_bus_impl!($bus, $reply_payload);
};
(doc = $doc:literal, bus = $bus:ident, publish = $publish_payload:ty, reply = $reply_payload:ty, args = $args:ty, queue = $queue:expr) => {
$crate::rpc_bus!($doc, $bus, $publish_payload, $reply_payload, $args, $queue);
Expand All @@ -475,3 +477,13 @@ macro_rules! rpc_bus {
$crate::rpc_bus!($bus, $publish_payload, $reply_payload, $args, $queue);
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! rpc_bus_impl {
($bus:ident, $reply_payload:ty) => {
impl $crate::RpcBus for $bus {
type ReplyPayload = $reply_payload;
}
};
}
Loading

0 comments on commit c58be6f

Please sign in to comment.