diff --git a/common/src/message_utils.rs b/common/src/message_utils.rs index a069567c..f9ddde87 100644 --- a/common/src/message_utils.rs +++ b/common/src/message_utils.rs @@ -2,7 +2,7 @@ // Licensed under the MIT license. // SPDX-License-Identifier: MIT -use log::{warn, debug}; +use log::{debug, warn}; const METADATA_KEY: &str = "$metadata"; @@ -49,12 +49,11 @@ pub fn parse_value(value: String) -> String { _ => continue, }; - let metadata_descriptor = - if property_map.contains_key(&METADATA_KEY.to_string()) { - "has" - } else { - "does not have" - }; + let metadata_descriptor = if property_map.contains_key(&METADATA_KEY.to_string()) { + "has" + } else { + "does not have" + }; debug!( "Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value", @@ -165,4 +164,4 @@ mod message_utils_tests { assert_eq!(result, expected_value); } -} \ No newline at end of file +} diff --git a/provider_proxies/mqtt/src/config.rs b/provider_proxies/mqtt/src/config.rs index 43cb7e28..89239bfe 100644 --- a/provider_proxies/mqtt/src/config.rs +++ b/provider_proxies/mqtt/src/config.rs @@ -9,4 +9,4 @@ use serde::{Deserialize, Serialize}; pub struct Config { /// The keep alive interval in seconds pub keep_alive_interval_s: u64, -} \ No newline at end of file +} diff --git a/provider_proxies/mqtt/src/mqtt_provider_proxy.rs b/provider_proxies/mqtt/src/mqtt_provider_proxy.rs index e1c33303..b756457b 100644 --- a/provider_proxies/mqtt/src/mqtt_provider_proxy.rs +++ b/provider_proxies/mqtt/src/mqtt_provider_proxy.rs @@ -2,21 +2,18 @@ // Licensed under the MIT license. // SPDX-License-Identifier: MIT -use std::{ - collections::HashMap, - sync::Arc, time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use crossbeam::queue::SegQueue; -use log::{info, debug}; +use log::{debug, info}; use paho_mqtt::{Client, QOS_1}; use tokio::sync::Mutex; use uuid::Uuid; -use crate::{config::Config, SUBSCRIBE_OPERATION, MQTT_PROTOCOL}; +use crate::{config::Config, MQTT_PROTOCOL, SUBSCRIBE_OPERATION}; use freyja_build_common::config_file_stem; -use freyja_common::{config_utils, out_dir, message_utils}; +use freyja_common::{config_utils, message_utils, out_dir}; use freyja_contracts::{ entity::EntityEndpoint, provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyErrorKind, SignalValue}, @@ -31,7 +28,7 @@ const MQTT_CLIENT_ID_PREFIX: &str = "freyja-mqtt-proxy"; pub struct MqttProviderProxy { /// The proxy config config: Config, - + /// The MQTT client client: Arc>, @@ -70,8 +67,8 @@ impl ProviderProxy for MqttProviderProxy { .client_id(client_id) .finalize(); - let client = paho_mqtt::Client::new(create_options) - .map_err(ProviderProxyError::communication)?; + let client = + paho_mqtt::Client::new(create_options).map_err(ProviderProxyError::communication)?; Ok(MqttProviderProxy { config, @@ -96,12 +93,13 @@ impl ProviderProxy for MqttProviderProxy { .clean_session(false) .will_message(lwt) .finalize(); - + let receiver; { let client = self.client.lock().await; receiver = client.start_consuming(); - let _ = client.connect(connection_options) + let _ = client + .connect(connection_options) .map_err(ProviderProxyError::communication); } @@ -130,7 +128,7 @@ impl ProviderProxy for MqttProviderProxy { log::error!("Error resubscribing to topic {topic}: {e}"); } } - }, + } Err(e) => { log::error!("Fatal error trying to reconnect to mqtt client: {e}"); break; @@ -182,19 +180,25 @@ impl ProviderProxy for MqttProviderProxy { // Verify that the endpoint has the expected data. // This shouldn't be necessary since it's first verified by the factory, // but this ensures we don't get hit by an edge case - if endpoint.protocol != MQTT_PROTOCOL || !endpoint.operations.contains(&SUBSCRIBE_OPERATION.to_string()) { + if endpoint.protocol != MQTT_PROTOCOL + || !endpoint + .operations + .contains(&SUBSCRIBE_OPERATION.to_string()) + { return Err(ProviderProxyErrorKind::OperationNotSupported.into()); } - + // Topic comes from the endpoint context let topic = endpoint.context.clone(); debug!("Subscribing to topic {topic}"); let client = self.client.lock().await; - client.subscribe(&topic, QOS_1).map_err(ProviderProxyError::communication)?; + client + .subscribe(&topic, QOS_1) + .map_err(ProviderProxyError::communication)?; let mut subscriptions = self.subscriptions.lock().await; subscriptions.insert(topic, entity_id.to_string()); Ok(()) } -} \ No newline at end of file +} diff --git a/provider_proxies/mqtt/src/mqtt_provider_proxy_factory.rs b/provider_proxies/mqtt/src/mqtt_provider_proxy_factory.rs index ef22cbc1..9c1f5f78 100644 --- a/provider_proxies/mqtt/src/mqtt_provider_proxy_factory.rs +++ b/provider_proxies/mqtt/src/mqtt_provider_proxy_factory.rs @@ -10,9 +10,7 @@ use freyja_contracts::{ provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyFactory, SignalValue}, }; -use crate::{ - mqtt_provider_proxy::MqttProviderProxy, MQTT_PROTOCOL, SUBSCRIBE_OPERATION, -}; +use crate::{mqtt_provider_proxy::MqttProviderProxy, MQTT_PROTOCOL, SUBSCRIBE_OPERATION}; /// Factory for creating MqttProviderProxies pub struct MqttProviderProxyFactory {} diff --git a/provider_proxy_selector/src/provider_proxy_selector_impl.rs b/provider_proxy_selector/src/provider_proxy_selector_impl.rs index eefbbc34..e5cf17d6 100644 --- a/provider_proxy_selector/src/provider_proxy_selector_impl.rs +++ b/provider_proxy_selector/src/provider_proxy_selector_impl.rs @@ -128,7 +128,9 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { let provider_proxy_clone = provider_proxy.clone(); tokio::spawn(async move { let _ = provider_proxy_clone.run().await; - }).await.map_err(ProviderProxySelectorError::provider_proxy_error)?; + }) + .await + .map_err(ProviderProxySelectorError::provider_proxy_error)?; provider_proxy .register_entity(&entity.id, &endpoint)