diff --git a/Cargo.lock b/Cargo.lock index 195844fe..0529b5a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1568,6 +1568,7 @@ dependencies = [ "http-mock-provider-proxy", "in-memory-mock-provider-proxy", "log", + "mqtt-provider-proxy", "tokio", ] diff --git a/provider_proxies/mqtt/src/mqtt_provider_proxy.rs b/provider_proxies/mqtt/src/mqtt_provider_proxy.rs index c8307596..e53e62fa 100644 --- a/provider_proxies/mqtt/src/mqtt_provider_proxy.rs +++ b/provider_proxies/mqtt/src/mqtt_provider_proxy.rs @@ -108,7 +108,7 @@ impl MqttProviderProxy { value } Err(e) => { - warn!("Failed to parse value: {e}"); + warn!("Failed to parse value |{value}|: {e}"); value } } @@ -184,12 +184,13 @@ impl ProviderProxy for MqttProviderProxy { // Start the thread for handling publishes from providers tokio::spawn(async move { + info!("Started MQTT listener"); for msg in receiver.iter() { if let Some(m) = msg { let subsciptions = subscriptions.lock().await; let entity_id = subsciptions.get(m.topic()).unwrap().clone(); // TODO: additional parsing for value? - let value = Self::parse_value(m.to_string()); + let value = Self::parse_value(m.payload_str().to_string()); signal_values_queue.push(SignalValue { entity_id, value }); } else { let client = client.lock().await; @@ -260,6 +261,8 @@ impl ProviderProxy for MqttProviderProxy { // Topic comes from the endpoint context let topic = endpoint.context.clone(); + debug!("Subscribing to topic {topic}"); + let client = self.client.lock().await; client.subscribe(&topic, QOS_1).map_err(ProviderProxyError::communication)?; let mut subscriptions = self.subscriptions.lock().await; diff --git a/provider_proxy_selector/Cargo.toml b/provider_proxy_selector/Cargo.toml index 86170749..630022c8 100644 --- a/provider_proxy_selector/Cargo.toml +++ b/provider_proxy_selector/Cargo.toml @@ -15,5 +15,6 @@ freyja-contracts = { workspace = true } grpc-provider-proxy-v1 = { path = "../provider_proxies/grpc/v1" } http-mock-provider-proxy = { path = "../provider_proxies/http_mock_provider_proxy" } in-memory-mock-provider-proxy = { path = "../provider_proxies/in_memory_mock_provider_proxy" } +mqtt-provider-proxy = { path = "../provider_proxies/mqtt" } log = { workspace = true } tokio = { workspace = true } diff --git a/provider_proxy_selector/src/provider_proxy_selector_impl.rs b/provider_proxy_selector/src/provider_proxy_selector_impl.rs index afcaaf0a..eefbbc34 100644 --- a/provider_proxy_selector/src/provider_proxy_selector_impl.rs +++ b/provider_proxy_selector/src/provider_proxy_selector_impl.rs @@ -10,6 +10,7 @@ use std::{ use async_trait::async_trait; use crossbeam::queue::SegQueue; use log::debug; +use mqtt_provider_proxy::mqtt_provider_proxy_factory::MqttProviderProxyFactory; use tokio::sync::Mutex; use freyja_contracts::{ @@ -55,6 +56,7 @@ impl ProviderProxySelectorImpl { Box::new(GRPCProviderProxyFactory {}), Box::new(HttpMockProviderProxyFactory {}), Box::new(InMemoryMockProviderProxyFactory {}), + Box::new(MqttProviderProxyFactory {}), ]; ProviderProxySelectorImpl { @@ -126,7 +128,7 @@ impl ProviderProxySelector for ProviderProxySelectorImpl { let provider_proxy_clone = provider_proxy.clone(); tokio::spawn(async move { let _ = provider_proxy_clone.run().await; - }); + }).await.map_err(ProviderProxySelectorError::provider_proxy_error)?; provider_proxy .register_entity(&entity.id, &endpoint)