Skip to content

Commit

Permalink
Add properties to access Client attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreF committed Jan 6, 2024
1 parent 3f71356 commit a2a4840
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 22 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ v2.0.0 - 2023-xx-xx
Minimum tested version is Python 3.7
- Add on_pre_connect() callback, which is called immediately before a
connection attempt is made.
- Add properties to access most Client attribute. Closes #764.


v1.6.1 - 2021-10-21
Expand Down
222 changes: 200 additions & 22 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
This is an MQTT client module. MQTT is a lightweight pub/sub messaging
protocol that is easy to implement and suitable for low powered devices.
"""
from __future__ import annotations

import base64
import collections
Expand All @@ -33,6 +34,7 @@
import urllib.parse
import urllib.request
import uuid
import warnings

from .matcher import MQTTMatcher
from .properties import Properties
Expand Down Expand Up @@ -512,19 +514,16 @@ def __init__(self, client_id="", clean_session=None, userdata=None,
locally.
"""

if transport.lower() not in ('websockets', 'tcp'):
raise ValueError(
f'transport must be "websockets" or "tcp", not {transport}')
self._manual_ack = manual_ack
self._transport = transport.lower()
self.transport = transport
self._protocol = protocol
self._userdata = userdata
self._sock = None
self._sockpairR, self._sockpairW = (None, None,)
self._keepalive = 60
self._connect_timeout = 5.0
self._client_mode = MQTT_CLIENT
self._clean_start: int = MQTT_CLEAN_START_FIRST_ONLY

if protocol == MQTTv5:
if clean_session is not None:
Expand Down Expand Up @@ -625,6 +624,196 @@ def __init__(self, client_id="", clean_session=None, userdata=None,
def __del__(self):
self._reset_sockets()

@property
def host(self) -> str:
"Host we try to connect to. If don't yet called connect() return empty string"
return self._host

@host.setter
def host(self, value: str) -> None:
"""
Update host. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
"""
if value is None or len(value) == 0:
raise ValueError("Invalid host.")
self._host = value

@property
def port(self) -> int:
"TCP port we try to connect to."
return self._port

@port.setter
def port(self, value: int) -> None:
"""
Update post. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
"""
if value <= 0:
raise ValueError("Invalid port number.")
self._port = value

@property
def keepalive(self) -> int:
"Keepalive in seconds used by the client "
return self._keepalive

@keepalive.setter
def keepalive(self, value: int) -> None:
"Update keepalive. It's behavior is undefined if the connection is already open"
if self._sock is not None:
# The issue here is that the previous value of keepalive matter to possibly
# sent ping packet.
warnings.warn(
"updating keepalive on established connection is not supported",
stacklevel=2,
)
self._sock.settimeout(value)

if value < 0:
raise ValueError("Keepalive must be >=0.")

self._keepalive = value

@property
def transport(self) -> str:
'Transport used for the connection, could be "tcp" or "websockets".'
return self._transport

@transport.setter
def transport(self, value: str) -> None:
"""
Update transport which should be "tcp" or "websockets".
This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
"""
if value.lower() not in ("websockets", "tcp"):
raise ValueError(
f'transport must be "websockets" or "tcp", not {value}')

self._transport = value.lower()

@property
def protocol(self) -> int:
"Protocol version used (MQTT v3, MQTT v3.11, MQTTv5)"
return self.protocol

@property
def connect_timeout(self) -> float:
"Timeout used to establish the TCP (& TLS / websocket if enabled) in seconds"
return self._connect_timeout

@connect_timeout.setter
def connect_timeout(self, value: float):
"Change connect_timeout for future (re)connection"
if value <= 0.0:
raise ValueError("timeout must be a positive number")

self._connect_timeout = value

@property
def username(self) -> str | None:
"Return the username use to connect to MQTT broken or None if not credenials are used."
if self._username is None:
return None
return self._username.decode("utf-8")

@username.setter
def username(self, value: str | None) -> None:
"""
Update username. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
"""
if value is None:
self._username = None
else:
self._username = value.encode("utf-8")

@property
def password(self) -> str | None:
"Return the password use to connect to MQTT broken or None if not password are used."
if self._password is None:
return None
return self._password.decode("utf-8")

@password.setter
def password(self, value: str | None) -> None:
"""
Update password. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
"""
if value is None:
self._password = None
else:
self._password = value.encode("utf-8")

@property
def max_inflight_messages(self) -> int:
"Maximum number of message with QoS > 0 that can be part way through their network flow at once"
return self._max_inflight_messages

@max_inflight_messages.setter
def max_inflight_messages(self, value: int) -> None:
"Update max_inflight_messages. It's behavior is undefined if the connection is already open"
if self._sock is not None:
# Not tested. Some doubt that everything is okay when max_inflight change between 0
# and > 0 value because _update_inflight is skipped when _max_inflight_messages == 0
warnings.warn(
"updating max_inflight_messages on established connection is not supported",
stacklevel=2,
)

if value < 0:
raise ValueError("Invalid inflight.")

self._max_inflight_messages = value

@property
def max_queued_messages(self) -> int:
"Maximum number of message with QoS > 0 that can be part way through their network flow at once"
return self._max_queued_messages

@max_queued_messages.setter
def max_queued_messages(self, value: int) -> None:
"Update max_queued_messages. It's behavior is undefined if the connection is already open"
if self._sock is not None:
# Not tested.
warnings.warn(
"updating max_queued_messages on established connection is not supported",
stacklevel=2,
)

if value < 0:
raise ValueError("Invalid queue size.")

self._max_queued_messages = value

@property
def will_topic(self) -> str | None:
"Return the topic will is sent to on unexpected disconnect, or None if will isn't set"
if self._will_topic is None:
return None

return self._will_topic.decode("utf-8")

@property
def will_payload(self) -> bytes | None:
"Return the payload will send on unexpected disconnect, or None if will isn't set"
if self._will_topic is None:
return None

return self._will_topic.decode("utf-8")

@property
def logger(self) -> logging.Logger | None:
return self._logger

@logger.setter
def logger(self, value: logging.Logger) -> None:
self._logger = value


def _sock_recv(self, bufsize):
try:
return self._sock.recv(bufsize)
Expand Down Expand Up @@ -870,7 +1059,7 @@ def enable_logger(self, logger=None):
# Do not replace existing logger
return
logger = logging.getLogger(__name__)
self._logger = logger
self.logger = logger

def disable_logger(self):
self._logger = None
Expand Down Expand Up @@ -968,25 +1157,18 @@ def connect_async(self, host, port=1883, keepalive=60, bind_address="", bind_por
properties: (MQTT v5.0 only) the MQTT v5.0 properties to be sent in the
MQTT connect packet. Use the Properties class.
"""
if host is None or len(host) == 0:
raise ValueError('Invalid host.')
if port <= 0:
raise ValueError('Invalid port number.')
if keepalive < 0:
raise ValueError('Keepalive must be >=0.')
if bind_port < 0:
raise ValueError('Invalid bind port number.')

self._host = host
self._port = port
self._keepalive = keepalive
self.host = host
self.port = port
self.keepalive = keepalive
self._bind_address = bind_address
self._bind_port = bind_port
self._clean_start = clean_start
self._connect_properties = properties
self._state = mqtt_cs_connect_async


def reconnect_delay_set(self, min_delay=1, max_delay=120):
""" Configure the exponential reconnect delay
Expand Down Expand Up @@ -1631,18 +1813,14 @@ def loop_misc(self):
def max_inflight_messages_set(self, inflight):
"""Set the maximum number of messages with QoS>0 that can be part way
through their network flow at once. Defaults to 20."""
if inflight < 0:
raise ValueError('Invalid inflight.')
self._max_inflight_messages = inflight
self.max_inflight_messages = inflight

def max_queued_messages_set(self, queue_size):
"""Set the maximum number of messages in the outgoing message queue.
0 means unlimited."""
if queue_size < 0:
raise ValueError('Invalid queue size.')
if not isinstance(queue_size, int):
raise ValueError('Invalid type of queue size.')
self._max_queued_messages = queue_size
self.max_queued_messages = queue_size
return self

def message_retry_set(self, retry):
Expand Down

0 comments on commit a2a4840

Please sign in to comment.