Skip to content

Commit

Permalink
minimq master, embedded-nal 0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
jordens committed Jan 16, 2025
1 parent 0958b29 commit b66a2e9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 34 deletions.
4 changes: 2 additions & 2 deletions miniconf_mqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ categories = ["no-std", "config", "rust-patterns", "parsing"]

[dependencies]
miniconf = { version = "0.18.0", features = ["json-core"], default-features = false, path = "../miniconf" }
minimq = "0.9.0"
minimq = { version = "0.9.0", git = "https://github.com/quartiq/minimq.git" }
smlang = "0.8"
embedded-io = "0.6"
log = "0.4"
Expand All @@ -27,7 +27,7 @@ name = "mqtt"
[dev-dependencies]
machine = "0.3"
env_logger = "0.11"
std-embedded-nal = "0.3"
std-embedded-nal = "0.4"
tokio = { version = "1.9", features = ["rt-multi-thread", "time", "macros"] }
std-embedded-time = "0.1"
miniconf = { features = ["json-core", "derive"], path = "../miniconf" }
Expand Down
2 changes: 1 addition & 1 deletion miniconf_mqtt/examples/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn main() {
env_logger::init();

let mut buffer = [0u8; 1024];
let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();
let localhost: core::net::IpAddr = "127.0.0.1".parse().unwrap();

// Construct a settings configuration interface.
let mut client = miniconf_mqtt::MqttClient::<_, _, _, _, 4>::new(
Expand Down
57 changes: 26 additions & 31 deletions miniconf_mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl From<ResponseCode> for minimq::Property<'static> {
/// }
///
/// let mut buffer = [0u8; 1024];
/// let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();
/// let localhost: core::net::IpAddr = "127.0.0.1".parse().unwrap();
/// let mut client = miniconf_mqtt::MqttClient::<_, _, _, _, 1>::new(
/// std_embedded_nal::Stack::default(),
/// "quartiq/application/12345", // prefix
Expand Down Expand Up @@ -344,12 +344,9 @@ where
// Publish a connection status message.
let mut topic: String<MAX_TOPIC_LENGTH> = self.prefix.try_into().unwrap();
topic.push_str("/alive").unwrap();
let msg = Publication::new(self.alive.as_bytes())
.topic(&topic)
let msg = Publication::new(&topic, self.alive.as_bytes())
.qos(QoS::AtLeastOnce)
.retain()
.finish()
.unwrap(); // Note(unwrap): has topic
.retain();
self.mqtt.client().publish(msg)
}

Expand Down Expand Up @@ -387,18 +384,20 @@ where
};

let props = [code.into()];
let mut response = Publication::new(path.as_bytes())
.topic(self.pending.response_topic.as_ref().unwrap()) // Note(unwrap) checked in update()
.properties(&props)
.qos(QoS::AtLeastOnce);
let mut response = Publication::new(
self.pending.response_topic.as_ref().unwrap(),
path.as_bytes(),
)
.properties(&props)
.qos(QoS::AtLeastOnce);

if let Some(cd) = &self.pending.correlation_data {
response = response.correlate(cd);
}

self.mqtt
.client()
.publish(response.finish().unwrap()) // Note(unwrap): has topic
.publish(response) // Note(unwrap): has topic
.unwrap(); // Note(unwrap) checked can_publish()

if code != ResponseCode::Continue {
Expand Down Expand Up @@ -426,8 +425,7 @@ where

let props = [ResponseCode::Ok.into()];
let mut response =
DeferredPublication::new(|buf| json::get_by_key(settings, &path, buf))
.topic(&topic)
DeferredPublication::new(&topic, |buf| json::get_by_key(settings, &path, buf))
.properties(&props)
.qos(QoS::AtLeastOnce);

Expand All @@ -436,7 +434,7 @@ where
}

// Note(unwrap): has topic
match self.mqtt.client().publish(response.finish().unwrap()) {
match self.mqtt.client().publish(response) {
Err(minimq::PubError::Serialization(miniconf::Error::Traversal(
Traversal::Absent(_),
))) => {}
Expand All @@ -447,18 +445,18 @@ where
)),
))) => {
let props = [ResponseCode::Error.into()];
let mut response = Publication::new("Serialized value too large".as_bytes())
.topic(&topic)
.properties(&props)
.qos(QoS::AtLeastOnce);
let mut response =
Publication::new(&topic, "Serialized value too large".as_bytes())
.properties(&props)
.qos(QoS::AtLeastOnce);

if let Some(cd) = &self.pending.correlation_data {
response = response.correlate(cd);
}

self.mqtt
.client()
.publish(response.finish().unwrap()) // Note(unwrap): has topic
.publish(response) // Note(unwrap): has topic
.unwrap(); // Note(unwrap): checked can_publish, error message is short
}
other => other.unwrap(),
Expand All @@ -477,15 +475,13 @@ where
> {
client
.publish(
DeferredPublication::new(|mut buf| {
DeferredPublication::respond(request, |mut buf| {
let start = buf.len();
write!(buf, "{}", response).and_then(|_| Ok(start - buf.len()))
})
.reply(request)
.unwrap()
.properties(&[code.into()])
.qos(QoS::AtLeastOnce)
.finish()
.map_err(minimq::Error::from)?,
.qos(QoS::AtLeastOnce),
)
.inspect_err(|err| {
info!("Response failure: {err:?}");
Expand Down Expand Up @@ -514,13 +510,12 @@ where
// Get, Dump, or List
// Try a Get assuming a leaf node
if let Err(err) = client.publish(
DeferredPublication::new(|buf| json::get_by_key(settings, path, buf))
.topic(topic)
.reply(properties)
.properties(&[ResponseCode::Ok.into()])
.qos(QoS::AtLeastOnce)
.finish()
.unwrap(), // Note(unwrap): has topic
DeferredPublication::respond(properties, |buf| {
json::get_by_key(settings, path, buf)
})
.unwrap()
.properties(&[ResponseCode::Ok.into()])
.qos(QoS::AtLeastOnce),
) {
match err {
minimq::PubError::Serialization(miniconf::Error::Traversal(
Expand Down

0 comments on commit b66a2e9

Please sign in to comment.