Skip to content

Commit

Permalink
change to a start model for proxies
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Nov 13, 2023
1 parent 47adecd commit 5f15239
Show file tree
Hide file tree
Showing 19 changed files with 170 additions and 128 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions contracts/src/provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ pub trait ProviderProxy {
where
Self: Sized;

/// Runs a provider proxy.
async fn run(&self) -> Result<(), ProviderProxyError>;
/// Starts a provider proxy.
/// This should not block once intialization is complete, so anything that needs to run indefinitely
/// (such as a server or a listener) should spawn its own task.
async fn start(&self) -> Result<(), ProviderProxyError>;

/// Sends a request to a provider for obtaining the value of an entity
///
Expand All @@ -57,6 +59,11 @@ pub trait ProviderProxy {

/// Factory for creating ProviderProxies
pub trait ProviderProxyFactory {
/// Create a new factory
fn new() -> Self
where
Self: Sized;

/// Check to see whether this factory can create a proxy for the requested entity.
/// Returns the first endpoint found that is supported by this factory.
///
Expand Down
5 changes: 4 additions & 1 deletion contracts/src/provider_proxy_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

use async_trait::async_trait;

use crate::entity::Entity;
use crate::{entity::Entity, provider_proxy::ProviderProxyFactory};

/// Manages a collection of proxies and provides access to them.
/// Conceptually similar to a gateway for the proxies.
#[async_trait]
pub trait ProviderProxySelector {
/// Registers a `ProviderProxyFactory` with this selector.
fn register<TFactory: ProviderProxyFactory + Send + Sync + 'static>(&mut self) -> Result<(), ProviderProxySelectorError>;

/// Updates an existing proxy for an entity if possible,
/// otherwise creates a new proxy to handle that entity.
///
Expand Down
5 changes: 5 additions & 0 deletions freyja/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ provider-proxy-selector = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }

grpc-provider-proxy-v1 = { path = "../provider_proxies/grpc/v1" }
http-mock-provider-proxy = { path = "../provider_proxies/http_mock_provider_proxy" }
in-memory-mock-provider-proxy = { path = "../provider_proxies/in_memory_mock_provider_proxy" }
mqtt-provider-proxy = { path = "../provider_proxies/mqtt" }

[dev-dependencies]
# Dependencies for testing
mockall = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion freyja/src/cartographer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod cartographer_tests {
CheckForWorkResponse, GetMappingResponse, MappingClientError, SendInventoryRequest,
SendInventoryResponse,
},
provider_proxy_selector::ProviderProxySelectorError,
provider_proxy_selector::ProviderProxySelectorError, provider_proxy::ProviderProxyFactory,
};

mock! {
Expand Down Expand Up @@ -264,6 +264,7 @@ mod cartographer_tests {

#[async_trait]
impl ProviderProxySelector for ProviderProxySelector {
fn register<TFactory: ProviderProxyFactory + Send + Sync + 'static>(&mut self) -> Result<(), ProviderProxySelectorError>;
async fn create_or_update_proxy(&self, entity: &Entity) -> Result<(), ProviderProxySelectorError>;
async fn request_entity_value(&self, entity_id: &str) -> Result<(), ProviderProxySelectorError>;
}
Expand Down
3 changes: 2 additions & 1 deletion freyja/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ mod emitter_tests {
cloud_adapter::{CloudAdapterError, CloudAdapterErrorKind},
entity::Entity,
provider_proxy_selector::ProviderProxySelectorError,
signal::{Emission, EmissionPolicy},
signal::{Emission, EmissionPolicy}, provider_proxy::ProviderProxyFactory,
};

mock! {
Expand All @@ -255,6 +255,7 @@ mod emitter_tests {

#[async_trait]
impl ProviderProxySelector for ProviderProxySelector {
fn register<TFactory: ProviderProxyFactory + Send + Sync + 'static>(&mut self) -> Result<(), ProviderProxySelectorError>;
async fn create_or_update_proxy(&self, entity: &Entity) -> Result<(), ProviderProxySelectorError>;
async fn request_entity_value(&self, entity_id: &str) -> Result<(), ProviderProxySelectorError>;
}
Expand Down
17 changes: 13 additions & 4 deletions freyja/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use grpc_provider_proxy_v1::grpc_provider_proxy_factory::GRPCProviderProxyFactory;
use http_mock_provider_proxy::http_mock_provider_proxy_factory::HttpMockProviderProxyFactory;
use in_memory_mock_provider_proxy::in_memory_mock_provider_proxy_factory::InMemoryMockProviderProxyFactory;
use mqtt_provider_proxy::mqtt_provider_proxy_factory::MqttProviderProxyFactory;
// Re-export this macro for convenience so users don't need to manually import the proc_macros crate
pub use proc_macros::freyja_main;
use tokio::sync::Mutex;
Expand All @@ -20,7 +24,7 @@ use emitter::Emitter;
use freyja_common::signal_store::SignalStore;
use freyja_contracts::{
cloud_adapter::CloudAdapter, digital_twin_adapter::DigitalTwinAdapter,
mapping_client::MappingClient, provider_proxy::SignalValue,
mapping_client::MappingClient, provider_proxy::SignalValue, provider_proxy_selector::ProviderProxySelector,
};
use provider_proxy_selector::provider_proxy_selector_impl::ProviderProxySelectorImpl;

Expand Down Expand Up @@ -59,9 +63,14 @@ pub async fn freyja_main<

let signal_store = Arc::new(SignalStore::new());
let signal_values_queue: Arc<SegQueue<SignalValue>> = Arc::new(SegQueue::new());
let provider_proxy_selector = Arc::new(Mutex::new(ProviderProxySelectorImpl::new(
signal_values_queue.clone(),
)));

let mut provider_proxy_selector = ProviderProxySelectorImpl::new(signal_values_queue.clone());
provider_proxy_selector.register::<GRPCProviderProxyFactory>()?;
provider_proxy_selector.register::<HttpMockProviderProxyFactory>()?;
provider_proxy_selector.register::<InMemoryMockProviderProxyFactory>()?;
provider_proxy_selector.register::<MqttProviderProxyFactory>()?;

let provider_proxy_selector = Arc::new(Mutex::new(provider_proxy_selector));

// Setup cartographer
let cartographer_poll_interval = Duration::from_secs(5);
Expand Down
2 changes: 1 addition & 1 deletion provider_proxies/grpc/v1/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ samples-protobuf-data-access = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
tokio-stream = { workspace = true }

[build-dependencies]
Expand Down
16 changes: 9 additions & 7 deletions provider_proxies/grpc/v1/src/grpc_provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@ impl ProviderProxy for GRPCProviderProxy {
.map(|r| Arc::new(r) as _)
}

/// Runs a provider proxy
async fn run(&self) -> Result<(), ProviderProxyError> {
info!("Started a GRPCProviderProxy!");

/// Starts a provider proxy
async fn start(&self) -> Result<(), ProviderProxyError> {
let addr: SocketAddr = self
.config
.consumer_address
Expand All @@ -97,9 +95,13 @@ impl ProviderProxy for GRPCProviderProxy {
let server_future = Server::builder()
.add_service(DigitalTwinConsumerServer::new(consumer_impl))
.serve(addr);
let _ = server_future
.await
.map_err(ProviderProxyError::communication);

tokio::spawn(async move {
let _ = server_future
.await;
});

info!("Started a GRPCProviderProxy!");

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions provider_proxies/grpc/v1/src/grpc_provider_proxy_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ use crate::{
pub struct GRPCProviderProxyFactory {}

impl ProviderProxyFactory for GRPCProviderProxyFactory {
/// Create a new `GRPCProviderProxyFactory`
fn new() -> Self {
Self { }
}

/// Check to see whether this factory can create a proxy for the requested entity.
/// Returns the first endpoint found that is supported by this factory.
///
Expand Down
1 change: 1 addition & 0 deletions provider_proxies/http_mock_provider_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ freyja-contracts = { workspace = true }
log = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }

[build-dependencies]
freyja-build-common = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,6 @@ impl HttpMockProviderProxy {

ok!()
}

/// Run a listener, so the providers' server can publish data back
///
/// # Arguments
/// - `signal_values_queue`: shared queue for all proxies to push new signal values of entities
async fn run_signal_values_listener(
&self,
signal_values_queue: Arc<SegQueue<SignalValue>>,
) -> Result<(), ProviderProxyError> {
let server_endpoint_addr = SocketAddr::from_str(&self.config.proxy_callback_address)
.map_err(ProviderProxyError::parse)?;
// Start a listener server to have a digital twin provider push data
// http://{provider_callback_authority}/value
// POST request where the json body is GetSignalValueResponse
// Set up router path
let router = Router::new()
.route(CALLBACK_FOR_VALUES_PATH, post(Self::receive_value_handler))
.with_state(signal_values_queue);

// Run the listener
let builder = axum::Server::try_bind(&server_endpoint_addr)
.map_err(ProviderProxyError::communication)?;
builder
.serve(router.into_make_service())
.await
.map_err(ProviderProxyError::communication)?;

info!(
"Http Provider Proxy listening at http://{}", // Devskim: ignore DS137138
self.config.proxy_callback_address
);
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -171,11 +138,33 @@ impl ProviderProxy for HttpMockProviderProxy {
.map(|r| Arc::new(r) as _)
}

/// Runs a provider proxy
async fn run(&self) -> Result<(), ProviderProxyError> {
info!("Started an HttpProviderProxy!");
self.run_signal_values_listener(self.signal_values_queue.clone())
.await?;
/// Starts a provider proxy
async fn start(&self) -> Result<(), ProviderProxyError> {
let server_endpoint_addr = SocketAddr::from_str(&self.config.proxy_callback_address)
.map_err(ProviderProxyError::parse)?;
// Start a listener server to have a digital twin provider push data
// http://{provider_callback_authority}/value
// POST request where the json body is GetSignalValueResponse
// Set up router path
let router = Router::new()
.route(CALLBACK_FOR_VALUES_PATH, post(Self::receive_value_handler))
.with_state(self.signal_values_queue.clone());

// Run the listener
let builder = axum::Server::try_bind(&server_endpoint_addr)
.map_err(ProviderProxyError::communication)?;

tokio::spawn(async move {
let _ = builder
.serve(router.into_make_service())
.await;
});

info!(
"Http Provider Proxy listening at http://{}", // Devskim: ignore DS137138
self.config.proxy_callback_address
);

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ use crate::{
pub struct HttpMockProviderProxyFactory {}

impl ProviderProxyFactory for HttpMockProviderProxyFactory {
/// Create a new `GRPCProviderProxyFactory`
fn new() -> Self {
Self { }
}

/// Check to see whether this factory can create a proxy for the requested entity.
/// Returns the first endpoint found that is supported by this factory.
///
Expand Down
Loading

0 comments on commit 5f15239

Please sign in to comment.