Skip to content

Commit

Permalink
integrate new find_by_id contract and use factory model
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Oct 31, 2023
1 parent 9adb5bd commit 7e420e6
Show file tree
Hide file tree
Showing 22 changed files with 435 additions and 359 deletions.
42 changes: 25 additions & 17 deletions common/src/signal_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ mod signal_store_tests {

use freyja_contracts::{
conversion::Conversion,
entity::Entity,
entity::{Entity, EntityEndpoint},
signal::{Emission, EmissionPolicy, Target},
};

Expand Down Expand Up @@ -264,12 +264,14 @@ mod signal_store_tests {
id: ID.to_string(),
value: Some(ORIGINAL.to_string()),
source: Entity {
id: ID.to_string(),
name: Some(ORIGINAL.to_string()),
uri: ORIGINAL.to_string(),
id: ID.to_string(),
description: Some(ORIGINAL.to_string()),
operation: GET_OPERATION.to_string(),
protocol: ORIGINAL.to_string(),
endpoints: vec![EntityEndpoint {
protocol: ORIGINAL.to_string(),
operations: vec![GET_OPERATION.to_string()],
uri: ORIGINAL.to_string(),
}],
},
target: Target {
metadata: [(ORIGINAL.to_string(), ORIGINAL.to_string())]
Expand All @@ -293,12 +295,14 @@ mod signal_store_tests {
id: ID.to_string(),
value: Some(INCOMING.to_string()),
source: Entity {
id: ID.to_string(),
name: Some(INCOMING.to_string()),
uri: INCOMING.to_string(),
id: ID.to_string(),
description: Some(INCOMING.to_string()),
operation: "FooOperation".to_string(),
protocol: INCOMING.to_string(),
endpoints: vec![EntityEndpoint {
protocol: INCOMING.to_string(),
operations: vec!["FooOperation".to_string()],
uri: INCOMING.to_string(),
}],
},
target: Target {
metadata: [(INCOMING.to_string(), INCOMING.to_string())]
Expand Down Expand Up @@ -363,12 +367,14 @@ mod signal_store_tests {
id: ID.to_string(),
value: Some(INCOMING.to_string()),
source: Entity {
id: ID.to_string(),
name: Some(INCOMING.to_string()),
uri: INCOMING.to_string(),
id: ID.to_string(),
description: Some(INCOMING.to_string()),
operation: GET_OPERATION.to_string(),
protocol: INCOMING.to_string(),
endpoints: vec![EntityEndpoint {
protocol: INCOMING.to_string(),
operations: vec![GET_OPERATION.to_string()],
uri: INCOMING.to_string(),
}],
},
target: Target {
metadata: [(INCOMING.to_string(), INCOMING.to_string())]
Expand Down Expand Up @@ -426,12 +432,14 @@ mod signal_store_tests {
id: ID.to_string(),
value: Some(ORIGINAL.to_string()),
source: Entity {
id: ID.to_string(),
name: Some(ORIGINAL.to_string()),
uri: ORIGINAL.to_string(),
id: ID.to_string(),
description: Some(ORIGINAL.to_string()),
operation: GET_OPERATION.to_string(),
protocol: ORIGINAL.to_string(),
endpoints: vec![EntityEndpoint {
protocol: ORIGINAL.to_string(),
operations: vec![GET_OPERATION.to_string()],
uri: ORIGINAL.to_string(),
}],
},
target: Target {
metadata: [(ORIGINAL.to_string(), ORIGINAL.to_string())]
Expand Down
23 changes: 15 additions & 8 deletions contracts/src/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@ use serde::{Deserialize, Serialize};
/// Represents an entity
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Entity {
/// The entity's id
pub id: String,

// The entity's name
pub name: Option<String>,

/// The provider's uri
pub uri: String,
/// The entity's id
pub id: String,

/// The entity's description
pub description: Option<String>,

/// The operation that we will use for this entity
pub operation: String,
/// The list of supported endpoints
pub endpoints: Vec<EntityEndpoint>,
}

/// Represents an entity's endpoint for communication
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityEndpoint {
/// The protocol to use to contact this entity
pub protocol: String,
}

/// The operations that this entity supports
pub operations: Vec<String>,

/// The provider's uri
pub uri: String,
}
23 changes: 12 additions & 11 deletions contracts/src/provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use crossbeam::queue::SegQueue;

use crate::entity::{EntityEndpoint, Entity};

/// Represents a signal value
pub struct SignalValue {
/// The entity's id
Expand All @@ -26,11 +28,11 @@ pub trait ProviderProxy: Debug {
fn create_new(
provider_uri: &str,
signal_values_queue: Arc<SegQueue<SignalValue>>,
) -> Result<Box<dyn ProviderProxy + Send + Sync>, ProviderProxyError>
) -> Result<Arc<dyn ProviderProxy + Send + Sync>, ProviderProxyError>
where
Self: Sized;

/// Runs a provider proxy
/// Runs a provider proxy.
async fn run(&self) -> Result<(), ProviderProxyError>;

/// Sends a request to a provider for obtaining the value of an entity
Expand All @@ -44,20 +46,18 @@ pub trait ProviderProxy: Debug {
///
/// # Arguments
/// - `entity_id`: the entity id to add
/// - `operation`: the operation that this entity supports
/// - `endpoint`: the endpoint that this entity supports
async fn register_entity(
&self,
entity_id: &str,
operation: &str,
endpoint: &EntityEndpoint,
) -> Result<(), ProviderProxyError>;
}

/// Checks if this operation is supported
///
/// # Arguments
/// - `operation`: check to see if this operation is supported by this provider proxy
fn is_operation_supported(operation: &str) -> bool
where
Self: Sized + Send + Sync;
pub trait ProviderProxyFactory {
fn is_supported(&self, entity: &Entity) -> Option<EntityEndpoint>;

fn create_proxy(&self, provider_uri: &str, signal_values_queue: Arc<SegQueue<SignalValue>>) -> Result<Arc<dyn ProviderProxy + Send + Sync>, ProviderProxyError>;
}

proc_macros::error! {
Expand All @@ -68,6 +68,7 @@ proc_macros::error! {
Deserialize,
Communication,
EntityNotFound,
OperationNotSupported,
Unknown
}
}
4 changes: 2 additions & 2 deletions contracts/src/provider_proxy_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub trait ProviderProxySelector {
/// # Arguments
/// - `entity`: the entity that the proxy should handle
async fn create_or_update_proxy(
&mut self,
&self,
entity: &Entity,
) -> Result<(), ProviderProxySelectorError>;

Expand All @@ -25,7 +25,7 @@ pub trait ProviderProxySelector {
/// # Arguments
/// - `entity_id`: the entity to request
async fn request_entity_value(
&mut self,
&self,
entity_id: &str,
) -> Result<(), ProviderProxySelectorError>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ The In-Memory Mock Digital Twin Adapter mocks the behavior of an in-vehicle digi

This adapter supports the following configuration settings:

- `values`: a list of entities to use. Each entry in the list is an object with the following properties:
- `entity`: a digital twin entity that will be exposed to the `find_by_id` API. Entities contain the following properties:
- `id`: this is used as the key when calling `find_by_id`.
- `uri`: the uri that is used to invoke a provider. This is a stand-in for whatever the provider contact info is from Ibeji. This is used as the key when calling `subscribe` and `get` in the [In-Memory Provider Proxy](../../provider_proxies/in_memory_mock_provider_proxy/).
- `operation`: the operation that should be used to access this entity.
- `protocol`: the communication protocol that should be used to access this entity. For this particular adapter, the value should always be `in-memory`.
- `name` and `description`: these are currently unused by Freyja. They are included for completeness and parity with Ibeji's Digital Twin Service contract and may potentially be logged.
- `values`: A list of entities to use. Each entry in the list is an object with the following properties:
- `entity`: A digital twin entity that will be exposed to the `find_by_id` API. Entities contain the following properties:
- `id`: This is used as the key when calling `find_by_id`.
- `name` and `description`: These are currently unused by Freyja. They are included for completeness and parity with Ibeji's Digital Twin Service contract and may potentially be logged.
- `endpoints`: A list of endpoints that this entity supports. Each entry in the list is an object with the following properties:
- `protocol`: The communication protocol that should be used to access this entity. When using this adapter, the value of this property should be `in-memory` for most cases.
- `operations`: A list of operations that can be used to access this entity.
- `uri`: The uri that is used to invoke a provider. This is a stand-in for whatever the provider contact info is from Ibeji. This is used as the key when calling `subscribe` and `get` in the [In-Memory Provider Proxy](../../provider_proxies/in_memory_mock_provider_proxy/).

This adapter supports [config overrides](../../docs/config-overrides.md). The override filename is `in_memory_digital_twin_config.json`, and the default config is located at `res/in_memory_digital_twin_config.default.json`.
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,42 @@
"entity": {
"id": "dtmi:sdv:Vehicle:Cabin:HVAC:AmbientAirTemperature;1",
"name": "AmbientAirTemperature",
"uri": "http://0.0.0.0:1111",
"description": "The immediate surroundings air temperature (in Fahrenheit).",
"operation": "Get",
"protocol": "in-memory"
"description": "The air temperature of the immediate surroundings (in Fahrenheit).",
"endpoints": [
{
"protocol": "in-memory",
"operations": ["Get"],
"uri": "http://0.0.0.0:1111"
}
]
}
},
{
"entity": {
"id": "dtmi:sdv:Vehicle:Cabin:HVAC:IsAirConditioningActive;1",
"name": "IsAirConditioningActive",
"uri": "http://0.0.0.0:1111",
"description": "Is air conditioning active?",
"operation": "Subscribe",
"protocol": "in-memory"
"endpoints": [
{
"protocol": "in-memory",
"operations": ["Subscribe"],
"uri": "http://0.0.0.0:1111"
}
]
}
},
{
"entity": {
"id": "dtmi:sdv:Vehicle:OBD:HybridBatteryRemaining;1",
"name": "HybridBatteryRemaining",
"uri": "http://0.0.0.0:1111",
"description": "Percentage of the hybrid battery remaining",
"operation": "Subscribe",
"protocol": "in-memory"
"endpoints": [
{
"protocol": "in-memory",
"operations": ["Subscribe"],
"uri": "http://0.0.0.0:1111"
}
]
}
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ mod in_memory_mock_digital_twin_adapter_tests {
use super::*;

use crate::config::EntityConfig;
use freyja_contracts::entity::Entity;
use freyja_contracts::entity::{Entity, EntityEndpoint};

const OPERATION: &str = "Subscribe";

Expand All @@ -86,12 +86,14 @@ mod in_memory_mock_digital_twin_adapter_tests {
let config = Config {
values: vec![EntityConfig {
entity: Entity {
id: String::from(ENTITY_ID),
name: None,
uri: String::from("http://0.0.0.0:1111"), // Devskim: ignore DS137138
id: ENTITY_ID.to_string(),
description: None,
operation: OPERATION.to_string(),
protocol: String::from("in-memory"),
endpoints: vec![EntityEndpoint {
protocol: String::from("in-memory"),
operations: vec![OPERATION.to_string()],
uri: String::from("http://0.0.0.0:1111"), // Devskim: ignore DS137138
}],
},
}],
};
Expand All @@ -100,11 +102,17 @@ mod in_memory_mock_digital_twin_adapter_tests {
let request = FindByIdRequest {
entity_id: String::from(ENTITY_ID),
};

let response = in_memory_digital_twin_adapter
.find_by_id(request)
.await
.unwrap();

assert_eq!(response.entity.id, ENTITY_ID);
assert_eq!(response.entity.operation, OPERATION);
assert_eq!(response.entity.endpoints.len(), 1);
let endpoint = response.entity.endpoints.first().unwrap();
assert_eq!(endpoint.operations.len(), 1);
let operation = endpoint.operations.first().unwrap();
assert_eq!(operation, OPERATION);
}
}
16 changes: 9 additions & 7 deletions freyja/src/cartographer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<
.entity;

{
let mut provider_proxy_selector = self.provider_proxy_selector.lock().await;
let provider_proxy_selector = self.provider_proxy_selector.lock().await;
provider_proxy_selector
.create_or_update_proxy(&signal.source)
.await
Expand All @@ -209,7 +209,7 @@ mod cartographer_tests {
use freyja_contracts::{
digital_twin_adapter::{DigitalTwinAdapterError, FindByIdResponse},
digital_twin_map_entry::DigitalTwinMapEntry,
entity::Entity,
entity::{Entity, EntityEndpoint},
mapping_client::{
CheckForWorkResponse, GetMappingResponse, MappingClientError, SendInventoryRequest,
SendInventoryResponse,
Expand Down Expand Up @@ -264,8 +264,8 @@ mod cartographer_tests {

#[async_trait]
impl ProviderProxySelector for ProviderProxySelector {
async fn create_or_update_proxy(&mut self, entity: &Entity) -> Result<(), ProviderProxySelectorError>;
async fn request_entity_value(&mut self, entity_id: &str) -> Result<(), ProviderProxySelectorError>;
async fn create_or_update_proxy(&self, entity: &Entity) -> Result<(), ProviderProxySelectorError>;
async fn request_entity_value(&self, entity_id: &str) -> Result<(), ProviderProxySelectorError>;
}
}

Expand Down Expand Up @@ -326,10 +326,12 @@ mod cartographer_tests {
let test_entity = Entity {
id: ID.to_string(),
name: Some("name".to_string()),
uri: "uri".to_string(),
description: Some("description".to_string()),
operation: "FooOperation".to_string(),
protocol: "in-memory".to_string(),
endpoints: vec![EntityEndpoint {
operations: vec!["FooOperation".to_string()],
protocol: "in-memory".to_string(),
uri: "uri".to_string(),
}]
};

let test_signal_patch = &mut SignalPatch {
Expand Down
6 changes: 3 additions & 3 deletions freyja/src/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<TCloudAdapter: CloudAdapter, TProviderProxySelector: ProviderProxySelector>
// This approach to requesting signal values introduces an inherent delay in uploading data
// of signal.emission.policy.interval_ms and needs to be revisited.
let proxy_result = {
let mut provider_proxy_selector = self.provider_proxy_selector.lock().await;
let provider_proxy_selector = self.provider_proxy_selector.lock().await;
provider_proxy_selector
.request_entity_value(&signal.id)
.await
Expand Down Expand Up @@ -255,8 +255,8 @@ mod emitter_tests {

#[async_trait]
impl ProviderProxySelector for ProviderProxySelector {
async fn create_or_update_proxy(&mut self, entity: &Entity) -> Result<(), ProviderProxySelectorError>;
async fn request_entity_value(&mut self, entity_id: &str) -> Result<(), ProviderProxySelectorError>;
async fn create_or_update_proxy(&self, entity: &Entity) -> Result<(), ProviderProxySelectorError>;
async fn request_entity_value(&self, entity_id: &str) -> Result<(), ProviderProxySelectorError>;
}
}

Expand Down
Loading

0 comments on commit 7e420e6

Please sign in to comment.