How to recognize "In state connection received CLIENT ALERT" #853
Comments
If the broker is dropping the connection then I would expect the |
We have not observed this issue for some time now. Perhaps it is/was a problem with the EMQX broker. I am closing the issue because I cannot provide more information at the moment. If we observe it again, I will check the on_disconnect() callback (which for sure is implemented ;-) ) and perhaps reopen the issue. Thanks for your support! |
As usual, something happens right after you close an issue. Today we got a new error message with tls_alert:
Slightly different from what you mention, @MattBrittan, the |
Thanks @buhln,
With the info provided to date I'm unsure if there is an issue in this library (it's not attempting to reconnect), an exception is being thrown (caught by your code but loop not restarted), or if it's an issue on the EMQX side (and the client is attempting to reconnect but the server is not responding/functioning). Are you able to provide some code showing how you initialise the library (connection options etc) and which callbacks are in place? Full logs would also be very helpful (but I understand they can be difficult to produce in a production environment with an issue that occurs so infrequently). Even just enabling error logging in the client might produce some useful info. Unfortunately issues like this can be difficult to trace. The more info you can provide, the more likely it is that someone will be able to find the cause. |
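For reference, a minimal sketch of what enabling client logging could look like. `enable_logger()` is the client method used later in this thread; the helper name `enable_client_debug_logging` and the log format are illustrative assumptions, not something the library prescribes.

```python
import logging


def enable_client_debug_logging(client, level=logging.DEBUG):
    """Route the MQTT client's internal log output through the logging module.

    `client` is expected to be a paho.mqtt.client.Client (or any object that
    offers enable_logger(logger)). The helper itself is hypothetical.
    """
    # Configure the root handler once; this is a no-op if handlers already exist.
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=level,
    )
    mqtt_logger = logging.getLogger("paho.mqtt.client")
    mqtt_logger.setLevel(level)
    # Hand the logger to the client so its internal messages are emitted.
    client.enable_logger(mqtt_logger)
    return mqtt_logger
```

Calling this right after constructing the client, before `connect()`, should capture the connection handshake as well. On a production node a level such as `logging.ERROR` may be the more practical choice.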
@MattBrittan thank you for your fast reply! Here is the code of my mqtt class, which controls the MQTT connection. The callbacks were created dynamically by using the function [...]

    import logging
    from paho.mqtt import client as mqtt_client
    import random
    import os

    # Application logger (not part of the original excerpt; the name 'PES'
    # is taken from the reproduction script posted later in this thread)
    logPes = logging.getLogger('PES')

    class mqttClient:
        def __init__(self):
            self.BROKER = os.getenv('MQTT_Server')
            self.PORT = int(os.getenv('MQTT_Port'))
            self.USERNAME = os.getenv('MQTT_User')
            self.PASSWORD = os.getenv('MQTT_Password')
            self.CLIENT_ID = os.getenv('MachineNo') + f'-{random.randint(0, 1000)}'
            self.client = self.connect_mqtt()
            self._reconnect_flag = False
            self._topics = []

        def on_connect(self, client, userdata, flags, reason_code, properties):
            if reason_code == 0 and self.client.is_connected():
                # Resubscribe to all topics
                if self._reconnect_flag:
                    logPes.info("Reconnected to MQTT Broker")
                    logPes.debug("Resubscribe on all topics")
                    for topic in self._topics:
                        self.subscribe(topic[0], topic[1], topic[2])
                else:
                    logPes.info("Connected to MQTT Broker")
                # Any later successful connect is treated as a reconnect
                self._reconnect_flag = True
            else:
                logPes.error(f'Failed to connect, reason code {reason_code}')

        def on_disconnect(self, client, userdata, flags, reason_code, properties):
            logPes.warning(f'MQTT Broker disconnected, reason code {reason_code}')
            # This is legacy. Reason_code 16 does not occur in the current version of the Paho MQTT library
            if reason_code == 16:
                logPes.error('Fatal disconnect, kill application')
                exit('Fatal MQTT disconnect: reason_code=16')  # Docker will restart application

        def connect_mqtt(self):
            logPes.info('Connecting to MQTT broker ' + self.BROKER + ':' + str(self.PORT))
            client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, self.CLIENT_ID)
            client.tls_set(ca_certs='./ca.pem')
            client.username_pw_set(self.USERNAME, self.PASSWORD)
            client.connect(self.BROKER, self.PORT, keepalive=45)
            client.on_connect = self.on_connect
            client.on_disconnect = self.on_disconnect
            return client

        def subscribe(self, topic, qos, callback):
            """Subscribe on topics"""
            self.client.subscribe(topic, qos)
            self.client.message_callback_add(topic, callback)
            # Remember the subscription so it can be restored after a reconnect
            if [topic, qos, callback] not in self._topics:
                self._topics.append([topic, qos, callback])
            logPes.debug(f"Subscribed on `{topic}` topic")

    ##-> END MQTT code

    def main():
        # Connect to MQTT broker
        try:
            mqtt = mqttClient()
        except Exception as error:
            logging.error("An error in MQTT connection occurred: {}".format(error))
            exit("An error in MQTT connection occurred: {}".format(error))
        #
        # Some instances of classes were created here which are using the mqtt client...
        # [...]
        #
        try:
            # Start infinite loop to run mqtt client
            mqtt.client.loop_forever()
        except Exception as error:
            logging.error("An error in MQTT connection occurred: {}".format(error))
            mqtt.client.disconnect()
        except KeyboardInterrupt:
            logging.debug("Programm stopped by user")
            mqtt.client.disconnect()

    if __name__ == "__main__":
        main()

After reconnecting, the ( [...] Due to the fact that in all error messages the tag [...] But if you have some ideas and tips to improve the robustness of my code, I look forward to reading them! :-) Thanks! |
This is a tricky one, I can't see anything obviously wrong with your code and am still not clear if the issue is with this library or EMQX.
That would be correct in so far as [...] In fact I'm not even sure if the above is fully correct; it's possible that the error logged by EMQX relates to a different connection, and whatever happens when the error is processed leads to some EMQX subsystems failing, which results in the loss of subscriptions on the server side (this is very much guesswork!). In order to determine if this is an issue with the paho client I think we are going to need debug logs (unless someone else raises a similar issue); I understand that these would be difficult to get given how infrequently this occurs! Another option would be to add some form of watchdog in your app (i.e. if no message is received in a minute then exit so Docker will restart the app OR call SUBSCRIBE again). This is not ideal but might be the pragmatic solution. |
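A watchdog of this kind could be sketched roughly as follows. This is only an illustration under assumptions: the class name `MessageWatchdog`, the `feed`/`check`/`run` methods and the injectable `clock` are hypothetical and not part of paho. It also presumes that at least one subscribed topic carries regular traffic (a heartbeat, for example), otherwise silence does not indicate a fault.

```python
import threading
import time


class MessageWatchdog:
    """Fire a callback when no message has been seen for `timeout_s` seconds."""

    def __init__(self, timeout_s, on_timeout, clock=time.monotonic):
        self._timeout_s = timeout_s
        self._on_timeout = on_timeout  # e.g. resubscribe, or exit so Docker restarts the app
        self._clock = clock            # injectable so the logic can be tested without waiting
        self._lock = threading.Lock()
        self._last_seen = clock()

    def feed(self):
        """Call this from the on_message callback for every received message."""
        with self._lock:
            self._last_seen = self._clock()

    def check(self):
        """Return True (and fire the callback) if the silence exceeded the timeout."""
        with self._lock:
            now = self._clock()
            silent_for = now - self._last_seen
            expired = silent_for > self._timeout_s
            if expired:
                # Restart the timer so the callback is not fired on every poll.
                self._last_seen = now
        if expired:
            self._on_timeout(silent_for)
        return expired

    def run(self, stop_event, poll_s=5.0):
        """Poll until `stop_event` is set; intended to run in a daemon thread."""
        while not stop_event.wait(poll_s):
            self.check()
```

In the application, `watchdog.feed()` would be called at the top of each message callback, and `threading.Thread(target=watchdog.run, args=(stop_event,), daemon=True).start()` would start the polling. Whether `on_timeout` resubscribes or terminates the process is a design choice; exiting is cruder but also recovers from states the client cannot detect itself.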
After updating EMQX to v5.7.2, this issue looks like it has gone away. I will monitor it with the paho client's log level set to DEBUG. |
It looks like the client now disconnects but is able to reconnect again:
I think the interesting line is:
Here is the corresponding line in the log of the broker (EMQX v5.7.2):
The problem appears to be located on the client side. What's quite weird is that we run the application for other machines in the same Docker Swarm (same image) without observing this issue. Any ideas? |
Issue #691 looks similar, but this is quite old and minimal information was provided. Had a quick look and found this issue that looks similar but was only replicable when using [...] I'll remove the more_info_needed tag as I believe there is now enough info for this to be investigated further; however, I suspect it's going to be difficult to resolve without a reproducible example (as you say, this is only happening on one of your nodes). Will need someone with more knowledge of Python networking code to step in here! |
I checked the same logs again and perhaps the following information is helpful. I noticed that just a moment before the issue occurs, a big message was sent (message id 84). We call this kind of file an Operation Instruction, and it is part of the functionality of the system. The message is 1456152 bytes in size and contains a JSON object with metadata and a PDF encoded as a base64 string. The broker accepts message sizes up to 30MB. This message is sent with QoS 1, but the PUBACK for this big message is never received. Perhaps this incoming PUBACK message causes the issue? The debug log of the broker shows no problems here.
Theoretically I could change the outgoing big message to QoS 0 and wait for the issue to occur again, but perhaps we are able to fix the issue in a cleaner way. |
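As a side note on the sizes involved: base64 inflates binary data by roughly a third, so the on-the-wire payload is noticeably larger than the PDF itself. A small sketch for estimating this, where the function names and the `file` JSON key are assumptions made for illustration (the real message layout is not shown in this thread):

```python
import base64
import json


def base64_length(raw_bytes: int) -> int:
    """Length of the padded base64 encoding of `raw_bytes` bytes of input."""
    return 4 * ((raw_bytes + 2) // 3)


def payload_size(metadata: dict, file_bytes: bytes) -> int:
    """Size in bytes of a JSON payload that embeds the file as base64.

    The 'file' key is a hypothetical field name.
    """
    doc = dict(metadata, file=base64.b64encode(file_bytes).decode("ascii"))
    return len(json.dumps(doc).encode("utf-8"))
```

With this, a payload can be compared against the broker's configured maximum (30MB here) before publishing.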
Are you able to test sending the large message over the same link with another tool (e.g. |
Now the really tricky part starts... As discussed, the issue happens only on one node and not all the time. On other nodes we are transmitting even bigger PDFs, up to 15MB, without problems. The PDF is sent to a frontend written in React. I assume that the operator who is using this frontend on this node keeps the relevant frontend function open all the time, which automatically requests the PDF file from the backend whenever another kind of message is received from the backend by the frontend ( [...] So, the only difference I see at the moment between the affected node and all the other nodes without the issue is the load on the mqtt loop right at this moment. The load in number of messages is not really higher than on the other nodes at this point in the backend's operation. The difference is just that one additional big message comes on top. My understanding is that the client forwards messages to the loop and these are scheduled by the loop. So, load on the loop should not be the problem in general? |
This morning we had a variation of the error which led to a disconnect from the broker. The paho lib didn't recognize the disconnect and therefore it didn't reconnect. The loop didn't crash.
Again, it was in the context of sending the big message 223 (1698928 bytes). I have now reduced the QoS for this message from 1 to 0. Again it looks like the PUBACK of this big message was the first one not received. In issue #637 (comment) it is mentioned that it could be a kind of "race condition" on the socket, especially because the timing and order in which the mqtt messages are sent could be influenced by the operator's behaviour on the frontend side. This could be the reason why we have this issue on one node only. Perhaps I have to mention that our application is also publishing with the |
Today we had the following error message and a disconnect from the broker. The client did not reconnect to the broker.
Any ideas on how to catch this error and call the connect callback? My code is available in one of the earlier posts. |
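One possible approach, sketched under assumptions: instead of calling `disconnect()` and leaving `main()` when the loop raises, re-enter the loop after a reconnect attempt. The helper `run_supervised` and its parameters are hypothetical; `loop_forever()` and `reconnect()` are the paho client methods it is meant to wrap. It only covers the case where the loop actually raises, not the case where the disconnect goes unnoticed.

```python
import logging
import time

log = logging.getLogger("mqtt.supervisor")


def run_supervised(run_loop, reconnect, max_restarts=5, base_delay_s=1.0, sleep=time.sleep):
    """Run `run_loop` and restart it after an exception instead of exiting.

    Returns the number of restarts once `run_loop` ends normally; re-raises
    the last error when `max_restarts` is exceeded.
    """
    restarts = 0
    while True:
        try:
            run_loop()
            return restarts  # loop ended normally, e.g. after disconnect()
        except Exception as error:  # KeyboardInterrupt is not caught here
            restarts += 1
            if restarts > max_restarts:
                raise
            # Exponential backoff, capped at one minute.
            delay = min(base_delay_s * 2 ** (restarts - 1), 60.0)
            log.error("MQTT loop failed (%s); restart %d in %.1fs", error, restarts, delay)
            sleep(delay)
            try:
                reconnect()
            except Exception as reconnect_error:
                # Keep going; the next run_loop() call will fail and count as a restart.
                log.error("Reconnect failed: %s", reconnect_error)
```

With the class posted earlier, usage would be along the lines of `run_supervised(mqtt.client.loop_forever, mqtt.client.reconnect)`. If the restarts are exhausted the exception propagates, so the process exits and Docker can restart the container.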
@MattBrittan I was able to create a code snippet that reliably reproduces the issue on my dev system. The example considers some of the "features" of our program, like multithreading, subscriptions and big files. Python version: 3.12.1

    import logging
    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG)
    from paho.mqtt import client as mqtt_client
    import paho.mqtt.publish as MQTTpublish
    import random
    import os
    import base64
    import threading
    from dotenv import load_dotenv
    load_dotenv()
    from time import sleep

    logPes = logging.getLogger('PES')
    logMqtt = logging.getLogger('paho.mqtt.client')
    logMqtt.setLevel(level=logging.DEBUG)

    def read_and_encode_file(filename):
        with open(filename, 'rb') as f:
            file_content = f.read()
        encoded_content = base64.b64encode(file_content)
        return encoded_content.decode('utf-8')

    class mqttClient:
        def __init__(self):
            self.BROKER = os.getenv('MQTT_Server')
            self.PORT = int(os.getenv('MQTT_Port'))
            self.USERNAME = os.getenv('MQTT_User')
            self.PASSWORD = os.getenv('MQTT_Password')
            self.CLIENT_ID = 'Testmachine_' + f'-{random.randint(0, 1000)}'
            self.client = self.connect_mqtt()
            self._reconnect_flag = False
            self._topics = []

        def on_connect(self, client, userdata, flags, reason_code, properties):
            if reason_code == 0 and self.client.is_connected():
                # Resubscribe to all topics
                if self._reconnect_flag:
                    logPes.warning("PES reconnected to MQTT Broker")
                    logPes.debug("Resubscribe on all topics")
                    for topic in self._topics:
                        self.subscribe(topic[0], topic[1], topic[2])
                else:
                    logPes.info("PES connected to MQTT Broker")
                # Any later successful connect is treated as a reconnect
                self._reconnect_flag = True
            else:
                logPes.error(f'PES failed to connect, reason code {reason_code}')

        def on_disconnect(self, client, userdata, flags, reason_code, properties):
            logPes.warning(f'MQTT Broker disconnected, reason code {reason_code}')
            if reason_code == 16:
                logPes.error('Fatal disconnect, kill application')
                exit('Fatal MQTT disconnect: reason_code=16')

        def connect_mqtt(self):
            logPes.info('PES is connecting to MQTT broker ' + self.BROKER + ':' + str(self.PORT))
            client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, self.CLIENT_ID)
            client.enable_logger(logMqtt)
            client.tls_set(ca_certs='./ca.pem')
            client.username_pw_set(self.USERNAME, self.PASSWORD)
            client.connect(self.BROKER, self.PORT, keepalive=45)
            client.on_connect = self.on_connect
            client.on_disconnect = self.on_disconnect
            return client

        def subscribe(self, topic, qos, callback):
            """Subscribe on topics"""
            self.client.subscribe(topic, qos)
            self.client.message_callback_add(topic, callback)
            if [topic, qos, callback] not in self._topics:
                self._topics.append([topic, qos, callback])
            logPes.debug(f"Subscribed on `{topic}` topic")

    ##-> END MQTT code

    class mqtt_load:
        def __init__(self, mqtt):
            self.mqtt = mqtt
            self.data = self.base64_string(10)
            # Publish from a second thread while the main thread runs the network loop
            t1 = threading.Thread(target=self.publish_message, args=(self.mqtt, self.data))
            t1.start()

        # Create big base64 string
        def base64_string(self, size_in_mb):
            size_in_bytes = size_in_mb * 1024 * 1024
            byte_array = bytearray(b'1' * size_in_bytes)
            encoded_string = base64.b64encode(byte_array).decode('utf-8')
            return encoded_string

        # Function to publish messages
        def publish_message(self, mqtt, data):
            sleep(1)
            for n in range(1,3): # -> set range to (1, 2) for no ssl error
                mqtt.client.publish('test/test', payload='Test' + str(n), qos=0, retain=False)
                mqtt.client.publish('test/data', payload=data, qos=2, retain=False)

    def incoming_msg(client, userdata, msg):
        logPes.debug(f"Received message on topic `{msg.topic}`")

    def main():
        # Connect to MQTT broker
        try:
            mqtt = mqttClient()
            # Subscribe on topics
            topics=[
                ['test/test', 2, incoming_msg],
                ['test/data', 2, incoming_msg]
            ]
            for topic in topics:
                mqtt.subscribe(topic[0], topic[1], topic[2])
        except Exception as error:
            logging.error("An error in MQTT connection occurred: {}".format(error))
            exit("An error in MQTT connection occurred: {}".format(error))
        mqtt_load(mqtt)
        try:
            # Start infinite loop to run mqtt client
            mqtt.client.loop_forever()
        except Exception as error:
            logging.error("An error in MQTT connection occurred: {}".format(error))
            mqtt.client.disconnect()
        except KeyboardInterrupt:
            logging.debug("Programm stopped by user")
            mqtt.client.disconnect()

    if __name__ == "__main__":
        main()

And here are the log messages with the ssl bad length error:
It looks to be connected to the thread which publishes the messages. If you reduce the for-loop to a single iteration, the error disappears.

This creates the error:

    # Function to publish messages
    def publish_message(self, mqtt, data):
        sleep(1)
        for n in range(1,3):
            mqtt.client.publish('test/test', payload='Test' + str(n), qos=0, retain=False)
            mqtt.client.publish('test/data', payload=data, qos=2, retain=False)

No error:

    # Function to publish messages
    def publish_message(self, mqtt, data):
        sleep(1)
        for n in range(1,2):
            mqtt.client.publish('test/test', payload='Test' + str(n), qos=0, retain=False)
            mqtt.client.publish('test/data', payload=data, qos=2, retain=False)

It would be great if you could run the code multiple times to check whether you get the ssl error, too. |
Sorry @buhln, I'm flat out with other work currently so have not had time to look at this (or at PRs on the projects I'm leading). Will try to find time but it may be a while - sorry. |
No problem. I think I found a workaround for the problem. I am sending the big files with the |
I suspect there is a data race somewhere that's causing this, having the proof of concept should make it a lot easier to find so thanks for that (still have not had a chance to actually run it on my setup yet). |
Perhaps it is a good idea to rename the issue? |
I observed the following issue: The communication between the client and server stopped without a disconnect on the client side. The broker log shows the following line:
[MQTT] socket_error: {tls_alert,{internal_error,"TLS server: In state connection received CLIENT ALERT: Fatal - Internal Error\n"}}
Because we are using a quite outdated EMQX broker, I do not want to call it a bug, but I cannot easily update the broker myself because it is part of a bigger infrastructure. So I have to look for a workaround, like forcing my client to reconnect.
Do you have an idea how to recognize this situation from the client side?
Thanks in advance!
Reproduction
Not reproducible
Environment