From 7e420e6dc72ae34fbc1e0eee8ede52d1a2f781ce Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Tue, 31 Oct 2023 14:42:21 -0700 Subject: [PATCH] integrate new find_by_id contract and use factory model --- common/src/signal_store.rs | 42 +-- contracts/src/entity.rs | 23 +- contracts/src/provider_proxy.rs | 23 +- contracts/src/provider_proxy_selector.rs | 4 +- .../README.md | 15 +- ...in_memory_digital_twin_config.default.json | 32 ++- .../in_memory_mock_digital_twin_adapter.rs | 20 +- freyja/src/cartographer.rs | 16 +- freyja/src/emitter.rs | 6 +- .../res/mock_digital_twin_config.default.json | 30 +- mocks/mock_digital_twin/src/lib.rs | 4 +- mocks/mock_digital_twin/src/main.rs | 15 +- .../grpc/v1/src/grpc_provider_proxy.rs | 49 ++-- .../v1/src/grpc_provider_proxy_factory.rs | 35 +++ provider_proxies/grpc/v1/src/lib.rs | 5 + .../src/http_mock_provider_proxy.rs | 88 +++--- .../src/http_mock_provider_proxy_factory.rs | 35 +++ .../http_mock_provider_proxy/src/lib.rs | 5 + ...xy.rs => in_memory_mock_provider_proxy.rs} | 42 +-- .../in_memory_mock_provider_proxy_factory.rs | 35 +++ .../in_memory_mock_provider_proxy/src/lib.rs | 7 +- .../src/provider_proxy_selector_impl.rs | 263 ++++++------------ 22 files changed, 435 insertions(+), 359 deletions(-) create mode 100644 provider_proxies/grpc/v1/src/grpc_provider_proxy_factory.rs create mode 100644 provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy_factory.rs rename provider_proxies/in_memory_mock_provider_proxy/src/{in_memory_provider_proxy.rs => in_memory_mock_provider_proxy.rs} (90%) create mode 100644 provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy_factory.rs diff --git a/common/src/signal_store.rs b/common/src/signal_store.rs index 4fcf6c28..6c4ddc3c 100644 --- a/common/src/signal_store.rs +++ b/common/src/signal_store.rs @@ -174,7 +174,7 @@ mod signal_store_tests { use freyja_contracts::{ conversion::Conversion, - entity::Entity, + entity::{Entity, EntityEndpoint}, signal::{Emission, EmissionPolicy, Target}, }; @@ -264,12 +264,14 @@ mod signal_store_tests { id: ID.to_string(), value: Some(ORIGINAL.to_string()), source: Entity { - id: ID.to_string(), name: Some(ORIGINAL.to_string()), - uri: ORIGINAL.to_string(), + id: ID.to_string(), description: Some(ORIGINAL.to_string()), - operation: GET_OPERATION.to_string(), - protocol: ORIGINAL.to_string(), + endpoints: vec![EntityEndpoint { + protocol: ORIGINAL.to_string(), + operations: vec![GET_OPERATION.to_string()], + uri: ORIGINAL.to_string(), + }], }, target: Target { metadata: [(ORIGINAL.to_string(), ORIGINAL.to_string())] @@ -293,12 +295,14 @@ mod signal_store_tests { id: ID.to_string(), value: Some(INCOMING.to_string()), source: Entity { - id: ID.to_string(), name: Some(INCOMING.to_string()), - uri: INCOMING.to_string(), + id: ID.to_string(), description: Some(INCOMING.to_string()), - operation: "FooOperation".to_string(), - protocol: INCOMING.to_string(), + endpoints: vec![EntityEndpoint { + protocol: INCOMING.to_string(), + operations: vec!["FooOperation".to_string()], + uri: INCOMING.to_string(), + }], }, target: Target { metadata: [(INCOMING.to_string(), INCOMING.to_string())] @@ -363,12 +367,14 @@ mod signal_store_tests { id: ID.to_string(), value: Some(INCOMING.to_string()), source: Entity { - id: ID.to_string(), name: Some(INCOMING.to_string()), - uri: INCOMING.to_string(), + id: ID.to_string(), description: Some(INCOMING.to_string()), - operation: GET_OPERATION.to_string(), - protocol: INCOMING.to_string(), + endpoints: vec![EntityEndpoint { + protocol: INCOMING.to_string(), + operations: vec![GET_OPERATION.to_string()], + uri: INCOMING.to_string(), + }], }, target: Target { metadata: [(INCOMING.to_string(), INCOMING.to_string())] @@ -426,12 +432,14 @@ mod signal_store_tests { id: ID.to_string(), value: Some(ORIGINAL.to_string()), source: Entity { - id: ID.to_string(), name: Some(ORIGINAL.to_string()), - uri: ORIGINAL.to_string(), + id: ID.to_string(), description: Some(ORIGINAL.to_string()), - operation: GET_OPERATION.to_string(), - protocol: ORIGINAL.to_string(), + endpoints: vec![EntityEndpoint { + protocol: ORIGINAL.to_string(), + operations: vec![GET_OPERATION.to_string()], + uri: ORIGINAL.to_string(), + }], }, target: Target { metadata: [(ORIGINAL.to_string(), ORIGINAL.to_string())] diff --git a/contracts/src/entity.rs b/contracts/src/entity.rs index d390b97a..7f0c9b55 100644 --- a/contracts/src/entity.rs +++ b/contracts/src/entity.rs @@ -7,21 +7,28 @@ use serde::{Deserialize, Serialize}; /// Represents an entity #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Entity { - /// The entity's id - pub id: String, - // The entity's name pub name: Option, - /// The provider's uri - pub uri: String, + /// The entity's id + pub id: String, /// The entity's description pub description: Option, - /// The operation that we will use for this entity - pub operation: String, + /// The list of supported endpoints + pub endpoints: Vec, +} +/// Represents an entity's endpoint for communication +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct EntityEndpoint { /// The protocol to use to contact this entity pub protocol: String, -} + + /// The operations that this entity supports + pub operations: Vec, + + /// The provider's uri + pub uri: String, +} \ No newline at end of file diff --git a/contracts/src/provider_proxy.rs b/contracts/src/provider_proxy.rs index 0a13afd7..1f0e96a0 100644 --- a/contracts/src/provider_proxy.rs +++ b/contracts/src/provider_proxy.rs @@ -7,6 +7,8 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use crossbeam::queue::SegQueue; +use crate::entity::{EntityEndpoint, Entity}; + /// Represents a signal value pub struct SignalValue { /// The entity's id @@ -26,11 +28,11 @@ pub trait ProviderProxy: Debug { fn create_new( provider_uri: &str, signal_values_queue: Arc>, - ) -> Result, ProviderProxyError> + ) -> Result, ProviderProxyError> where Self: Sized; - /// Runs a provider proxy + /// Runs a provider proxy. async fn run(&self) -> Result<(), ProviderProxyError>; /// Sends a request to a provider for obtaining the value of an entity @@ -44,20 +46,18 @@ pub trait ProviderProxy: Debug { /// /// # Arguments /// - `entity_id`: the entity id to add - /// - `operation`: the operation that this entity supports + /// - `endpoint`: the endpoint that this entity supports async fn register_entity( &self, entity_id: &str, - operation: &str, + endpoint: &EntityEndpoint, ) -> Result<(), ProviderProxyError>; +} - /// Checks if this operation is supported - /// - /// # Arguments - /// - `operation`: check to see if this operation is supported by this provider proxy - fn is_operation_supported(operation: &str) -> bool - where - Self: Sized + Send + Sync; +pub trait ProviderProxyFactory { + fn is_supported(&self, entity: &Entity) -> Option; + + fn create_proxy(&self, provider_uri: &str, signal_values_queue: Arc>) -> Result, ProviderProxyError>; } proc_macros::error! { @@ -68,6 +68,7 @@ proc_macros::error! { Deserialize, Communication, EntityNotFound, + OperationNotSupported, Unknown } } diff --git a/contracts/src/provider_proxy_selector.rs b/contracts/src/provider_proxy_selector.rs index 6dc74430..816f17cd 100644 --- a/contracts/src/provider_proxy_selector.rs +++ b/contracts/src/provider_proxy_selector.rs @@ -16,7 +16,7 @@ pub trait ProviderProxySelector { /// # Arguments /// - `entity`: the entity that the proxy should handle async fn create_or_update_proxy( - &mut self, + &self, entity: &Entity, ) -> Result<(), ProviderProxySelectorError>; @@ -25,7 +25,7 @@ pub trait ProviderProxySelector { /// # Arguments /// - `entity_id`: the entity to request async fn request_entity_value( - &mut self, + &self, entity_id: &str, ) -> Result<(), ProviderProxySelectorError>; } diff --git a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/README.md b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/README.md index d9e73a88..aed5f234 100644 --- a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/README.md +++ b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/README.md @@ -6,12 +6,13 @@ The In-Memory Mock Digital Twin Adapter mocks the behavior of an in-vehicle digi This adapter supports the following configuration settings: -- `values`: a list of entities to use. Each entry in the list is an object with the following properties: - - `entity`: a digital twin entity that will be exposed to the `find_by_id` API. Entities contain the following properties: - - `id`: this is used as the key when calling `find_by_id`. - - `uri`: the uri that is used to invoke a provider. This is a stand-in for whatever the provider contact info is from Ibeji. This is used as the key when calling `subscribe` and `get` in the [In-Memory Provider Proxy](../../provider_proxies/in_memory_mock_provider_proxy/). - - `operation`: the operation that should be used to access this entity. - - `protocol`: the communication protocol that should be used to access this entity. For this particular adapter, the value should always be `in-memory`. - - `name` and `description`: these are currently unused by Freyja. They are included for completeness and parity with Ibeji's Digital Twin Service contract and may potentially be logged. +- `values`: A list of entities to use. Each entry in the list is an object with the following properties: + - `entity`: A digital twin entity that will be exposed to the `find_by_id` API. Entities contain the following properties: + - `id`: This is used as the key when calling `find_by_id`. + - `name` and `description`: These are currently unused by Freyja. They are included for completeness and parity with Ibeji's Digital Twin Service contract and may potentially be logged. + - `endpoints`: A list of endpoints that this entity supports. Each entry in the list is an object with the following properties: + - `protocol`: The communication protocol that should be used to access this entity. When using this adapter, the value of this property should be `in-memory` for most cases. + - `operations`: A list of operations that can be used to access this entity. + - `uri`: The uri that is used to invoke a provider. This is a stand-in for whatever the provider contact info is from Ibeji. This is used as the key when calling `subscribe` and `get` in the [In-Memory Provider Proxy](../../provider_proxies/in_memory_mock_provider_proxy/). This adapter supports [config overrides](../../docs/config-overrides.md). The override filename is `in_memory_digital_twin_config.json`, and the default config is located at `res/in_memory_digital_twin_config.default.json`. diff --git a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/res/in_memory_digital_twin_config.default.json b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/res/in_memory_digital_twin_config.default.json index bd297922..6f4f13fb 100644 --- a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/res/in_memory_digital_twin_config.default.json +++ b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/res/in_memory_digital_twin_config.default.json @@ -4,30 +4,42 @@ "entity": { "id": "dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1", "name": "AmbientAirTemperature", - "uri": "http://0.0.0.0:1111", - "description": "The immediate surroundings air temperature (in Fahrenheit).", - "operation": "Get", - "protocol": "in-memory" + "description": "The air temperature of the immediate surroundings (in Fahrenheit).", + "endpoints": [ + { + "protocol": "in-memory", + "operations": ["Get"], + "uri": "http://0.0.0.0:1111" + } + ] } }, { "entity": { "id": "dtmi:sdv:Vehicle:Cabin:HVAC:IsAirConditioningActive;1", "name": "IsAirConditioningActive", - "uri": "http://0.0.0.0:1111", "description": "Is air conditioning active?", - "operation": "Subscribe", - "protocol": "in-memory" + "endpoints": [ + { + "protocol": "in-memory", + "operations": ["Subscribe"], + "uri": "http://0.0.0.0:1111" + } + ] } }, { "entity": { "id": "dtmi:sdv:Vehicle:OBD:HybridBatteryRemaining;1", "name": "HybridBatteryRemaining", - "uri": "http://0.0.0.0:1111", "description": "Percentage of the hybrid battery remaining", - "operation": "Subscribe", - "protocol": "in-memory" + "endpoints": [ + { + "protocol": "in-memory", + "operations": ["Subscribe"], + "uri": "http://0.0.0.0:1111" + } + ] } } ] diff --git a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs index c6a012b6..6fd04be8 100644 --- a/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs +++ b/digital_twin_adapters/in_memory_mock_digital_twin_adapter/src/in_memory_mock_digital_twin_adapter.rs @@ -69,7 +69,7 @@ mod in_memory_mock_digital_twin_adapter_tests { use super::*; use crate::config::EntityConfig; - use freyja_contracts::entity::Entity; + use freyja_contracts::entity::{Entity, EntityEndpoint}; const OPERATION: &str = "Subscribe"; @@ -86,12 +86,14 @@ mod in_memory_mock_digital_twin_adapter_tests { let config = Config { values: vec![EntityConfig { entity: Entity { - id: String::from(ENTITY_ID), name: None, - uri: String::from("http://0.0.0.0:1111"), // Devskim: ignore DS137138 + id: ENTITY_ID.to_string(), description: None, - operation: OPERATION.to_string(), - protocol: String::from("in-memory"), + endpoints: vec![EntityEndpoint { + protocol: String::from("in-memory"), + operations: vec![OPERATION.to_string()], + uri: String::from("http://0.0.0.0:1111"), // Devskim: ignore DS137138 + }], }, }], }; @@ -100,11 +102,17 @@ mod in_memory_mock_digital_twin_adapter_tests { let request = FindByIdRequest { entity_id: String::from(ENTITY_ID), }; + let response = in_memory_digital_twin_adapter .find_by_id(request) .await .unwrap(); + assert_eq!(response.entity.id, ENTITY_ID); - assert_eq!(response.entity.operation, OPERATION); + assert_eq!(response.entity.endpoints.len(), 1); + let endpoint = response.entity.endpoints.first().unwrap(); + assert_eq!(endpoint.operations.len(), 1); + let operation = endpoint.operations.first().unwrap(); + assert_eq!(operation, OPERATION); } } diff --git a/freyja/src/cartographer.rs b/freyja/src/cartographer.rs index 9f3c149e..e0b7f799 100644 --- a/freyja/src/cartographer.rs +++ b/freyja/src/cartographer.rs @@ -186,7 +186,7 @@ impl< .entity; { - let mut provider_proxy_selector = self.provider_proxy_selector.lock().await; + let provider_proxy_selector = self.provider_proxy_selector.lock().await; provider_proxy_selector .create_or_update_proxy(&signal.source) .await @@ -209,7 +209,7 @@ mod cartographer_tests { use freyja_contracts::{ digital_twin_adapter::{DigitalTwinAdapterError, FindByIdResponse}, digital_twin_map_entry::DigitalTwinMapEntry, - entity::Entity, + entity::{Entity, EntityEndpoint}, mapping_client::{ CheckForWorkResponse, GetMappingResponse, MappingClientError, SendInventoryRequest, SendInventoryResponse, @@ -264,8 +264,8 @@ mod cartographer_tests { #[async_trait] impl ProviderProxySelector for ProviderProxySelector { - async fn create_or_update_proxy(&mut self, entity: &Entity) -> Result<(), ProviderProxySelectorError>; - async fn request_entity_value(&mut self, entity_id: &str) -> Result<(), ProviderProxySelectorError>; + async fn create_or_update_proxy(&self, entity: &Entity) -> Result<(), ProviderProxySelectorError>; + async fn request_entity_value(&self, entity_id: &str) -> Result<(), ProviderProxySelectorError>; } } @@ -326,10 +326,12 @@ mod cartographer_tests { let test_entity = Entity { id: ID.to_string(), name: Some("name".to_string()), - uri: "uri".to_string(), description: Some("description".to_string()), - operation: "FooOperation".to_string(), - protocol: "in-memory".to_string(), + endpoints: vec![EntityEndpoint { + operations: vec!["FooOperation".to_string()], + protocol: "in-memory".to_string(), + uri: "uri".to_string(), + }] }; let test_signal_patch = &mut SignalPatch { diff --git a/freyja/src/emitter.rs b/freyja/src/emitter.rs index 3a392157..34453d2a 100644 --- a/freyja/src/emitter.rs +++ b/freyja/src/emitter.rs @@ -121,7 +121,7 @@ impl // This approach to requesting signal values introduces an inherent delay in uploading data // of signal.emission.policy.interval_ms and needs to be revisited. let proxy_result = { - let mut provider_proxy_selector = self.provider_proxy_selector.lock().await; + let provider_proxy_selector = self.provider_proxy_selector.lock().await; provider_proxy_selector .request_entity_value(&signal.id) .await @@ -255,8 +255,8 @@ mod emitter_tests { #[async_trait] impl ProviderProxySelector for ProviderProxySelector { - async fn create_or_update_proxy(&mut self, entity: &Entity) -> Result<(), ProviderProxySelectorError>; - async fn request_entity_value(&mut self, entity_id: &str) -> Result<(), ProviderProxySelectorError>; + async fn create_or_update_proxy(&self, entity: &Entity) -> Result<(), ProviderProxySelectorError>; + async fn request_entity_value(&self, entity_id: &str) -> Result<(), ProviderProxySelectorError>; } } diff --git a/mocks/mock_digital_twin/res/mock_digital_twin_config.default.json b/mocks/mock_digital_twin/res/mock_digital_twin_config.default.json index b728e712..3a4e8ca7 100644 --- a/mocks/mock_digital_twin/res/mock_digital_twin_config.default.json +++ b/mocks/mock_digital_twin/res/mock_digital_twin_config.default.json @@ -7,10 +7,14 @@ "entity": { "id": "dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1", "name": "AmbientAirTemperature", - "uri": "http://127.0.0.1:8800", "description": "The immediate surroundings air temperature (in Fahrenheit).", - "operation": "Get", - "protocol": "http" + "endpoints": [ + { + "protocol": "http", + "operations": ["Get"], + "uri": "http://127.0.0.1:8800/request-value" + } + ] }, "values": { "Static": "42.0" @@ -22,10 +26,14 @@ "entity": { "id": "dtmi:sdv:Vehicle:Cabin:HVAC:IsAirConditioningActive;1", "name": "IsAirConditioningActive", - "uri": "http://127.0.0.1:8800", "description": "Is air conditioning active?", - "operation": "Get", - "protocol": "http" + "endpoints": [ + { + "protocol": "http", + "operations": ["Get"], + "uri": "http://127.0.0.1:8800/request-value" + } + ] }, "values": { "Static": "true" @@ -37,10 +45,14 @@ "entity": { "id": "dtmi:sdv:Vehicle:OBD:HybridBatteryRemaining;1", "name": "HybridBatteryRemaining", - "uri": "http://127.0.0.1:8800", "description": "Percentage of the hybrid battery remaining", - "operation": "Subscribe", - "protocol": "http" + "endpoints": [ + { + "protocol": "http", + "operations": ["Subscribe"], + "uri": "http://127.0.0.1:8800/subscribe" + } + ] }, "values": { "Stepwise": { diff --git a/mocks/mock_digital_twin/src/lib.rs b/mocks/mock_digital_twin/src/lib.rs index 5d56d8f0..7edf67a4 100644 --- a/mocks/mock_digital_twin/src/lib.rs +++ b/mocks/mock_digital_twin/src/lib.rs @@ -4,5 +4,5 @@ pub const ENTITY_PATH: &str = "/entity"; pub const ENTITY_QUERY_PATH: &str = "/entity?id="; -pub const ENTITY_SUBSCRIBE_PATH: &str = "/entity/subscribe"; -pub const ENTITY_GET_VALUE_PATH: &str = "/entity/request=value"; +pub const ENTITY_SUBSCRIBE_PATH: &str = "/subscribe"; +pub const ENTITY_GET_VALUE_PATH: &str = "/request-value"; diff --git a/mocks/mock_digital_twin/src/main.rs b/mocks/mock_digital_twin/src/main.rs index 3735d24a..f3928ce0 100644 --- a/mocks/mock_digital_twin/src/main.rs +++ b/mocks/mock_digital_twin/src/main.rs @@ -24,8 +24,6 @@ use http_mock_provider_proxy::http_mock_provider_proxy::{EntityValueRequest, Ent use mock_digital_twin::{ENTITY_GET_VALUE_PATH, ENTITY_PATH, ENTITY_SUBSCRIBE_PATH}; const CONFIG_FILE_STEM: &str = "mock_digital_twin_config"; -const GET_OPERATION: &str = "Get"; -const SUBSCRIBE_OPERATION: &str = "Subscribe"; /// Stores the state of active entities, subscribers, and relays responses /// for getting/subscribing to an entity. @@ -254,18 +252,7 @@ async fn get_entity( let state = state.lock().unwrap(); find_entity(&state, &query.id) .map(|(config_item, _)| { - let operation_path = if config_item.entity.operation == SUBSCRIBE_OPERATION { - ENTITY_SUBSCRIBE_PATH - } else if config_item.entity.operation == GET_OPERATION { - ENTITY_GET_VALUE_PATH - } else { - return server_error!("Entity didn't have a valid operation"); - }; - - let mut entity = config_item.entity.clone(); - entity.uri = format!("{}{operation_path}", config_item.entity.uri); - - ok!(FindByIdResponse { entity }) + ok!(FindByIdResponse { entity: config_item.entity.clone() }) }) .unwrap_or(not_found!()) } diff --git a/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs b/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs index c4014eea..890dfbe2 100644 --- a/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs +++ b/provider_proxies/grpc/v1/src/grpc_provider_proxy.rs @@ -19,13 +19,10 @@ use samples_protobuf_data_access::sample_grpc::v1::{ }; use tonic::transport::{Channel, Server}; -use crate::{config::Config, grpc_client_impl::GRPCClientImpl}; -use freyja_contracts::provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue}; +use crate::{config::Config, grpc_client_impl::GRPCClientImpl, GET_OPERATION, SUBSCRIBE_OPERATION}; +use freyja_contracts::{provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue, ProviderProxyErrorKind}, entity::EntityEndpoint}; const CONFIG_FILE_STEM: &str = "grpc_proxy_config"; -const GET_OPERATION: &str = "Get"; -const SUBSCRIBE_OPERATION: &str = "Subscribe"; -const SUPPORTED_OPERATIONS: &[&str] = &[GET_OPERATION, SUBSCRIBE_OPERATION]; /// Interfaces with providers which support GRPC. Based on the Ibeji mixed sample. #[derive(Debug)] @@ -53,7 +50,7 @@ impl ProviderProxy for GRPCProviderProxy { fn create_new( provider_uri: &str, signal_values_queue: Arc>, - ) -> Result, ProviderProxyError> + ) -> Result, ProviderProxyError> where Self: Sized, { @@ -77,7 +74,7 @@ impl ProviderProxy for GRPCProviderProxy { entity_operation_map: Mutex::new(HashMap::new()), signal_values_queue, }) - .map(|r| Box::new(r) as _) + .map(|r| Arc::new(r) as _) } /// Runs a provider proxy @@ -146,18 +143,34 @@ impl ProviderProxy for GRPCProviderProxy { /// /// # Arguments /// - `entity_id`: the entity id to add - /// - `operation`: the operation that this entity supports + /// - `endpoint`: the endpoint that this entity supports async fn register_entity( &self, entity_id: &str, - operation: &str, + endpoint: &EntityEndpoint, ) -> Result<(), ProviderProxyError> { + // Prefer subscribe if present + let selected_operation = { + let mut result = None; + for operation in endpoint.operations.iter() { + if operation == SUBSCRIBE_OPERATION { + result = Some(SUBSCRIBE_OPERATION); + break; + } else if operation == GET_OPERATION { + // Set result, but don't break the loop in case there's a subscribe operation later in the list + result = Some(GET_OPERATION); + } + } + + result.ok_or::(ProviderProxyErrorKind::OperationNotSupported.into())? + }; + self.entity_operation_map .lock() .unwrap() - .insert(String::from(entity_id), String::from(operation)); + .insert(String::from(entity_id), String::from(selected_operation)); - if operation == SUBSCRIBE_OPERATION { + if selected_operation == SUBSCRIBE_OPERATION { let consumer_uri = format!("http://{}", self.config.consumer_address); // Devskim: ignore DS137138 let mut client = self.provider_client.clone(); let request = tonic::Request::new(SubscribeRequest { @@ -178,14 +191,6 @@ impl ProviderProxy for GRPCProviderProxy { Ok(()) } - - /// Checks if the operation is supported - /// - /// # Arguments - /// - `operation`: check to see if this operation is supported by this provider proxy - fn is_operation_supported(operation: &str) -> bool { - SUPPORTED_OPERATIONS.contains(&operation) - } } #[cfg(test)] @@ -261,6 +266,8 @@ mod grpc_provider_proxy_v1_tests { /// so you would need to set an arbitrary port per test for TCP/IP sockets. #[cfg(unix)] mod unix_tests { + use crate::GRPC_PROTOCOL; + use super::*; use std::sync::Arc; @@ -326,7 +333,7 @@ mod grpc_provider_proxy_v1_tests { let entity_id = "operation_get_entity_id"; let result = grpc_provider_proxy - .register_entity(entity_id, GET_OPERATION) + .register_entity(entity_id, &EntityEndpoint { protocol: GRPC_PROTOCOL.to_string(), operations: vec![GET_OPERATION.to_string()], uri: "foo".to_string() }) .await; assert!(result.is_ok()); assert!(grpc_provider_proxy @@ -336,7 +343,7 @@ mod grpc_provider_proxy_v1_tests { let entity_id = "operation_subscribe_entity_id"; let result = grpc_provider_proxy - .register_entity(entity_id, SUBSCRIBE_OPERATION) + .register_entity(entity_id, &EntityEndpoint { protocol: GRPC_PROTOCOL.to_string(), operations: vec![SUBSCRIBE_OPERATION.to_string()], uri: "foo".to_string() }) .await; assert!(result.is_ok()); assert!(grpc_provider_proxy diff --git a/provider_proxies/grpc/v1/src/grpc_provider_proxy_factory.rs b/provider_proxies/grpc/v1/src/grpc_provider_proxy_factory.rs new file mode 100644 index 00000000..d08190a3 --- /dev/null +++ b/provider_proxies/grpc/v1/src/grpc_provider_proxy_factory.rs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use std::sync::Arc; + +use crossbeam::queue::SegQueue; +use freyja_contracts::{entity::{Entity, EntityEndpoint}, provider_proxy::{ProviderProxy, SignalValue, ProviderProxyError, ProviderProxyFactory}}; + +use crate::{grpc_provider_proxy::GRPCProviderProxy, GET_OPERATION, SUBSCRIBE_OPERATION, GRPC_PROTOCOL}; + +pub struct GRPCProviderProxyFactory { +} + +impl ProviderProxyFactory for GRPCProviderProxyFactory { + fn is_supported(&self, entity: &Entity) -> Option { + for endpoint in entity.endpoints.iter() { + if endpoint.protocol == GRPC_PROTOCOL { + for operation in endpoint.operations.iter() { + if operation == GET_OPERATION || operation == SUBSCRIBE_OPERATION { + // This entity is supported! The proxy will worry about how to handle operations, + // right now we just need to know if it can do anything at all with this entity. + return Some(endpoint.clone()); + } + } + } + } + + None + } + + fn create_proxy(&self, provider_uri: &str, signal_values_queue: Arc>) -> Result, ProviderProxyError> { + GRPCProviderProxy::create_new(provider_uri, signal_values_queue) + } +} \ No newline at end of file diff --git a/provider_proxies/grpc/v1/src/lib.rs b/provider_proxies/grpc/v1/src/lib.rs index 00f422e9..ee86281e 100644 --- a/provider_proxies/grpc/v1/src/lib.rs +++ b/provider_proxies/grpc/v1/src/lib.rs @@ -5,3 +5,8 @@ mod config; mod grpc_client_impl; pub mod grpc_provider_proxy; +pub mod grpc_provider_proxy_factory; + +const GRPC_PROTOCOL: &str = "grpc"; +const GET_OPERATION: &str = "Get"; +const SUBSCRIBE_OPERATION: &str = "Subscribe"; \ No newline at end of file diff --git a/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs b/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs index 6ad7deb9..0f8cea00 100644 --- a/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs +++ b/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy.rs @@ -12,17 +12,16 @@ use axum::routing::post; use axum::Router; use crossbeam::queue::SegQueue; use freyja_common::{config_utils, out_dir}; +use freyja_contracts::entity::EntityEndpoint; use log::{debug, error, info}; use reqwest::Client; use serde::{Deserialize, Serialize}; +use crate::{GET_OPERATION, SUBSCRIBE_OPERATION}; use crate::config::Config; -use freyja_contracts::provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue}; +use freyja_contracts::provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue, ProviderProxyErrorKind}; const CONFIG_FILE_STEM: &str = "http_mock_provider_proxy"; -const GET_OPERATION: &str = "Get"; -const SUBSCRIBE_OPERATION: &str = "Subscribe"; -const SUPPORTED_OPERATIONS: &[&str] = &[GET_OPERATION, SUBSCRIBE_OPERATION]; const CALLBACK_FOR_VALUES_PATH: &str = "/value"; macro_rules! ok { @@ -148,7 +147,7 @@ impl ProviderProxy for HttpMockProviderProxy { fn create_new( provider_uri: &str, signal_values_queue: Arc>, - ) -> Result, ProviderProxyError> + ) -> Result, ProviderProxyError> where Self: Sized, { @@ -167,7 +166,7 @@ impl ProviderProxy for HttpMockProviderProxy { client: reqwest::Client::new(), entity_operation_map: Mutex::new(HashMap::new()), }) - .map(|r| Box::new(r) as _) + .map(|r| Arc::new(r) as _) } /// Runs a provider proxy @@ -224,52 +223,57 @@ impl ProviderProxy for HttpMockProviderProxy { /// /// # Arguments /// - `entity_id`: the entity id to add - /// - `operation`: the operation that this entity supports + /// - `endpoint`: the endpoint that this entity supports async fn register_entity( &self, entity_id: &str, - operation: &str, + endpoint: &EntityEndpoint, ) -> Result<(), ProviderProxyError> { + // Prefer subscribe if present + let selected_operation = { + let mut result = None; + for operation in endpoint.operations.iter() { + if operation == SUBSCRIBE_OPERATION { + result = Some(SUBSCRIBE_OPERATION); + break; + } else if operation == GET_OPERATION { + // Set result, but don't break the loop in case there's a subscribe operation later in the list + result = Some(GET_OPERATION); + } + } + + result.ok_or::(ProviderProxyErrorKind::OperationNotSupported.into())? + }; + self.entity_operation_map .lock() .unwrap() - .insert(String::from(entity_id), String::from(operation)); - - if operation != SUBSCRIBE_OPERATION { - return Ok(()); - } - - // Subscribe - let request = EntityValueRequest { - entity_id: String::from(entity_id), - callback_uri: Self::construct_callback_uri(&self.config.proxy_callback_address), - }; + .insert(String::from(entity_id), String::from(selected_operation)); - let subscribe_endpoint_for_entity = self.provider_uri.clone(); - let result = self - .client - .post(&subscribe_endpoint_for_entity) - .json(&request) - .send() - .await - .map_err(ProviderProxyError::communication)? - .error_for_status() - .map_err(ProviderProxyError::unknown); - - // Remove from map if the subscribe operation fails - if result.is_err() { - error!("Cannot subscribe to {entity_id} due to {result:?}"); - self.entity_operation_map.lock().unwrap().remove(entity_id); + if selected_operation == SUBSCRIBE_OPERATION { + let request = EntityValueRequest { + entity_id: String::from(entity_id), + callback_uri: Self::construct_callback_uri(&self.config.proxy_callback_address), + }; + + let subscribe_endpoint_for_entity = self.provider_uri.clone(); + let result = self + .client + .post(&subscribe_endpoint_for_entity) + .json(&request) + .send() + .await + .map_err(ProviderProxyError::communication)? + .error_for_status() + .map_err(ProviderProxyError::unknown); + + // Remove from map if the subscribe operation fails + if result.is_err() { + error!("Cannot subscribe to {entity_id} due to {result:?}"); + self.entity_operation_map.lock().unwrap().remove(entity_id); + } } Ok(()) } - - /// Checks if the operation is supported - /// - /// # Arguments - /// - `operation`: check to see if this operation is supported by this provider proxy - fn is_operation_supported(operation: &str) -> bool { - SUPPORTED_OPERATIONS.contains(&operation) - } } diff --git a/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy_factory.rs b/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy_factory.rs new file mode 100644 index 00000000..5f8fd879 --- /dev/null +++ b/provider_proxies/http_mock_provider_proxy/src/http_mock_provider_proxy_factory.rs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use std::sync::Arc; + +use crossbeam::queue::SegQueue; +use freyja_contracts::{entity::{Entity, EntityEndpoint}, provider_proxy::{ProviderProxy, SignalValue, ProviderProxyError, ProviderProxyFactory}}; + +use crate::{http_mock_provider_proxy::HttpMockProviderProxy, HTTP_PROTOCOL, GET_OPERATION, SUBSCRIBE_OPERATION}; + +pub struct HttpMockProviderProxyFactory { +} + +impl ProviderProxyFactory for HttpMockProviderProxyFactory { + fn is_supported(&self, entity: &Entity) -> Option { + for endpoint in entity.endpoints.iter() { + if endpoint.protocol == HTTP_PROTOCOL { + for operation in endpoint.operations.iter() { + if operation == GET_OPERATION || operation == SUBSCRIBE_OPERATION { + // This entity is supported! The proxy will worry about how to handle operations, + // right now we just need to know if it can do anything at all with this entity. + return Some(endpoint.clone()); + } + } + } + } + + None + } + + fn create_proxy(&self, provider_uri: &str, signal_values_queue: Arc>) -> Result, ProviderProxyError> { + HttpMockProviderProxy::create_new(provider_uri, signal_values_queue) + } +} \ No newline at end of file diff --git a/provider_proxies/http_mock_provider_proxy/src/lib.rs b/provider_proxies/http_mock_provider_proxy/src/lib.rs index 4b93e70e..35f1e190 100644 --- a/provider_proxies/http_mock_provider_proxy/src/lib.rs +++ b/provider_proxies/http_mock_provider_proxy/src/lib.rs @@ -4,3 +4,8 @@ mod config; pub mod http_mock_provider_proxy; +pub mod http_mock_provider_proxy_factory; + +const HTTP_PROTOCOL: &str = "http"; +const GET_OPERATION: &str = "Get"; +const SUBSCRIBE_OPERATION: &str = "Subscribe"; \ No newline at end of file diff --git a/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs b/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy.rs similarity index 90% rename from provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs rename to provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy.rs index 8e030d20..b4cecb8a 100644 --- a/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_provider_proxy.rs +++ b/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy.rs @@ -15,13 +15,10 @@ use crossbeam::queue::SegQueue; use freyja_common::{config_utils, out_dir}; use log::info; -use crate::config::{Config, EntityConfig}; -use freyja_contracts::provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue}; +use crate::{config::{Config, EntityConfig}, GET_OPERATION, SUBSCRIBE_OPERATION}; +use freyja_contracts::{provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue, ProviderProxyErrorKind}, entity::EntityEndpoint}; const CONFIG_FILE_STEM: &str = "in_memory_mock_proxy_config"; -const GET_OPERATION: &str = "Get"; -const SUBSCRIBE_OPERATION: &str = "Subscribe"; -const SUPPORTED_OPERATIONS: &[&str] = &[GET_OPERATION, SUBSCRIBE_OPERATION]; #[derive(Debug)] pub struct InMemoryMockProviderProxy { @@ -97,7 +94,7 @@ impl ProviderProxy for InMemoryMockProviderProxy { fn create_new( _provider_uri: &str, signal_values_queue: Arc>, - ) -> Result, ProviderProxyError> + ) -> Result, ProviderProxyError> where Self: Sized, { @@ -109,7 +106,7 @@ impl ProviderProxy for InMemoryMockProviderProxy { ProviderProxyError::deserialize, )?; - Self::from_config(config, signal_values_queue).map(|r| Box::new(r) as _) + Self::from_config(config, signal_values_queue).map(|r| Arc::new(r) as _) } /// Runs a provider proxy @@ -178,25 +175,34 @@ impl ProviderProxy for InMemoryMockProviderProxy { /// /// # Arguments /// - `entity_id`: the entity id to add - /// - `operation`: the operation that this entity supports + /// - `endpoint`: the endpoint that this entity supports async fn register_entity( &self, entity_id: &str, - operation: &str, + endpoint: &EntityEndpoint, ) -> Result<(), ProviderProxyError> { + // Prefer subscribe if present + let selected_operation = { + let mut result = None; + for operation in endpoint.operations.iter() { + if operation == SUBSCRIBE_OPERATION { + result = Some(SUBSCRIBE_OPERATION); + break; + } else if operation == GET_OPERATION { + // Set result, but don't break the loop in case there's a subscribe operation later in the list + result = Some(GET_OPERATION); + } + } + + result.ok_or::(ProviderProxyErrorKind::OperationNotSupported.into())? + }; + self.entity_operation_map .lock() .unwrap() - .insert(String::from(entity_id), String::from(operation)); - Ok(()) - } + .insert(String::from(entity_id), String::from(selected_operation)); - /// Checks if the operation is supported - /// - /// # Arguments - /// - `operation`: check to see if this operation is supported by this provider proxy - fn is_operation_supported(operation: &str) -> bool { - SUPPORTED_OPERATIONS.contains(&operation) + Ok(()) } } diff --git a/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy_factory.rs b/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy_factory.rs new file mode 100644 index 00000000..655089b1 --- /dev/null +++ b/provider_proxies/in_memory_mock_provider_proxy/src/in_memory_mock_provider_proxy_factory.rs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. +// SPDX-License-Identifier: MIT + +use std::sync::Arc; + +use crossbeam::queue::SegQueue; +use freyja_contracts::{entity::{Entity, EntityEndpoint}, provider_proxy::{ProviderProxy, SignalValue, ProviderProxyError, ProviderProxyFactory}}; + +use crate::{IN_MEMORY_PROTOCOL, GET_OPERATION, SUBSCRIBE_OPERATION, in_memory_mock_provider_proxy::InMemoryMockProviderProxy}; + +pub struct InMemoryMockProviderProxyFactory { +} + +impl ProviderProxyFactory for InMemoryMockProviderProxyFactory { + fn is_supported(&self, entity: &Entity) -> Option { + for endpoint in entity.endpoints.iter() { + if endpoint.protocol == IN_MEMORY_PROTOCOL { + for operation in endpoint.operations.iter() { + if operation == GET_OPERATION || operation == SUBSCRIBE_OPERATION { + // This entity is supported! The proxy will worry about how to handle operations, + // right now we just need to know if it can do anything at all with this entity. + return Some(endpoint.clone()); + } + } + } + } + + None + } + + fn create_proxy(&self, provider_uri: &str, signal_values_queue: Arc>) -> Result, ProviderProxyError> { + InMemoryMockProviderProxy::create_new(provider_uri, signal_values_queue) + } +} \ No newline at end of file diff --git a/provider_proxies/in_memory_mock_provider_proxy/src/lib.rs b/provider_proxies/in_memory_mock_provider_proxy/src/lib.rs index 5691cca4..2f27ad93 100644 --- a/provider_proxies/in_memory_mock_provider_proxy/src/lib.rs +++ b/provider_proxies/in_memory_mock_provider_proxy/src/lib.rs @@ -3,4 +3,9 @@ // SPDX-License-Identifier: MIT mod config; -pub mod in_memory_provider_proxy; +pub mod in_memory_mock_provider_proxy; +pub mod in_memory_mock_provider_proxy_factory; + +const IN_MEMORY_PROTOCOL: &str = "in-memory"; +const GET_OPERATION: &str = "Get"; +const SUBSCRIBE_OPERATION: &str = "Subscribe"; \ No newline at end of file diff --git a/provider_proxy_selector/src/provider_proxy_selector_impl.rs b/provider_proxy_selector/src/provider_proxy_selector_impl.rs index d86ace8d..628d2017 100644 --- a/provider_proxy_selector/src/provider_proxy_selector_impl.rs +++ b/provider_proxy_selector/src/provider_proxy_selector_impl.rs @@ -4,140 +4,40 @@ use std::{ collections::{hash_map::Entry, HashMap}, - str::FromStr, sync::Arc, }; use async_trait::async_trait; use crossbeam::queue::SegQueue; -use log::{debug, info, warn}; -use strum_macros::{Display, EnumString}; +use log::debug; use freyja_contracts::{ entity::Entity, - provider_proxy::{ProviderProxy, ProviderProxyError, SignalValue}, - provider_proxy_selector::{ProviderProxySelector, ProviderProxySelectorError}, + provider_proxy::{ProviderProxy, SignalValue, ProviderProxyFactory}, + provider_proxy_selector::{ProviderProxySelector, ProviderProxySelectorError, ProviderProxySelectorErrorKind}, }; -use grpc_provider_proxy_v1::grpc_provider_proxy::GRPCProviderProxy; -use http_mock_provider_proxy::http_mock_provider_proxy::HttpMockProviderProxy; -use in_memory_mock_provider_proxy::in_memory_provider_proxy::InMemoryMockProviderProxy; - -type ProviderProxyImpl = Arc>; - -/// Provider Proxy for matching the message delivery model -#[derive(Debug, Clone, EnumString, Display, PartialEq)] -#[strum(ascii_case_insensitive)] -pub enum ProviderProxyKind { - #[strum(serialize = "grpc")] - GRPCProviderProxy, - - #[strum(serialize = "mqtt")] - MqttProviderProxy, - - #[strum(serialize = "in-memory")] - InMemoryMockProviderProxy, - - #[strum(serialize = "http")] - HttpProviderProxy, -} - -impl ProviderProxyKind { - /// Handles the result from a provider proxy's instantiation - /// - /// # Arguments - /// - `provider_proxy_result`: the result of creating a provider proxy - /// - `provider_proxy_kind`: the provider proxy kind - fn handle_provider_proxy_result( - provider_proxy_result: Result, ProviderProxyError>, - provider_proxy_kind: ProviderProxyKind, - ) -> Result { - if let Err(error) = provider_proxy_result { - warn!("Cannot create a {provider_proxy_kind} due to {error:?}"); - return Err(ProviderProxySelectorError::communication(error)); - } - - Ok(Arc::new(provider_proxy_result.unwrap())) - } - - /// Instantiates the respective provider proxy using protocol and operation - /// - /// # Arguments - /// - `protocol`: the protocol for identifying the provider proxy - /// - `operation`: the operation for identifying the provider proxy - /// - `provider_uri`: the provider uri to contact - /// - `signal_values_queue`: shared queue for all proxies to push new signal values of entities - async fn create_provider_proxy( - protocol: &str, - operation: &str, - provider_uri: &str, - signal_values_queue: Arc>, - ) -> Result { - // Take the protocol match it to the ProviderProxyKind concrete implementation - // With the concrete implementation, check if our operation matches with anything that the provider has - let protocol_kind = ProviderProxyKind::from_str(protocol) - .map_err(ProviderProxySelectorError::protocol_not_supported)?; - - let provider_proxy: ProviderProxyImpl; - match protocol_kind { - ProviderProxyKind::GRPCProviderProxy - if GRPCProviderProxy::is_operation_supported(operation) => - { - info!("Creating a GRPCProviderProxy"); - - let grpc_provider_proxy_result = - GRPCProviderProxy::create_new(provider_uri, signal_values_queue); - - provider_proxy = ProviderProxyKind::handle_provider_proxy_result( - grpc_provider_proxy_result, - ProviderProxyKind::GRPCProviderProxy, - )?; - } - ProviderProxyKind::InMemoryMockProviderProxy - if InMemoryMockProviderProxy::is_operation_supported(operation) => - { - info!("Creating an InMemoryProviderProxy"); - - let in_memory_mock_provider_proxy_result = - InMemoryMockProviderProxy::create_new(provider_uri, signal_values_queue); - - provider_proxy = ProviderProxyKind::handle_provider_proxy_result( - in_memory_mock_provider_proxy_result, - ProviderProxyKind::InMemoryMockProviderProxy, - )?; - } - ProviderProxyKind::HttpProviderProxy - if HttpMockProviderProxy::is_operation_supported(operation) => - { - info!("Creating an HttpProviderProxy"); - let http_provider_proxy_result = - HttpMockProviderProxy::create_new(provider_uri, signal_values_queue); - - provider_proxy = ProviderProxyKind::handle_provider_proxy_result( - http_provider_proxy_result, - ProviderProxyKind::HttpProviderProxy, - )?; - } - _ => { - return Err(ProviderProxySelectorError::operation_not_supported( - "operation not supported", - )) - } - } - Ok(provider_proxy) - } -} +use grpc_provider_proxy_v1::grpc_provider_proxy_factory::GRPCProviderProxyFactory; +use http_mock_provider_proxy::http_mock_provider_proxy_factory::HttpMockProviderProxyFactory; +use in_memory_mock_provider_proxy::in_memory_mock_provider_proxy_factory::InMemoryMockProviderProxyFactory; +use tokio::sync::Mutex; /// The provider proxy selector selects which provider proxy to create based on protocol and operation. /// This struct is **not** thread-safe and should be shared with `Arc>`. +/// +/// Note: This struct makes use of two separate mutexes. To avoid deadlocking, +/// you MUST ensure that the entity_map mutex is only acquired if the provider_proxies map has also been acquired pub struct ProviderProxySelectorImpl { + /// The set of factories that have been registered + factories: Vec>, + /// A map of entity uri to provider proxy - pub provider_proxies: HashMap, + provider_proxies: Mutex>>, /// A map of entity id to provider uri - pub entity_map: HashMap, + entity_map: Mutex>, /// The signal values queue used for creating the proxies - pub signal_values_queue: Arc>, + signal_values_queue: Arc>, } impl ProviderProxySelectorImpl { @@ -146,9 +46,16 @@ impl ProviderProxySelectorImpl { /// # Arguments /// - `signal_values_queue`: The queue that is passed to proxies andused to update the emitter pub fn new(signal_values_queue: Arc>) -> Self { + let factories: Vec> = vec![ + Box::new(GRPCProviderProxyFactory {}), + Box::new(HttpMockProviderProxyFactory {}), + Box::new(InMemoryMockProviderProxyFactory {}), + ]; + ProviderProxySelectorImpl { - provider_proxies: HashMap::new(), - entity_map: HashMap::new(), + factories, + provider_proxies: Mutex::new(HashMap::new()), + entity_map: Mutex::new(HashMap::new()), signal_values_queue, } } @@ -162,49 +69,59 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { /// # Arguments /// - `entity`: the entity that the proxy should handle async fn create_or_update_proxy( - &mut self, + &self, entity: &Entity, ) -> Result<(), ProviderProxySelectorError> { - let (entity_id, provider_uri, operation, protocol) = - (&entity.id, &entity.uri, &entity.operation, &entity.protocol); - - // If a provider proxy already exists for this uri, - // then we notify that proxy to include this new entity_id - if let Some(provider_proxy) = self.provider_proxies.get(provider_uri).cloned() { - debug!("A provider proxy for {provider_uri} already exists"); - self.entity_map - .insert(String::from(entity_id), String::from(provider_uri)); - return provider_proxy - .register_entity(entity_id, operation) - .await - .map_err(ProviderProxySelectorError::communication); + let mut provider_proxies = self.provider_proxies.lock().await; + let mut entity_map = self.entity_map.lock().await; + + // If a provider proxy already exists for one of this entity's uris, + // then we notify that proxy to include this new entity + for endpoint in entity.endpoints.iter() { + if let Some(provider_proxy) = provider_proxies.get(&endpoint.uri) { + debug!("A provider proxy for {} already exists", &endpoint.uri); + + entity_map.insert(String::from(&entity.id), String::from(&endpoint.uri)); + + return provider_proxy + .register_entity(&entity.id, &endpoint) + .await + .map_err(ProviderProxySelectorError::communication); + } } - let provider_proxy = ProviderProxyKind::create_provider_proxy( - protocol, - operation, - provider_uri, - self.signal_values_queue.clone(), - ) - .await?; + // If there's not a proxy we can resuse, find the right factory to create a new one + let (provider_proxy, endpoint) = { + let mut result = None; + for factory in self.factories.iter() { + if let Some(endpoint) = factory.is_supported(entity) { + let proxy = factory.create_proxy(&endpoint.uri, self.signal_values_queue.clone()) + .map_err(ProviderProxySelectorError::provider_proxy_error)?; + result = Some((proxy, endpoint)); + } + } + + result.ok_or::(ProviderProxySelectorErrorKind::OperationNotSupported.into())? + }; // If we're able to create a provider_proxy then map the // provider uri to that created proxy - self.provider_proxies - .insert(provider_uri.clone(), provider_proxy.clone()); - self.entity_map - .insert(String::from(entity_id), String::from(provider_uri)); + entity_map.insert(String::from(entity.id.clone()), String::from(endpoint.uri.clone())); - let proxy = provider_proxy.clone(); + let provider_proxy_clone = provider_proxy.clone(); tokio::spawn(async move { - let _ = proxy.run().await; + let _ = provider_proxy_clone.run().await; }); provider_proxy - .register_entity(entity_id, operation) + .register_entity(&entity.id, &endpoint) .await - .map_err(ProviderProxySelectorError::provider_proxy_error) + .map_err(ProviderProxySelectorError::provider_proxy_error)?; + + provider_proxies.insert(endpoint.uri.clone(), provider_proxy); + + Ok(()) } /// Requests that the value of an entity be published as soon as possible @@ -212,19 +129,20 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { /// # Arguments /// - `entity_id`: the entity to request async fn request_entity_value( - &mut self, + &self, entity_id: &str, ) -> Result<(), ProviderProxySelectorError> { - let provider_uri = { - self.entity_map - .get(entity_id) - .ok_or(ProviderProxySelectorError::entity_not_found(format!( - "Unable to retrieve entity uri for {entity_id}" - )))? - .to_owned() - }; + let mut provider_proxies = self.provider_proxies.lock().await; + let entity_map = self.entity_map.lock().await; + + let provider_uri = entity_map + .get(entity_id) + .ok_or(ProviderProxySelectorError::entity_not_found(format!( + "Unable to retrieve entity uri for {entity_id}" + )))? + .to_owned(); - match self.provider_proxies.entry(provider_uri) { + match provider_proxies.entry(provider_uri) { Entry::Occupied(provider_proxy) => provider_proxy .get() .send_request_to_provider(entity_id) @@ -241,7 +159,7 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { mod provider_proxy_selector_tests { use super::*; - use freyja_contracts::provider_proxy_selector::ProviderProxySelectorErrorKind; + use freyja_contracts::{provider_proxy_selector::ProviderProxySelectorErrorKind, entity::EntityEndpoint}; const AMBIENT_AIR_TEMPERATURE_ID: &str = "dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1"; const OPERATION: &str = "Subscribe"; @@ -249,15 +167,17 @@ mod provider_proxy_selector_tests { #[tokio::test] async fn handle_start_provider_proxy_request_return_err_test() { let signal_values_queue: Arc> = Arc::new(SegQueue::new()); - let mut uut = ProviderProxySelectorImpl::new(signal_values_queue); + let uut = ProviderProxySelectorImpl::new(signal_values_queue); let entity = Entity { id: String::from(AMBIENT_AIR_TEMPERATURE_ID), - uri: String::new(), name: None, description: None, - operation: OPERATION.to_string(), - protocol: String::from("grpc"), + endpoints: vec![EntityEndpoint { + operations: vec![OPERATION.to_string()], + uri: String::new(), + protocol: String::from("grpc"), + }] }; let result = uut.create_or_update_proxy(&entity).await; @@ -268,23 +188,4 @@ mod provider_proxy_selector_tests { ProviderProxySelectorErrorKind::Communication ); } - - #[test] - fn protocol_kind_match_test() { - let mut grpc = String::from("grpc"); - let mut protocol_kind = ProviderProxyKind::from_str(&grpc).unwrap(); - assert_eq!(protocol_kind, ProviderProxyKind::GRPCProviderProxy); - - grpc = String::from("gRPC"); - protocol_kind = ProviderProxyKind::from_str(&grpc).unwrap(); - assert_eq!(protocol_kind, ProviderProxyKind::GRPCProviderProxy); - - let mut mqtt = String::from("mqtt"); - protocol_kind = ProviderProxyKind::from_str(&mqtt).unwrap(); - assert_eq!(protocol_kind, ProviderProxyKind::MqttProviderProxy); - - mqtt = String::from("mQTT"); - protocol_kind = ProviderProxyKind::from_str(&mqtt).unwrap(); - assert_eq!(protocol_kind, ProviderProxyKind::MqttProviderProxy); - } }