Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

py qol #269

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion miniconf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[![Continuous Integration](https://github.com/vertigo-designs/miniconf/workflows/Continuous%20Integration/badge.svg)](https://github.com/quartiq/miniconf/actions)

`miniconf` enables lightweight (`no_std`/no alloc) serialization, deserialization,
and access within a tree of heretogeneous types by keys.
and access within a tree of heterogeneous types by keys.

## Example

Expand Down
35 changes: 19 additions & 16 deletions py/miniconf-mqtt/miniconf/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import uuid
from typing import Dict, Any

from aiomqtt import Client, Message, MqttError
from paho.mqtt.properties import Properties, PacketTypes
import paho.mqtt
from paho.mqtt.properties import Properties, PacketTypes
from aiomqtt import Client, Message, MqttError

MQTTv5 = paho.mqtt.enums.MQTTProtocolVersion.MQTTv5

Expand Down Expand Up @@ -111,18 +111,17 @@ def _dispatch(self, message: Message):
LOGGER.warning("Discarding message without response code user property")
return

response = message.payload.decode("utf-8")
resp = message.payload.decode("utf-8")
if code == "Continue":
ret.append(response)
return

if code == "Ok":
if response:
ret.append(response)
ret.append(resp)
elif code == "Ok":
if resp:
ret.append(resp)
fut.set_result(ret)
del self._inflight[cd]
else:
fut.set_exception(MiniconfException(code, response))
del self._inflight[cd]
fut.set_exception(MiniconfException(code, resp))
del self._inflight[cd]

async def _do(self, topic: str, *, response=1, **kwargs):
response = int(response)
Expand Down Expand Up @@ -150,6 +149,7 @@ async def _do(self, topic: str, *, response=1, **kwargs):
return ret[0]
assert ret
return ret
return None

async def set(self, path: str, value, retain=False, response=True, **kwargs):
"""Write the provided data to the specified path.
Expand Down Expand Up @@ -206,7 +206,10 @@ async def clear(self, path: str, response=True, **kwargs):
path: The path to clear. Must be a leaf node.
"""
return await self._do(
f"{self.prefix}/settings{path}", retain=True, response=response, **kwargs
f"{self.prefix}/settings{path}",
retain=True,
response=response,
**kwargs,
)


Expand Down Expand Up @@ -234,10 +237,6 @@ async def discover(
suffix = "/alive"
topic = f"{prefix}{suffix}"

t_start = asyncio.get_running_loop().time()
await client.subscribe(topic)
t_subscribe = asyncio.get_running_loop().time() - t_start

async def listen():
async for message in client.messages:
peer = message.topic.value.removesuffix(suffix)
Expand All @@ -249,6 +248,10 @@ async def listen():
logging.info(f"Discovered {peer} alive")
discovered[peer] = payload

t_start = asyncio.get_running_loop().time()
await client.subscribe(topic)
t_subscribe = asyncio.get_running_loop().time() - t_start

try:
await asyncio.wait_for(
listen(), timeout=rel_timeout * t_subscribe + abs_timeout
Expand Down
9 changes: 7 additions & 2 deletions py/miniconf-mqtt/miniconf/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class Miniconf:
"""Miniconf over MQTT (synchronous)"""

def __init__(self, client: Client, prefix: str):
"""
Args:
client: A connected MQTT5 client.
prefix: The MQTT toptic prefix of the device to control.
"""
self.client = client
self.prefix = prefix
self.response_topic = f"{prefix}/response"
Expand Down Expand Up @@ -173,6 +178,8 @@ def get(self, path: str, **kwargs):
def clear(self, path: str, response=True, **kwargs):
"""Clear retained value from a path.

This does not change (`set()`) or reset/clear the value on the device.

Args:
path: The path to clear. Must be a leaf node.
"""
Expand Down Expand Up @@ -208,8 +215,6 @@ def discover(
suffix = "/alive"
topic = f"{prefix}{suffix}"

discovered = {}

def on_message(_client, _userdata, message):
logging.debug(f"Got message from {message.topic}: {message.payload}")
peer = message.topic.removesuffix(suffix)
Expand Down
Loading