From cead6af86b66f20bd85111ba16235efe19851dd3 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Sun, 7 Jan 2024 15:54:58 +0100 Subject: [PATCH 1/4] Fix is_connected property when not using loop_forever --- ChangeLog.txt | 4 ++++ src/paho/mqtt/client.py | 15 +++++++++++++++ src/paho/mqtt/enums.py | 1 + 3 files changed, 20 insertions(+) diff --git a/ChangeLog.txt b/ChangeLog.txt index 0e13b7d0..eabcd625 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -23,6 +23,10 @@ v2.0.0 - 2023-xx-xx - Fix loading too weak TLS CA file but setting allowed ciphers before loading CA. Closes #676. - Allow to manually ack QoS > 0 messages. Closes #753 & #348. - Improve tests & linters. Modernize build (drop setup.py, use pyproject.toml) +- Fix is_connected property to correctly return False when connection is lost + and loop_start/loop_forever isn't used. Closes #525. + + v1.6.1 - 2021-10-21 =================== diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 024f80ab..1767f603 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -1290,10 +1290,19 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode: socklist = select.select(rlist, wlist, [], timeout) except TypeError: # Socket isn't correct type, in likelihood connection is lost + # ... or we called disconnect(). In that case the socket will + # be closed but some loop (like loop_forever) will continue to + # call _loop(). We still want to break that loop by returning an + # rc != MQTT_ERR_SUCCESS and we don't want state to change from + # mqtt_cs_disconnecting. + if self._state != ConnectionState.MQTT_CS_DISCONNECTING: + self._state = ConnectionState.MQTT_CS_CONNECTION_LOST return MQTTErrorCode.MQTT_ERR_CONN_LOST except ValueError: # Can occur if we just reconnected but rlist/wlist contain a -1 for # some reason. + if self._state != ConnectionState.MQTT_CS_DISCONNECTING: + self._state = ConnectionState.MQTT_CS_CONNECTION_LOST return MQTTErrorCode.MQTT_ERR_CONN_LOST except Exception: # Note that KeyboardInterrupt, etc. can still terminate since they @@ -1768,6 +1777,8 @@ def loop_misc(self) -> MQTTErrorCode: if self._state == mqtt_cs_disconnecting: rc = MQTTErrorCode.MQTT_ERR_SUCCESS else: + self._state = ConnectionState.MQTT_CS_CONNECTION_LOST + self._easy_log(MQTT_LOG_DEBUG, "... 2") rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE self._do_on_disconnect(rc) @@ -2580,6 +2591,10 @@ def _loop_rc_handle( self._do_on_disconnect(rc, properties) + if rc == MQTT_ERR_CONN_LOST: + self._easy_log(MQTT_LOG_DEBUG, "... 3") + self._state = ConnectionState.MQTT_CS_CONNECTION_LOST + return rc def _packet_read(self) -> MQTTErrorCode: diff --git a/src/paho/mqtt/enums.py b/src/paho/mqtt/enums.py index 33f18cb6..2f167510 100644 --- a/src/paho/mqtt/enums.py +++ b/src/paho/mqtt/enums.py @@ -68,6 +68,7 @@ class ConnectionState(enum.IntEnum): MQTT_CS_CONNECTED = 1 MQTT_CS_DISCONNECTING = 2 MQTT_CS_CONNECT_ASYNC = 3 + MQTT_CS_CONNECTION_LOST = 4 class MessageState(enum.IntEnum): From 16658c94c81dc55b6443ca81e12e400d493d00a3 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Wed, 10 Jan 2024 22:29:23 +0100 Subject: [PATCH 2/4] Add test for is_connected using loop and loop_start --- tests/test_client.py | 120 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 116 insertions(+), 4 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index d6457fe7..145f911c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,19 +1,21 @@ +import threading import time import unicodedata import paho.mqtt.client as client import pytest +from paho.mqtt.enums import MQTTErrorCode, MQTTProtocolVersion from paho.mqtt.reasoncodes import ReasonCodes import tests.paho_test as paho_test # Import test fixture -from tests.testsupport.broker import fake_broker # noqa: F401 +from tests.testsupport.broker import FakeBroker, fake_broker # noqa: F401 @pytest.mark.parametrize("proto_ver", [ - (client.MQTTv31), - (client.MQTTv311), + (MQTTProtocolVersion.MQTTv31), + (MQTTProtocolVersion.MQTTv311), ]) class Test_connect: """ @@ -100,7 +102,7 @@ class Test_connect_v5: def test_01_broker_no_support(self, fake_broker): mqttc = client.Client( - "01-broker-no-support", protocol=client.MQTTv5) + "01-broker-no-support", protocol=MQTTProtocolVersion.MQTTv5) def on_connect(mqttc, obj, flags, reason, properties): assert reason == 132 @@ -137,6 +139,116 @@ def on_connect(mqttc, obj, flags, reason, properties): mqttc.loop_stop() +class TestConnectionLost: + def test_with_loop_start(self, fake_broker: FakeBroker): + mqttc = client.Client( + "test_with_loop_start", + protocol=MQTTProtocolVersion.MQTTv311, + reconnect_on_failure=False, + ) + + on_connect_reached = threading.Event() + on_disconnect_reached = threading.Event() + + + def on_connect(mqttc, obj, flags, rc): + assert rc == 0 + on_connect_reached.set() + + def on_disconnect(*args): + on_disconnect_reached.set() + + mqttc.on_connect = on_connect + mqttc.on_disconnect = on_disconnect + + mqttc.connect_async("localhost", fake_broker.port) + mqttc.loop_start() + + try: + fake_broker.start() + + connect_packet = paho_test.gen_connect( + "test_with_loop_start", keepalive=60, + proto_ver=MQTTProtocolVersion.MQTTv311) + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == connect_packet + + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) + + assert on_connect_reached.wait(1) + assert mqttc.is_connected() + + fake_broker.finish() + + assert on_disconnect_reached.wait(1) + assert not mqttc.is_connected() + + finally: + mqttc.loop_stop() + + def test_with_loop(self, fake_broker: FakeBroker): + mqttc = client.Client( + "test_with_loop", + clean_session=True, + ) + + on_connect_reached = threading.Event() + on_disconnect_reached = threading.Event() + + + def on_connect(mqttc, obj, flags, rc): + assert rc == 0 + on_connect_reached.set() + + def on_disconnect(*args): + on_disconnect_reached.set() + + mqttc.on_connect = on_connect + mqttc.on_disconnect = on_disconnect + + + try: + mqttc.connect("localhost", fake_broker.port) + + fake_broker.start() + + # not yet connected, packet are not yet processed by loop() + assert not mqttc.is_connected() + + # connect packet is sent during connect() call + connect_packet = paho_test.gen_connect( + "test_with_loop", keepalive=60, + proto_ver=MQTTProtocolVersion.MQTTv311) + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == connect_packet + + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) + + # call loop() to process the connack packet + assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS + + assert on_connect_reached.wait(1) + assert mqttc.is_connected() + + fake_broker.finish() + + # call loop() to detect the connection lost + assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST + + assert on_disconnect_reached.wait(1) + assert not mqttc.is_connected() + + finally: + mqttc.loop_stop() + class TestPublishBroker2Client: def test_invalid_utf8_topic(self, fake_broker): From 3c7484fe396d4526a903c6556c7151ac1b340765 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Sat, 13 Jan 2024 18:19:14 +0100 Subject: [PATCH 3/4] Drop temporary debug logs --- src/paho/mqtt/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 1767f603..0943dd03 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -1778,7 +1778,6 @@ def loop_misc(self) -> MQTTErrorCode: rc = MQTTErrorCode.MQTT_ERR_SUCCESS else: self._state = ConnectionState.MQTT_CS_CONNECTION_LOST - self._easy_log(MQTT_LOG_DEBUG, "... 2") rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE self._do_on_disconnect(rc) @@ -2592,7 +2591,6 @@ def _loop_rc_handle( self._do_on_disconnect(rc, properties) if rc == MQTT_ERR_CONN_LOST: - self._easy_log(MQTT_LOG_DEBUG, "... 3") self._state = ConnectionState.MQTT_CS_CONNECTION_LOST return rc From a66ef8f269c6f644c5b1f34e0b95bb229edda3b6 Mon Sep 17 00:00:00 2001 From: Pierre Fersing Date: Sat, 13 Jan 2024 18:19:36 +0100 Subject: [PATCH 4/4] Fix test that try to loop_stop() when not using loop_start() --- tests/test_client.py | 52 ++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 145f911c..6cb3c4e4 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -210,44 +210,40 @@ def on_disconnect(*args): mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect + mqttc.connect("localhost", fake_broker.port) - try: - mqttc.connect("localhost", fake_broker.port) + fake_broker.start() - fake_broker.start() + # not yet connected, packet are not yet processed by loop() + assert not mqttc.is_connected() - # not yet connected, packet are not yet processed by loop() - assert not mqttc.is_connected() - - # connect packet is sent during connect() call - connect_packet = paho_test.gen_connect( - "test_with_loop", keepalive=60, - proto_ver=MQTTProtocolVersion.MQTTv311) - packet_in = fake_broker.receive_packet(1000) - assert packet_in # Check connection was not closed - assert packet_in == connect_packet + # connect packet is sent during connect() call + connect_packet = paho_test.gen_connect( + "test_with_loop", keepalive=60, + proto_ver=MQTTProtocolVersion.MQTTv311) + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == connect_packet - connack_packet = paho_test.gen_connack(rc=0) - count = fake_broker.send_packet(connack_packet) - assert count # Check connection was not closed - assert count == len(connack_packet) + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) - # call loop() to process the connack packet - assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS + # call loop() to process the connack packet + assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS - assert on_connect_reached.wait(1) - assert mqttc.is_connected() + assert on_connect_reached.wait(1) + assert mqttc.is_connected() - fake_broker.finish() + fake_broker.finish() - # call loop() to detect the connection lost - assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST + # call loop() to detect the connection lost + assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST - assert on_disconnect_reached.wait(1) - assert not mqttc.is_connected() + assert on_disconnect_reached.wait(1) + assert not mqttc.is_connected() - finally: - mqttc.loop_stop() class TestPublishBroker2Client: