diff --git a/contracts/src/provider_proxy.rs b/contracts/src/provider_proxy.rs index f2695649..72e29c4f 100644 --- a/contracts/src/provider_proxy.rs +++ b/contracts/src/provider_proxy.rs @@ -20,8 +20,11 @@ pub struct SignalValue { } #[derive(Clone, Debug, Display, Eq, PartialEq)] +/// Successful return types when a proxy registers an entity pub enum EntityRegistration { + /// The Entity has been successfully registered by the proxy Registered, + /// The proxy has requested a loopback with new information for the proxy selector Loopback(Entity), } @@ -53,6 +56,7 @@ pub trait ProviderProxy { /// Registers an entity id to a local cache inside a provider proxy to keep track of which entities a provider proxy contains. /// If the operation is Subscribe for an entity, the expectation is subscribe will happen in this function after registering an entity. + /// Some proxies may return a 'Loopback' with new entity information for the proxy selector to use to select a different proxy. /// /// # Arguments /// - `entity_id`: the entity id to add diff --git a/provider_proxies/managed_subscribe/src/managed_subscribe_provider_proxy.rs b/provider_proxies/managed_subscribe/src/managed_subscribe_provider_proxy.rs index 1c665559..0fcb96bc 100644 --- a/provider_proxies/managed_subscribe/src/managed_subscribe_provider_proxy.rs +++ b/provider_proxies/managed_subscribe/src/managed_subscribe_provider_proxy.rs @@ -22,8 +22,8 @@ use freyja_contracts::{ provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyErrorKind, SignalValue, EntityRegistration}, }; -/// Interfaces with providers which support GRPC. Based on the Ibeji mixed sample. -/// Note that the current implementation works on the assumption that there is a +/// Interfaces with providers which utilize 'Managed Subscribe'. Based on the Ibeji managed +/// subscribe sample. Note that the current implementation works on the assumption that there is a /// one-to-one mapping of topic to entity id. pub struct ManagedSubscribeProviderProxy { /// The proxy config @@ -70,7 +70,8 @@ impl ProviderProxy for ManagedSubscribeProviderProxy { /// Starts a provider proxy async fn start(&self) -> Result<(), ProviderProxyError> { - // Not relevant for this proxy, so passthrough. + // Not relevant for this proxy as the proxy is retrieving where to subscribe to and has no + // persistent state. Ok(()) } @@ -83,7 +84,8 @@ impl ProviderProxy for ManagedSubscribeProviderProxy { Ok(()) } - /// Loopback proxy that calls the 'Managed Subscribe' module in Ibeji to retrieve correct subscription endpoint. + /// Calls the 'Managed Subscribe' module in Ibeji to retrieve correct subscription endpoint and + /// returns a Loopback request to proxy selector /// /// # Arguments /// - `entity_id`: the entity id to add @@ -106,6 +108,7 @@ impl ProviderProxy for ManagedSubscribeProviderProxy { let mut client = self.client.clone(); + // Set the default frequency to recieve data at let default_freq_constraint = Constraint { r#type: self.config.frequency_constraint_type.clone(), value: self.config.frequency_constraint_value.clone(), @@ -116,23 +119,20 @@ impl ProviderProxy for ManagedSubscribeProviderProxy { constraints: vec![default_freq_constraint], }); - info!("Calling Managed Subscribe with {request:?}."); - let result = client .get_subscription_info(request) .await .map_err(ProviderProxyError::communication)?; - info!("Managed Subscribe returned {result:?}."); - let sub_info = result.into_inner(); + // The mqtt proxy supports v5 and v3 so do not need to make a distinction let mut protocol = sub_info.protocol; - if protocol.contains(MQTT_PROTOCOL) { protocol = MQTT_PROTOCOL.to_string(); } + // Construct endpoint information from returned result let endpoint = EntityEndpoint { protocol, operations: vec![SUBSCRIBE_OPERATION.to_string()], @@ -140,6 +140,7 @@ impl ProviderProxy for ManagedSubscribeProviderProxy { context: sub_info.context, }; + // Create new entity object with updated endpoint information. let new_entity = Entity { name: Some(entity_id.to_string()), id: entity_id.to_string(), @@ -147,7 +148,7 @@ impl ProviderProxy for ManagedSubscribeProviderProxy { endpoints: vec![endpoint], }; - info!("New Entity constructed: {new_entity:?}"); + info!("Requesting loopback with Entity: {new_entity:?}"); Ok(EntityRegistration::Loopback(new_entity)) } diff --git a/provider_proxy_selector/src/provider_proxy_selector_impl.rs b/provider_proxy_selector/src/provider_proxy_selector_impl.rs index a096ee12..741cba18 100644 --- a/provider_proxy_selector/src/provider_proxy_selector_impl.rs +++ b/provider_proxy_selector/src/provider_proxy_selector_impl.rs @@ -9,7 +9,7 @@ use std::{ use async_trait::async_trait; use crossbeam::queue::SegQueue; -use log::{debug, info}; +use log::debug; use tokio::sync::Mutex; use freyja_contracts::{ @@ -84,6 +84,8 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { let mut loopback_count = 0; let mut current_entity = entity.to_owned(); + // Proxy selector will loop (up to max attempts) until a proxy registers the entity. + // Will break out of loop on an error. 'loopback: while loopback_count < PROXY_SELECTOR_LOOPBACK_MAX { let mut state = self.state.lock().await; @@ -98,11 +100,10 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { .await .map_err(ProviderProxySelectorError::communication)?; - info!("Existing Register Entity result is: {entity_registration}."); - match entity_registration { EntityRegistration::Registered => { - // There was a successful registration of the entity and can return from the loop. + // There was a successful registration of the entity. + // The entity is added to the map and the selector returns. state .entity_map .insert(String::from(¤t_entity.id), String::from(&endpoint.uri)); @@ -110,10 +111,11 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { return Ok(()) }, EntityRegistration::Loopback(new_entity) => { + // The proxy is requesting a loopback with new entity information current_entity = new_entity.to_owned(); loopback_count += 1; - info!("Hit existing loopback with new entity: {current_entity:?}"); + debug!("Loopback requested with: {current_entity:?}. Loopback count is: {loopback_count}."); continue 'loopback; } @@ -144,22 +146,21 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { .await .map_err(ProviderProxySelectorError::provider_proxy_error)?; - // Register the entity with the provider proxy. + // Register the entity with the provider proxy let entity_registration = provider_proxy .register_entity(¤t_entity.id, &endpoint) .await .map_err(ProviderProxySelectorError::provider_proxy_error)?; - // As long as there was not an error with registration, add proxy to map. + // As long as there was not an error with registration, add proxy to map state .provider_proxies .insert(endpoint.uri.clone(), provider_proxy); - info!("New Register Entity result is: {entity_registration}."); - match entity_registration { EntityRegistration::Registered => { - // There was a successful registration of the entity and can return from the loop. + // There was a successful registration of the entity. + // The entity is added to the map and the selector returns. state .entity_map .insert(String::from(¤t_entity.id), String::from(&endpoint.uri)); @@ -167,17 +168,18 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { return Ok(()) }, EntityRegistration::Loopback(new_entity) => { + // The proxy is requesting a loopback with new entity information current_entity = new_entity.to_owned(); loopback_count += 1; - info!("Hit new loopback with new entity: {current_entity:?}"); + debug!("Loopback requested with: {current_entity:?}. Loopback count is: {loopback_count}."); continue 'loopback; } } } - Err(ProviderProxySelectorError::provider_proxy_error(format!("Unable to select proxy, reached loopback max of: {PROXY_SELECTOR_LOOPBACK_MAX}."))) + Err(ProviderProxySelectorError::provider_proxy_error(format!("Unable to select proxy, reached max attempts of: {PROXY_SELECTOR_LOOPBACK_MAX}."))) } /// Requests that the value of an entity be published as soon as possible