From 8ea46afc48671ecfdddbbcb92d9c052eb1940a57 Mon Sep 17 00:00:00 2001 From: William Lyles <26171886+wilyle@users.noreply.github.com> Date: Fri, 10 Nov 2023 14:08:59 -0800 Subject: [PATCH] readme and a bit of cleanup --- Cargo.lock | 3 - .../grpc/v1/src/grpc_client_impl.rs | 50 +++++------- provider_proxies/mqtt/Cargo.toml | 3 - provider_proxies/mqtt/README.md | 11 ++- .../mqtt/src/mqtt_provider_proxy.rs | 77 ++++++++++++++++++- 5 files changed, 100 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bceabc87..195844fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1165,14 +1165,11 @@ dependencies = [ "freyja-build-common", "freyja-common", "freyja-contracts", - "futures", "log", "paho-mqtt", "serde", "serde_json", - "tempfile", "tokio", - "tokio-stream", "uuid", ] diff --git a/provider_proxies/grpc/v1/src/grpc_client_impl.rs b/provider_proxies/grpc/v1/src/grpc_client_impl.rs index 287e85e1..ea420332 100644 --- a/provider_proxies/grpc/v1/src/grpc_client_impl.rs +++ b/provider_proxies/grpc/v1/src/grpc_client_impl.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use crossbeam::queue::SegQueue; use log::{debug, warn}; -use serde_json::Value; use tonic::{Request, Response, Status}; use freyja_contracts::provider_proxy::SignalValue; @@ -46,7 +45,7 @@ impl GRPCClientImpl { /// # Arguments /// - `value`: the value to attempt to parse fn parse_value(value: String) -> String { - match serde_json::from_str::(&value) { + match serde_json::from_str::(&value) { Ok(v) => { let property_map = match v.as_object() { Some(o) => o, @@ -56,45 +55,36 @@ impl GRPCClientImpl { } }; - let mut selected_property = None; for property in property_map.iter() { if property.0 == METADATA_KEY { continue; } let selected_value = match property.1 { - Value::String(s) => s.clone(), - Value::Bool(b) => b.to_string(), - Value::Number(n) => n.to_string(), + serde_json::Value::String(s) => s.clone(), + serde_json::Value::Bool(b) => b.to_string(), + serde_json::Value::Number(n) => n.to_string(), _ => continue, }; - selected_property = Some((property.0, selected_value)); - break; - } + let metadata_descriptor = + if property_map.contains_key(&METADATA_KEY.to_string()) { + "has" + } else { + "does not have" + }; - match selected_property { - Some((k, v)) => { - let metadata_descriptor = - if property_map.contains_key(&METADATA_KEY.to_string()) { - "has" - } else { - "does not have" - }; - - debug!( - "Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value", - property_map.len(), - k - ); - - v - } - None => { - warn!("Could not find a property that was parseable as a value"); - value - } + debug!( + "Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value", + property_map.len(), + property.0 + ); + + return selected_value; } + + warn!("Could not find a property that was parseable as a value"); + value } Err(e) => { warn!("Failed to parse value: {e}"); diff --git a/provider_proxies/mqtt/Cargo.toml b/provider_proxies/mqtt/Cargo.toml index 52c8d9c3..87655643 100644 --- a/provider_proxies/mqtt/Cargo.toml +++ b/provider_proxies/mqtt/Cargo.toml @@ -14,17 +14,14 @@ crossbeam = { workspace = true } freyja-build-common = { workspace = true } freyja-common = { workspace = true } freyja-contracts = { workspace = true } -futures = { workspace = true } log = { workspace = true } paho-mqtt = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tempfile = { workspace = true } tokio = { workspace = true } uuid = { workspace = true, features = ["v4"]} [dev-dependencies] -tokio-stream = { workspace = true } [build-dependencies] freyja-build-common = { workspace = true } \ No newline at end of file diff --git a/provider_proxies/mqtt/README.md b/provider_proxies/mqtt/README.md index 277fac11..bba87d14 100644 --- a/provider_proxies/mqtt/README.md +++ b/provider_proxies/mqtt/README.md @@ -1,19 +1,18 @@ -# gRPC Provider Proxy +# MQTT Provider Proxy -The gRPC Provider Proxy interfaces with providers which support gRPC. It acts as a consumer for digital twin providers. This proxy supports the `Get` and `Subscribe` operations as defined for the [Ibeji mixed sample](https://github.com/eclipse-ibeji/ibeji/tree/main/samples/mixed). To use this proxy with other providers, those providers will need to support the same API(s) as the provider in that sample (see [Integrating with this Proxy](#integrating-with-this-proxy) for more information). +The MQTT Provider Proxy interfaces with providers which support MQTT. It acts as a consumer for digital twin providers. This proxy supports the `Subscribe` operations as defined for the [Ibeji property sample](https://github.com/eclipse-ibeji/ibeji/tree/main/samples/property). To use this proxy with other providers, those providers will need to support the same API(s) as the provider in that sample (see [Integrating with this Proxy](#integrating-with-this-proxy) for more information). ## Configuration This proxy supports the following configuration settings: -- `consumer_address`: The address for the proxy's consumer. The proxy's gRPC server will be hosted on this address. -- `advertised_consumer_address`: (Optional) The advertised address for the proxy's consumer. This is the address that will be reported as the callback address to providers, enabling scenarios where the providers should use a different address from the actual hosting address. If not specified, this proxy will default to using the consumer address. +- `keep_alive_interval_s`: The keep alive interval for MQTT communications, in seconds -This adapter supports [config overrides](../../../docs/config-overrides.md). The override filename is `grpc_proxy_config.json`, and the default config is located at `res/grpc_proxy_config.default.json`. +This adapter supports [config overrides](../../../docs/config-overrides.md). The override filename is `mqtt_proxy_config.json`, and the default config is located at `res/mqtt_proxy_config.default.json`. ## Integrating with this Proxy -This proxy supports the `Publish` API as [defined by the Ibeji samples](https://github.com/eclipse-ibeji/ibeji/blob/main/samples/interfaces/sample_grpc/v1/digital_twin_consumer.proto). In addition, the `value` property of the `PublishRequest` message that providers publish must conform to one of the following structures in order to properly extract the signal value: +To integrate this proxy with other providers using MQTT, the message that the providers publish must conform to one of the following structures to in order to properly extract the signal value: - A raw value as a string. For example, `"42"` or `"\"foo\""`. diff --git a/provider_proxies/mqtt/src/mqtt_provider_proxy.rs b/provider_proxies/mqtt/src/mqtt_provider_proxy.rs index 5271e2f3..c8307596 100644 --- a/provider_proxies/mqtt/src/mqtt_provider_proxy.rs +++ b/provider_proxies/mqtt/src/mqtt_provider_proxy.rs @@ -9,7 +9,7 @@ use std::{ use async_trait::async_trait; use crossbeam::queue::SegQueue; -use log::{info, debug}; +use log::{info, debug, warn}; use paho_mqtt::{Client, QOS_1}; use tokio::sync::Mutex; use uuid::Uuid; @@ -22,6 +22,7 @@ use freyja_contracts::{ provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyErrorKind, SignalValue}, }; +const METADATA_KEY: &str = "$metadata"; const MQTT_CLIENT_ID_PREFIX: &str = "freyja-mqtt-proxy"; /// Interfaces with providers which support GRPC. Based on the Ibeji mixed sample. @@ -42,6 +43,78 @@ pub struct MqttProviderProxy { signal_values_queue: Arc>, } +// TODO: make this common? +impl MqttProviderProxy { + /// Parses the value published by a provider. + /// The current implementation is a workaround for the current Ibeji sample provider implementation, + /// which uses a non-consistent contract as follows: + /// + /// ```ignore + /// { + /// "{propertyName}": "value", + /// "$metadata": {...} + /// } + /// ``` + /// + /// Note that `{propertyName}` is replaced by the name of the property that the provider published. + /// This function will extract the value from the first property satisfying the following conditions: + /// - The property is not named `$metadata` + /// - The property value is a non-null primitive JSON type (string, bool, or number) + /// + /// If any part of parsing fails, a warning is logged and the original value is returned. + /// + /// # Arguments + /// - `value`: the value to attempt to parse + fn parse_value(value: String) -> String { + match serde_json::from_str::(&value) { + Ok(v) => { + let property_map = match v.as_object() { + Some(o) => o, + None => { + warn!("Could not parse value as JSON object"); + return value; + } + }; + + for property in property_map.iter() { + if property.0 == METADATA_KEY { + continue; + } + + let selected_value = match property.1 { + serde_json::Value::String(s) => s.clone(), + serde_json::Value::Bool(b) => b.to_string(), + serde_json::Value::Number(n) => n.to_string(), + _ => continue, + }; + + let metadata_descriptor = + if property_map.contains_key(&METADATA_KEY.to_string()) { + "has" + } else { + "does not have" + }; + + debug!( + "Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value", + property_map.len(), + property.0 + ); + + return selected_value; + } + + warn!("Could not find a property that was parseable as a value"); + value + } + Err(e) => { + warn!("Failed to parse value: {e}"); + value + } + } + } +} + #[async_trait] impl ProviderProxy for MqttProviderProxy { /// Creates a provider proxy @@ -116,7 +189,7 @@ impl ProviderProxy for MqttProviderProxy { let subsciptions = subscriptions.lock().await; let entity_id = subsciptions.get(m.topic()).unwrap().clone(); // TODO: additional parsing for value? - let value = m.to_string(); + let value = Self::parse_value(m.to_string()); signal_values_queue.push(SignalValue { entity_id, value }); } else { let client = client.lock().await;