-
Notifications
You must be signed in to change notification settings - Fork 727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add optional timeout to subscribe() #631
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -11,6 +11,7 @@ | |||||
# | ||||||
# Contributors: | ||||||
# Roger Light - initial API and implementation | ||||||
# Joachim Baumann - added timeout to subscribe.simple() | ||||||
|
||||||
""" | ||||||
This module provides some helper functions to allow straightforward subscribing | ||||||
|
@@ -22,6 +23,7 @@ | |||||
|
||||||
from .. import mqtt | ||||||
from . import client as paho | ||||||
from threading import Lock | ||||||
|
||||||
|
||||||
def _on_connect_v5(client, userdata, flags, rc, properties): | ||||||
|
@@ -35,17 +37,19 @@ def _on_connect_v5(client, userdata, flags, rc, properties): | |||||
else: | ||||||
client.subscribe(userdata['topics'], userdata['qos']) | ||||||
|
||||||
|
||||||
def _on_connect(client, userdata, flags, rc): | ||||||
"""Internal v5 callback""" | ||||||
_on_connect_v5(client, userdata, flags, rc, None) | ||||||
|
||||||
|
||||||
def _on_message_callback(client, userdata, message): | ||||||
"""Internal callback""" | ||||||
userdata['callback'](client, userdata['userdata'], message) | ||||||
userdata['callback'](client, userdata['userdata'], | ||||||
message, userdata['lock']) | ||||||
|
||||||
|
||||||
def _on_message_simple(client, userdata, message): | ||||||
def _on_message_simple(client, userdata, message, lock): | ||||||
"""Internal callback""" | ||||||
|
||||||
if userdata['msg_count'] == 0: | ||||||
|
@@ -60,22 +64,27 @@ def _on_message_simple(client, userdata, message): | |||||
if userdata['messages'] is None and userdata['msg_count'] == 0: | ||||||
userdata['messages'] = message | ||||||
client.disconnect() | ||||||
if lock: | ||||||
lock.release() | ||||||
return | ||||||
|
||||||
userdata['messages'].append(message) | ||||||
if userdata['msg_count'] == 0: | ||||||
client.disconnect() | ||||||
if lock: | ||||||
lock.release() | ||||||
|
||||||
|
||||||
def callback(callback, topics, qos=0, userdata=None, hostname="localhost", | ||||||
port=1883, client_id="", keepalive=60, will=None, auth=None, | ||||||
tls=None, protocol=paho.MQTTv311, transport="tcp", | ||||||
clean_session=True, proxy_args=None): | ||||||
clean_session=True, proxy_args=None, timeout=None): | ||||||
"""Subscribe to a list of topics and process them in a callback function. | ||||||
|
||||||
This function creates an MQTT client, connects to a broker and subscribes | ||||||
to a list of topics. Incoming messages are processed by the user provided | ||||||
callback. This is a blocking function and will never return. | ||||||
callback. This is a blocking function and will only return after the | ||||||
timeout. If no timeout is given, the function will never return. | ||||||
|
||||||
callback : function of the form "on_message(client, userdata, message)" for | ||||||
processing the messages received. | ||||||
|
@@ -132,16 +141,25 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost", | |||||
Defaults to True. | ||||||
|
||||||
proxy_args: a dictionary that will be given to the client. | ||||||
|
||||||
timeout: the timeout value after which the client disconnects from the | ||||||
broker. If no timeout is given, the client disconnects only | ||||||
after "msg_count" messages have been received. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no msg_count in the subscribe.callback() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True, I got this from the simple() function. I will remove this. |
||||||
""" | ||||||
|
||||||
if qos < 0 or qos > 2: | ||||||
raise ValueError('qos must be in the range 0-2') | ||||||
|
||||||
lock = None | ||||||
if timeout is not None: | ||||||
lock = Lock() | ||||||
|
||||||
callback_userdata = { | ||||||
'callback':callback, | ||||||
'topics':topics, | ||||||
'qos':qos, | ||||||
'userdata':userdata} | ||||||
'callback': callback, | ||||||
'topics': topics, | ||||||
'qos': qos, | ||||||
'lock': lock, | ||||||
'userdata': userdata} | ||||||
|
||||||
client = paho.Client(client_id=client_id, userdata=callback_userdata, | ||||||
protocol=protocol, transport=transport, | ||||||
|
@@ -180,18 +198,27 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost", | |||||
client.tls_set_context(tls) | ||||||
|
||||||
client.connect(hostname, port, keepalive) | ||||||
client.loop_forever() | ||||||
|
||||||
if timeout == None: | ||||||
client.loop_forever() | ||||||
else: | ||||||
lock.acquire() | ||||||
client.loop_start() | ||||||
lock.acquire(timeout=timeout) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
And using a threading.Event() rather than threading.Lock(). An Event seems a better fit than an Lock There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is a one-on-one communication I believe that a Lock is actually modelling the relationship better. Do you have any additional use in mind e.g., an observer/debugger watching these events? Otherwise I would tend to see Lock as better. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel that an Event is even better at modelling this: the finishing processing is an event and we wait for that event. With the lock, we have two consecutive call to acquire within the same thread isn't usual way to work with a lock. |
||||||
client.loop_stop() | ||||||
client.disconnect() | ||||||
|
||||||
|
||||||
def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost", | ||||||
port=1883, client_id="", keepalive=60, will=None, auth=None, | ||||||
tls=None, protocol=paho.MQTTv311, transport="tcp", | ||||||
clean_session=True, proxy_args=None): | ||||||
clean_session=True, proxy_args=None, timeout=None): | ||||||
"""Subscribe to a list of topics and return msg_count messages. | ||||||
|
||||||
This function creates an MQTT client, connects to a broker and subscribes | ||||||
to a list of topics. Once "msg_count" messages have been received, it | ||||||
disconnects cleanly from the broker and returns the messages. | ||||||
to a list of topics. Once "msg_count" messages have been received or the | ||||||
timeout has been reached, it disconnects cleanly from the broker and | ||||||
returns the received messages. | ||||||
|
||||||
topics : either a string containing a single topic to subscribe to, or a | ||||||
list of topics to subscribe to. | ||||||
|
@@ -253,6 +280,10 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost", | |||||
Defaults to True. | ||||||
|
||||||
proxy_args: a dictionary that will be given to the client. | ||||||
|
||||||
timeout: the timeout value after which the client disconnects from the | ||||||
broker. If no timeout is given, the client disconnects only | ||||||
after "msg_count" messages have been received. | ||||||
""" | ||||||
|
||||||
if msg_count < 1: | ||||||
|
@@ -265,10 +296,11 @@ def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost", | |||||
else: | ||||||
messages = [] | ||||||
|
||||||
userdata = {'retained':retained, 'msg_count':msg_count, 'messages':messages} | ||||||
userdata = {'retained': retained, | ||||||
'msg_count': msg_count, 'messages': messages} | ||||||
|
||||||
callback(_on_message_simple, topics, qos, userdata, hostname, port, | ||||||
client_id, keepalive, will, auth, tls, protocol, transport, | ||||||
clean_session, proxy_args) | ||||||
clean_session, proxy_args, timeout) | ||||||
|
||||||
return userdata['messages'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change. Existing user of subscribe.callback will be required to change the signature of their callback function to accept an additional parameter "lock".
The idea I see to avoid any breaking change and still support this feature (while simple() still just call callback()) would be to add support for a special exception "_StopSubscriber" that the user callback could raise to disconnect. The callback "_on_message_simple" could then raise that exception, this would continue to make simple() a normal user of callback().
So here we could do:
I suggest to name it "_StopSubscriber" because I would like to kept the existence of this special exception undocumented/unsupported for now, because I think the true solution would be to call "client.disconnect()" instead of this exception. But doing the disconnect will not work because currently the received message is acknowledged after the on_message callback so disconnecting from inside the callback will cause the acknowledge to be dropped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I thought that since it is an internal function I could change the signature.
I could very simply determine whether userdata['lock'] is set, and if not call with the original signature instead. This would keep it compatible and would simply add the new functionality.
Using an exception here is not something I'd like to do. An exception always models something that is not part of the normal execution path, and in addition, is normally quite a bit slower. I think the approach with checking the value of 'lock' would be better. Or is there another motivation for using the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not only an internal function or I did I missunderstood the code ? It's the callback passed to subscribe.callback() which is user provided.
In subscribe.simple() I agree it's internal, but we added timeout & the lock also on subscribe.callback().
I just realize we recently added a
user_data_get
on client. It's a bit broken in that use-case, since it will return the "internal" userdata instead of the user-userdata (userdata["userdata"]
). But that probably solve our issue. We can access touserdata["lock"]
by usingclient.user_data_get()["lock"]
. This avoid any change in callback signature.