Skip to content

Commit

Permalink
readme and a bit of cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Nov 10, 2023
1 parent 96fac4f commit 8ea46af
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 44 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 20 additions & 30 deletions provider_proxies/grpc/v1/src/grpc_client_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::sync::Arc;

use crossbeam::queue::SegQueue;
use log::{debug, warn};
use serde_json::Value;
use tonic::{Request, Response, Status};

use freyja_contracts::provider_proxy::SignalValue;
Expand Down Expand Up @@ -46,7 +45,7 @@ impl GRPCClientImpl {
/// # Arguments
/// - `value`: the value to attempt to parse
fn parse_value(value: String) -> String {
match serde_json::from_str::<Value>(&value) {
match serde_json::from_str::<serde_json::Value>(&value) {
Ok(v) => {
let property_map = match v.as_object() {
Some(o) => o,
Expand All @@ -56,45 +55,36 @@ impl GRPCClientImpl {
}
};

let mut selected_property = None;
for property in property_map.iter() {
if property.0 == METADATA_KEY {
continue;
}

let selected_value = match property.1 {
Value::String(s) => s.clone(),
Value::Bool(b) => b.to_string(),
Value::Number(n) => n.to_string(),
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Number(n) => n.to_string(),
_ => continue,
};

selected_property = Some((property.0, selected_value));
break;
}
let metadata_descriptor =
if property_map.contains_key(&METADATA_KEY.to_string()) {
"has"
} else {
"does not have"
};

match selected_property {
Some((k, v)) => {
let metadata_descriptor =
if property_map.contains_key(&METADATA_KEY.to_string()) {
"has"
} else {
"does not have"
};

debug!(
"Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value",
property_map.len(),
k
);

v
}
None => {
warn!("Could not find a property that was parseable as a value");
value
}
debug!(
"Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value",
property_map.len(),
property.0
);

return selected_value;
}

warn!("Could not find a property that was parseable as a value");
value
}
Err(e) => {
warn!("Failed to parse value: {e}");
Expand Down
3 changes: 0 additions & 3 deletions provider_proxies/mqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@ crossbeam = { workspace = true }
freyja-build-common = { workspace = true }
freyja-common = { workspace = true }
freyja-contracts = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
paho-mqtt = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true, features = ["v4"]}

[dev-dependencies]
tokio-stream = { workspace = true }

[build-dependencies]
freyja-build-common = { workspace = true }
11 changes: 5 additions & 6 deletions provider_proxies/mqtt/README.md
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
# gRPC Provider Proxy
# MQTT Provider Proxy

The gRPC Provider Proxy interfaces with providers which support gRPC. It acts as a consumer for digital twin providers. This proxy supports the `Get` and `Subscribe` operations as defined for the [Ibeji mixed sample](https://github.com/eclipse-ibeji/ibeji/tree/main/samples/mixed). To use this proxy with other providers, those providers will need to support the same API(s) as the provider in that sample (see [Integrating with this Proxy](#integrating-with-this-proxy) for more information).
The MQTT Provider Proxy interfaces with providers which support MQTT. It acts as a consumer for digital twin providers. This proxy supports the `Subscribe` operations as defined for the [Ibeji property sample](https://github.com/eclipse-ibeji/ibeji/tree/main/samples/property). To use this proxy with other providers, those providers will need to support the same API(s) as the provider in that sample (see [Integrating with this Proxy](#integrating-with-this-proxy) for more information).

## Configuration

This proxy supports the following configuration settings:

- `consumer_address`: The address for the proxy's consumer. The proxy's gRPC server will be hosted on this address.
- `advertised_consumer_address`: (Optional) The advertised address for the proxy's consumer. This is the address that will be reported as the callback address to providers, enabling scenarios where the providers should use a different address from the actual hosting address. If not specified, this proxy will default to using the consumer address.
- `keep_alive_interval_s`: The keep alive interval for MQTT communications, in seconds

This adapter supports [config overrides](../../../docs/config-overrides.md). The override filename is `grpc_proxy_config.json`, and the default config is located at `res/grpc_proxy_config.default.json`.
This adapter supports [config overrides](../../../docs/config-overrides.md). The override filename is `mqtt_proxy_config.json`, and the default config is located at `res/mqtt_proxy_config.default.json`.

## Integrating with this Proxy

This proxy supports the `Publish` API as [defined by the Ibeji samples](https://github.com/eclipse-ibeji/ibeji/blob/main/samples/interfaces/sample_grpc/v1/digital_twin_consumer.proto). In addition, the `value` property of the `PublishRequest` message that providers publish must conform to one of the following structures in order to properly extract the signal value:
To integrate this proxy with other providers using MQTT, the message that the providers publish must conform to one of the following structures to in order to properly extract the signal value:

- A raw value as a string. For example, `"42"` or `"\"foo\""`.
<!--alex ignore savage-->
Expand Down
77 changes: 75 additions & 2 deletions provider_proxies/mqtt/src/mqtt_provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use log::{info, debug};
use log::{info, debug, warn};
use paho_mqtt::{Client, QOS_1};
use tokio::sync::Mutex;
use uuid::Uuid;
Expand All @@ -22,6 +22,7 @@ use freyja_contracts::{
provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyErrorKind, SignalValue},
};

const METADATA_KEY: &str = "$metadata";
const MQTT_CLIENT_ID_PREFIX: &str = "freyja-mqtt-proxy";

/// Interfaces with providers which support GRPC. Based on the Ibeji mixed sample.
Expand All @@ -42,6 +43,78 @@ pub struct MqttProviderProxy {
signal_values_queue: Arc<SegQueue<SignalValue>>,
}

// TODO: make this common?
impl MqttProviderProxy {
/// Parses the value published by a provider.
/// The current implementation is a workaround for the current Ibeji sample provider implementation,
/// which uses a non-consistent contract as follows:
///
/// ```ignore
/// {
/// "{propertyName}": "value",
/// "$metadata": {...}
/// }
/// ```
///
/// Note that `{propertyName}` is replaced by the name of the property that the provider published.
/// This function will extract the value from the first property satisfying the following conditions:
/// - The property is not named `$metadata`
/// - The property value is a non-null primitive JSON type (string, bool, or number)
///
/// If any part of parsing fails, a warning is logged and the original value is returned.
///
/// # Arguments
/// - `value`: the value to attempt to parse
fn parse_value(value: String) -> String {
match serde_json::from_str::<serde_json::Value>(&value) {
Ok(v) => {
let property_map = match v.as_object() {
Some(o) => o,
None => {
warn!("Could not parse value as JSON object");
return value;
}
};

for property in property_map.iter() {
if property.0 == METADATA_KEY {
continue;
}

let selected_value = match property.1 {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Number(n) => n.to_string(),
_ => continue,
};

let metadata_descriptor =
if property_map.contains_key(&METADATA_KEY.to_string()) {
"has"
} else {
"does not have"
};

debug!(
"Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value",
property_map.len(),
property.0
);

return selected_value;
}

warn!("Could not find a property that was parseable as a value");
value
}
Err(e) => {
warn!("Failed to parse value: {e}");
value
}
}
}
}

#[async_trait]
impl ProviderProxy for MqttProviderProxy {
/// Creates a provider proxy
Expand Down Expand Up @@ -116,7 +189,7 @@ impl ProviderProxy for MqttProviderProxy {
let subsciptions = subscriptions.lock().await;
let entity_id = subsciptions.get(m.topic()).unwrap().clone();
// TODO: additional parsing for value?
let value = m.to_string();
let value = Self::parse_value(m.to_string());
signal_values_queue.push(SignalValue { entity_id, value });
} else {
let client = client.lock().await;
Expand Down

0 comments on commit 8ea46af

Please sign in to comment.