Skip to content

Commit

Permalink
Add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
devkelley committed Nov 15, 2023
1 parent e24ce27 commit d61e6d6
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
4 changes: 4 additions & 0 deletions contracts/src/provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ pub struct SignalValue {
}

#[derive(Clone, Debug, Display, Eq, PartialEq)]
/// Successful return types when a proxy registers an entity
pub enum EntityRegistration {
/// The Entity has been successfully registered by the proxy
Registered,
/// The proxy has requested a loopback with new information for the proxy selector
Loopback(Entity),
}

Expand Down Expand Up @@ -53,6 +56,7 @@ pub trait ProviderProxy {

/// Registers an entity id to a local cache inside a provider proxy to keep track of which entities a provider proxy contains.
/// If the operation is Subscribe for an entity, the expectation is subscribe will happen in this function after registering an entity.
/// Some proxies may return a 'Loopback' with new entity information for the proxy selector to use to select a different proxy.
///
/// # Arguments
/// - `entity_id`: the entity id to add
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use freyja_contracts::{
provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyErrorKind, SignalValue, EntityRegistration},
};

/// Interfaces with providers which support GRPC. Based on the Ibeji mixed sample.
/// Note that the current implementation works on the assumption that there is a
/// Interfaces with providers which utilize 'Managed Subscribe'. Based on the Ibeji managed
/// subscribe sample. Note that the current implementation works on the assumption that there is a
/// one-to-one mapping of topic to entity id.
pub struct ManagedSubscribeProviderProxy {
/// The proxy config
Expand Down Expand Up @@ -70,7 +70,8 @@ impl ProviderProxy for ManagedSubscribeProviderProxy {

/// Starts a provider proxy
async fn start(&self) -> Result<(), ProviderProxyError> {
// Not relevant for this proxy, so passthrough.
// Not relevant for this proxy as the proxy is retrieving where to subscribe to and has no
// persistent state.
Ok(())
}

Expand All @@ -83,7 +84,8 @@ impl ProviderProxy for ManagedSubscribeProviderProxy {
Ok(())
}

/// Loopback proxy that calls the 'Managed Subscribe' module in Ibeji to retrieve correct subscription endpoint.
/// Calls the 'Managed Subscribe' module in Ibeji to retrieve correct subscription endpoint and
/// returns a Loopback request to proxy selector
///
/// # Arguments
/// - `entity_id`: the entity id to add
Expand All @@ -106,6 +108,7 @@ impl ProviderProxy for ManagedSubscribeProviderProxy {

let mut client = self.client.clone();

// Set the default frequency to recieve data at
let default_freq_constraint = Constraint {
r#type: self.config.frequency_constraint_type.clone(),
value: self.config.frequency_constraint_value.clone(),
Expand All @@ -116,38 +119,36 @@ impl ProviderProxy for ManagedSubscribeProviderProxy {
constraints: vec![default_freq_constraint],
});

info!("Calling Managed Subscribe with {request:?}.");

let result = client
.get_subscription_info(request)
.await
.map_err(ProviderProxyError::communication)?;

info!("Managed Subscribe returned {result:?}.");

let sub_info = result.into_inner();

// The mqtt proxy supports v5 and v3 so do not need to make a distinction
let mut protocol = sub_info.protocol;

if protocol.contains(MQTT_PROTOCOL) {
protocol = MQTT_PROTOCOL.to_string();
}

// Construct endpoint information from returned result
let endpoint = EntityEndpoint {
protocol,
operations: vec![SUBSCRIBE_OPERATION.to_string()],
uri: sub_info.uri,
context: sub_info.context,
};

// Create new entity object with updated endpoint information.
let new_entity = Entity {
name: Some(entity_id.to_string()),
id: entity_id.to_string(),
description: None,
endpoints: vec![endpoint],
};

info!("New Entity constructed: {new_entity:?}");
info!("Requesting loopback with Entity: {new_entity:?}");

Ok(EntityRegistration::Loopback(new_entity))
}
Expand Down
26 changes: 14 additions & 12 deletions provider_proxy_selector/src/provider_proxy_selector_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use log::{debug, info};
use log::debug;
use tokio::sync::Mutex;

use freyja_contracts::{
Expand Down Expand Up @@ -84,6 +84,8 @@ impl ProviderProxySelector for ProviderProxySelectorImpl {
let mut loopback_count = 0;
let mut current_entity = entity.to_owned();

// Proxy selector will loop (up to max attempts) until a proxy registers the entity.
// Will break out of loop on an error.
'loopback: while loopback_count < PROXY_SELECTOR_LOOPBACK_MAX {
let mut state = self.state.lock().await;

Expand All @@ -98,22 +100,22 @@ impl ProviderProxySelector for ProviderProxySelectorImpl {
.await
.map_err(ProviderProxySelectorError::communication)?;

info!("Existing Register Entity result is: {entity_registration}.");

match entity_registration {
EntityRegistration::Registered => {
// There was a successful registration of the entity and can return from the loop.
// There was a successful registration of the entity.
// The entity is added to the map and the selector returns.
state
.entity_map
.insert(String::from(&current_entity.id), String::from(&endpoint.uri));

return Ok(())
},
EntityRegistration::Loopback(new_entity) => {
// The proxy is requesting a loopback with new entity information
current_entity = new_entity.to_owned();
loopback_count += 1;

info!("Hit existing loopback with new entity: {current_entity:?}");
debug!("Loopback requested with: {current_entity:?}. Loopback count is: {loopback_count}.");

continue 'loopback;
}
Expand Down Expand Up @@ -144,40 +146,40 @@ impl ProviderProxySelector for ProviderProxySelectorImpl {
.await
.map_err(ProviderProxySelectorError::provider_proxy_error)?;

// Register the entity with the provider proxy.
// Register the entity with the provider proxy
let entity_registration = provider_proxy
.register_entity(&current_entity.id, &endpoint)
.await
.map_err(ProviderProxySelectorError::provider_proxy_error)?;

// As long as there was not an error with registration, add proxy to map.
// As long as there was not an error with registration, add proxy to map
state
.provider_proxies
.insert(endpoint.uri.clone(), provider_proxy);

info!("New Register Entity result is: {entity_registration}.");

match entity_registration {
EntityRegistration::Registered => {
// There was a successful registration of the entity and can return from the loop.
// There was a successful registration of the entity.
// The entity is added to the map and the selector returns.
state
.entity_map
.insert(String::from(&current_entity.id), String::from(&endpoint.uri));

return Ok(())
},
EntityRegistration::Loopback(new_entity) => {
// The proxy is requesting a loopback with new entity information
current_entity = new_entity.to_owned();
loopback_count += 1;

info!("Hit new loopback with new entity: {current_entity:?}");
debug!("Loopback requested with: {current_entity:?}. Loopback count is: {loopback_count}.");

continue 'loopback;
}
}
}

Err(ProviderProxySelectorError::provider_proxy_error(format!("Unable to select proxy, reached loopback max of: {PROXY_SELECTOR_LOOPBACK_MAX}.")))
Err(ProviderProxySelectorError::provider_proxy_error(format!("Unable to select proxy, reached max attempts of: {PROXY_SELECTOR_LOOPBACK_MAX}.")))
}

/// Requests that the value of an entity be published as soon as possible
Expand Down

0 comments on commit d61e6d6

Please sign in to comment.