Skip to content

Commit

Permalink
Improve test coverage (#1314)
Browse files Browse the repository at this point in the history
* Add peer_linkstate and router_linkstate tests

* Add three_node_combination_multicast test

* Remove unused Mux Primitve implementation

* Remove client multicast support

* Add linkstate liveliness tests
  • Loading branch information
OlivierHecart authored Oct 28, 2024
1 parent 4f51c01 commit 1a4a295
Show file tree
Hide file tree
Showing 3 changed files with 516 additions and 350 deletions.
314 changes: 1 addition & 313 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zenoh_protocol::{
};
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

use super::{EPrimitives, Primitives};
use super::EPrimitives;
use crate::net::routing::{
dispatcher::face::{Face, WeakFace},
interceptor::{InterceptorTrait, InterceptorsChain},
Expand All @@ -45,164 +45,6 @@ impl Mux {
}
}

impl Primitives for Mux {
fn send_interest(&self, msg: Interest) {
let msg = NetworkMessage {
body: NetworkBody::Interest(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let Some(face) = face.upgrade() else {
tracing::debug!("Invalid face: {:?}. Interest not sent: {:?}", face, msg);
return;
};
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::debug!("Uninitialized multiplexer. Interest not sent: {:?}", msg);
}
}

fn send_declare(&self, msg: Declare) {
let msg = NetworkMessage {
body: NetworkBody::Declare(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_push(&self, msg: Push, reliability: Reliability) {
let msg = NetworkMessage {
body: NetworkBody::Push(msg),
reliability,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_request(&self, msg: Request) {
let msg = NetworkMessage {
body: NetworkBody::Request(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_response(&self, msg: Response) {
let msg = NetworkMessage {
body: NetworkBody::Response(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_response_final(&self, msg: ResponseFinal) {
let msg = NetworkMessage {
body: NetworkBody::ResponseFinal(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_close(&self) {}
}

impl EPrimitives for Mux {
fn send_interest(&self, ctx: RoutingContext<Interest>) {
let ctx = RoutingContext {
Expand Down Expand Up @@ -377,160 +219,6 @@ impl McastMux {
}
}

impl Primitives for McastMux {
fn send_interest(&self, msg: Interest) {
let msg = NetworkMessage {
body: NetworkBody::Interest(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_declare(&self, msg: Declare) {
let msg = NetworkMessage {
body: NetworkBody::Declare(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_push(&self, msg: Push, reliability: Reliability) {
let msg = NetworkMessage {
body: NetworkBody::Push(msg),
reliability,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_request(&self, msg: Request) {
let msg = NetworkMessage {
body: NetworkBody::Request(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_response(&self, msg: Response) {
let msg = NetworkMessage {
body: NetworkBody::Response(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_response_final(&self, msg: ResponseFinal) {
let msg = NetworkMessage {
body: NetworkBody::ResponseFinal(msg),
reliability: Reliability::Reliable,
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
}
}

fn send_close(&self) {}
}

impl EPrimitives for McastMux {
fn send_interest(&self, ctx: RoutingContext<Interest>) {
let ctx = RoutingContext {
Expand Down
30 changes: 0 additions & 30 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,26 +140,6 @@ fn declare_simple_subscription(
register_simple_subscription(tables, face, id, res, sub_info);

propagate_simple_subscription(tables, res, sub_info, face, send_declare);
// This introduced a buffer overflow on windows
// @TODO: Let's deactivate this on windows until Fixed
#[cfg(not(windows))]
for mcast_group in &tables.mcast_groups {
mcast_group
.primitives
.send_declare(RoutingContext::with_expr(
Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
id: 0, // @TODO use proper SubscriberId
wire_expr: res.expr().into(),
}),
},
res.expr(),
))
}
}

#[inline]
Expand Down Expand Up @@ -418,16 +398,6 @@ impl HatPubSubTrait for HatCode {
}
}
}
for mcast_group in &tables.mcast_groups {
route.insert(
mcast_group.id,
(
mcast_group.clone(),
expr.full_expr().to_string().into(),
NodeId::default(),
),
);
}
Arc::new(route)
}

Expand Down
Loading

0 comments on commit 1a4a295

Please sign in to comment.