Skip to content

Commit

Permalink
changes after testing
Browse files Browse the repository at this point in the history
  • Loading branch information
wilyle committed Nov 13, 2023
1 parent 22b59c5 commit 4afe8a0
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions provider_proxies/mqtt/src/mqtt_provider_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl MqttProviderProxy {
value
}
Err(e) => {
warn!("Failed to parse value: {e}");
warn!("Failed to parse value |{value}|: {e}");
value
}
}
Expand Down Expand Up @@ -184,12 +184,13 @@ impl ProviderProxy for MqttProviderProxy {

// Start the thread for handling publishes from providers
tokio::spawn(async move {
info!("Started MQTT listener");
for msg in receiver.iter() {
if let Some(m) = msg {
let subsciptions = subscriptions.lock().await;
let entity_id = subsciptions.get(m.topic()).unwrap().clone();
// TODO: additional parsing for value?
let value = Self::parse_value(m.to_string());
let value = Self::parse_value(m.payload_str().to_string());
signal_values_queue.push(SignalValue { entity_id, value });
} else {
let client = client.lock().await;
Expand Down Expand Up @@ -260,6 +261,8 @@ impl ProviderProxy for MqttProviderProxy {

// Topic comes from the endpoint context
let topic = endpoint.context.clone();
debug!("Subscribing to topic {topic}");

let client = self.client.lock().await;
client.subscribe(&topic, QOS_1).map_err(ProviderProxyError::communication)?;
let mut subscriptions = self.subscriptions.lock().await;
Expand Down
1 change: 1 addition & 0 deletions provider_proxy_selector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ freyja-contracts = { workspace = true }
grpc-provider-proxy-v1 = { path = "../provider_proxies/grpc/v1" }
http-mock-provider-proxy = { path = "../provider_proxies/http_mock_provider_proxy" }
in-memory-mock-provider-proxy = { path = "../provider_proxies/in_memory_mock_provider_proxy" }
mqtt-provider-proxy = { path = "../provider_proxies/mqtt" }
log = { workspace = true }
tokio = { workspace = true }
4 changes: 3 additions & 1 deletion provider_proxy_selector/src/provider_proxy_selector_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use log::debug;
use mqtt_provider_proxy::mqtt_provider_proxy_factory::MqttProviderProxyFactory;
use tokio::sync::Mutex;

use freyja_contracts::{
Expand Down Expand Up @@ -55,6 +56,7 @@ impl ProviderProxySelectorImpl {
Box::new(GRPCProviderProxyFactory {}),
Box::new(HttpMockProviderProxyFactory {}),
Box::new(InMemoryMockProviderProxyFactory {}),
Box::new(MqttProviderProxyFactory {}),
];

ProviderProxySelectorImpl {
Expand Down Expand Up @@ -126,7 +128,7 @@ impl ProviderProxySelector for ProviderProxySelectorImpl {
let provider_proxy_clone = provider_proxy.clone();
tokio::spawn(async move {
let _ = provider_proxy_clone.run().await;
});
}).await.map_err(ProviderProxySelectorError::provider_proxy_error)?;

provider_proxy
.register_entity(&entity.id, &endpoint)
Expand Down

0 comments on commit 4afe8a0

Please sign in to comment.