Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix is_connected property when not using loop_forever #795

Merged
merged 4 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ v2.0.0 - 2023-xx-xx
- Fix loading too weak TLS CA file but setting allowed ciphers before loading CA. Closes #676.
- Allow to manually ack QoS > 0 messages. Closes #753 & #348.
- Improve tests & linters. Modernize build (drop setup.py, use pyproject.toml)
- Fix is_connected property to correctly return False when connection is lost
and loop_start/loop_forever isn't used. Closes #525.



v1.6.1 - 2021-10-21
===================
Expand Down
13 changes: 13 additions & 0 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,10 +1290,19 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
socklist = select.select(rlist, wlist, [], timeout)
except TypeError:
# Socket isn't correct type, in likelihood connection is lost
# ... or we called disconnect(). In that case the socket will
# be closed but some loop (like loop_forever) will continue to
# call _loop(). We still want to break that loop by returning an
# rc != MQTT_ERR_SUCCESS and we don't want state to change from
# mqtt_cs_disconnecting.
if self._state != ConnectionState.MQTT_CS_DISCONNECTING:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
return MQTTErrorCode.MQTT_ERR_CONN_LOST
except ValueError:
# Can occur if we just reconnected but rlist/wlist contain a -1 for
# some reason.
if self._state != ConnectionState.MQTT_CS_DISCONNECTING:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
return MQTTErrorCode.MQTT_ERR_CONN_LOST
except Exception:
# Note that KeyboardInterrupt, etc. can still terminate since they
Expand Down Expand Up @@ -1768,6 +1777,7 @@ def loop_misc(self) -> MQTTErrorCode:
if self._state == mqtt_cs_disconnecting:
rc = MQTTErrorCode.MQTT_ERR_SUCCESS
else:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST
rc = MQTTErrorCode.MQTT_ERR_KEEPALIVE

self._do_on_disconnect(rc)
Expand Down Expand Up @@ -2580,6 +2590,9 @@ def _loop_rc_handle(

self._do_on_disconnect(rc, properties)

if rc == MQTT_ERR_CONN_LOST:
self._state = ConnectionState.MQTT_CS_CONNECTION_LOST

return rc

def _packet_read(self) -> MQTTErrorCode:
Expand Down
1 change: 1 addition & 0 deletions src/paho/mqtt/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ConnectionState(enum.IntEnum):
MQTT_CS_CONNECTED = 1
MQTT_CS_DISCONNECTING = 2
MQTT_CS_CONNECT_ASYNC = 3
MQTT_CS_CONNECTION_LOST = 4


class MessageState(enum.IntEnum):
Expand Down
116 changes: 112 additions & 4 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import threading
import time
import unicodedata

import paho.mqtt.client as client
import pytest
from paho.mqtt.enums import MQTTErrorCode, MQTTProtocolVersion
from paho.mqtt.reasoncodes import ReasonCodes

import tests.paho_test as paho_test

# Import test fixture
from tests.testsupport.broker import fake_broker # noqa: F401
from tests.testsupport.broker import FakeBroker, fake_broker # noqa: F401


@pytest.mark.parametrize("proto_ver", [
(client.MQTTv31),
(client.MQTTv311),
(MQTTProtocolVersion.MQTTv31),
(MQTTProtocolVersion.MQTTv311),
])
class Test_connect:
"""
Expand Down Expand Up @@ -100,7 +102,7 @@ class Test_connect_v5:

def test_01_broker_no_support(self, fake_broker):
mqttc = client.Client(
"01-broker-no-support", protocol=client.MQTTv5)
"01-broker-no-support", protocol=MQTTProtocolVersion.MQTTv5)

def on_connect(mqttc, obj, flags, reason, properties):
assert reason == 132
Expand Down Expand Up @@ -137,6 +139,112 @@ def on_connect(mqttc, obj, flags, reason, properties):
mqttc.loop_stop()


class TestConnectionLost:
def test_with_loop_start(self, fake_broker: FakeBroker):
mqttc = client.Client(
"test_with_loop_start",
protocol=MQTTProtocolVersion.MQTTv311,
reconnect_on_failure=False,
)

on_connect_reached = threading.Event()
on_disconnect_reached = threading.Event()


def on_connect(mqttc, obj, flags, rc):
assert rc == 0
on_connect_reached.set()

def on_disconnect(*args):
on_disconnect_reached.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.connect_async("localhost", fake_broker.port)
mqttc.loop_start()

try:
fake_broker.start()

connect_packet = paho_test.gen_connect(
"test_with_loop_start", keepalive=60,
proto_ver=MQTTProtocolVersion.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

assert on_connect_reached.wait(1)
assert mqttc.is_connected()

fake_broker.finish()

assert on_disconnect_reached.wait(1)
assert not mqttc.is_connected()

finally:
mqttc.loop_stop()

def test_with_loop(self, fake_broker: FakeBroker):
mqttc = client.Client(
"test_with_loop",
clean_session=True,
)

on_connect_reached = threading.Event()
on_disconnect_reached = threading.Event()


def on_connect(mqttc, obj, flags, rc):
assert rc == 0
on_connect_reached.set()

def on_disconnect(*args):
on_disconnect_reached.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.connect("localhost", fake_broker.port)

fake_broker.start()

# not yet connected, packet are not yet processed by loop()
assert not mqttc.is_connected()

# connect packet is sent during connect() call
connect_packet = paho_test.gen_connect(
"test_with_loop", keepalive=60,
proto_ver=MQTTProtocolVersion.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

# call loop() to process the connack packet
assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS

assert on_connect_reached.wait(1)
assert mqttc.is_connected()

fake_broker.finish()

# call loop() to detect the connection lost
assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST

assert on_disconnect_reached.wait(1)
assert not mqttc.is_connected()


class TestPublishBroker2Client:

def test_invalid_utf8_topic(self, fake_broker):
Expand Down
Loading