Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Nov 13, 2023
1 parent 20d682b commit 47adecd
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 30 deletions.
15 changes: 7 additions & 8 deletions common/src/message_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use log::{warn, debug};
use log::{debug, warn};

const METADATA_KEY: &str = "$metadata";

Expand Down Expand Up @@ -49,12 +49,11 @@ pub fn parse_value(value: String) -> String {
_ => continue,
};

let metadata_descriptor =
if property_map.contains_key(&METADATA_KEY.to_string()) {
"has"
} else {
"does not have"
};
let metadata_descriptor = if property_map.contains_key(&METADATA_KEY.to_string()) {
"has"
} else {
"does not have"
};

debug!(
"Value contained {} properties and {metadata_descriptor} a {METADATA_KEY} property. Selecting property with key {} as the signal value",
Expand Down Expand Up @@ -165,4 +164,4 @@ mod message_utils_tests {

assert_eq!(result, expected_value);
}
}
}
2 changes: 1 addition & 1 deletion provider_proxies/mqtt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ use serde::{Deserialize, Serialize};
pub struct Config {
/// The keep alive interval in seconds
pub keep_alive_interval_s: u64,
}
}
38 changes: 21 additions & 17 deletions provider_proxies/mqtt/src/mqtt_provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use std::{
collections::HashMap,
sync::Arc, time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use log::{info, debug};
use log::{debug, info};
use paho_mqtt::{Client, QOS_1};
use tokio::sync::Mutex;
use uuid::Uuid;

use crate::{config::Config, SUBSCRIBE_OPERATION, MQTT_PROTOCOL};
use crate::{config::Config, MQTT_PROTOCOL, SUBSCRIBE_OPERATION};
use freyja_build_common::config_file_stem;
use freyja_common::{config_utils, out_dir, message_utils};
use freyja_common::{config_utils, message_utils, out_dir};
use freyja_contracts::{
entity::EntityEndpoint,
provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyErrorKind, SignalValue},
Expand All @@ -31,7 +28,7 @@ const MQTT_CLIENT_ID_PREFIX: &str = "freyja-mqtt-proxy";
pub struct MqttProviderProxy {
/// The proxy config
config: Config,

/// The MQTT client
client: Arc<Mutex<Client>>,

Expand Down Expand Up @@ -70,8 +67,8 @@ impl ProviderProxy for MqttProviderProxy {
.client_id(client_id)
.finalize();

let client = paho_mqtt::Client::new(create_options)
.map_err(ProviderProxyError::communication)?;
let client =
paho_mqtt::Client::new(create_options).map_err(ProviderProxyError::communication)?;

Ok(MqttProviderProxy {
config,
Expand All @@ -96,12 +93,13 @@ impl ProviderProxy for MqttProviderProxy {
.clean_session(false)
.will_message(lwt)
.finalize();

let receiver;
{
let client = self.client.lock().await;
receiver = client.start_consuming();
let _ = client.connect(connection_options)
let _ = client
.connect(connection_options)
.map_err(ProviderProxyError::communication);
}

Expand Down Expand Up @@ -130,7 +128,7 @@ impl ProviderProxy for MqttProviderProxy {
log::error!("Error resubscribing to topic {topic}: {e}");
}
}
},
}
Err(e) => {
log::error!("Fatal error trying to reconnect to mqtt client: {e}");
break;
Expand Down Expand Up @@ -182,19 +180,25 @@ impl ProviderProxy for MqttProviderProxy {
// Verify that the endpoint has the expected data.
// This shouldn't be necessary since it's first verified by the factory,
// but this ensures we don't get hit by an edge case
if endpoint.protocol != MQTT_PROTOCOL || !endpoint.operations.contains(&SUBSCRIBE_OPERATION.to_string()) {
if endpoint.protocol != MQTT_PROTOCOL
|| !endpoint
.operations
.contains(&SUBSCRIBE_OPERATION.to_string())
{
return Err(ProviderProxyErrorKind::OperationNotSupported.into());
}

// Topic comes from the endpoint context
let topic = endpoint.context.clone();
debug!("Subscribing to topic {topic}");

let client = self.client.lock().await;
client.subscribe(&topic, QOS_1).map_err(ProviderProxyError::communication)?;
client
.subscribe(&topic, QOS_1)
.map_err(ProviderProxyError::communication)?;
let mut subscriptions = self.subscriptions.lock().await;
subscriptions.insert(topic, entity_id.to_string());

Ok(())
}
}
}
4 changes: 1 addition & 3 deletions provider_proxies/mqtt/src/mqtt_provider_proxy_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use freyja_contracts::{
provider_proxy::{ProviderProxy, ProviderProxyError, ProviderProxyFactory, SignalValue},
};

use crate::{
mqtt_provider_proxy::MqttProviderProxy, MQTT_PROTOCOL, SUBSCRIBE_OPERATION,
};
use crate::{mqtt_provider_proxy::MqttProviderProxy, MQTT_PROTOCOL, SUBSCRIBE_OPERATION};

/// Factory for creating MqttProviderProxies
pub struct MqttProviderProxyFactory {}
Expand Down
4 changes: 3 additions & 1 deletion provider_proxy_selector/src/provider_proxy_selector_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ impl ProviderProxySelector for ProviderProxySelectorImpl {
let provider_proxy_clone = provider_proxy.clone();
tokio::spawn(async move {
let _ = provider_proxy_clone.run().await;
}).await.map_err(ProviderProxySelectorError::provider_proxy_error)?;
})
.await
.map_err(ProviderProxySelectorError::provider_proxy_error)?;

provider_proxy
.register_entity(&entity.id, &endpoint)
Expand Down

0 comments on commit 47adecd

Please sign in to comment.