diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index 1f7a067871..e405786801 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -24,6 +24,7 @@ #include #include #include +#include MQTT *mqtt; @@ -31,10 +32,6 @@ namespace { constexpr int reconnectMax = 5; -static MemoryDynamic staticMqttPool; - -Allocator &mqttPool = staticMqttPool; - // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[meshtastic_MqttClientProxyMessage_size + 30]; // 12 for channel name and 16 for nodeid @@ -528,39 +525,37 @@ void MQTT::publishNodeInfo() } void MQTT::publishQueuedMessages() { - if (!mqttQueue.isEmpty()) { - LOG_DEBUG("Publish enqueued MQTT message"); - meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0); - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); - std::string topic; - if (env->packet->pki_encrypted) { - topic = cryptTopic + "PKI/" + owner.id; - } else { - topic = cryptTopic + env->channel_id + "/" + owner.id; - } - LOG_INFO("publish %s, %u bytes from queue", topic.c_str(), numBytes); + if (mqttQueue.isEmpty()) + return; - publish(topic.c_str(), bytes, numBytes, false); + LOG_DEBUG("Publish enqueued MQTT message"); + const std::unique_ptr entry(mqttQueue.dequeuePtr(0)); + LOG_INFO("publish %s, %u bytes from queue", entry->topic.c_str(), entry->envBytes.size()); + publish(entry->topic.c_str(), entry->envBytes.data(), entry->envBytes.size(), false); #if !defined(ARCH_NRF52) || \ defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ### - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = MeshPacketSerializer::JsonSerialize(env->packet); - if (jsonString.length() != 0) { - std::string topicJson; - if (env->packet->pki_encrypted) { - topicJson = jsonTopic + "PKI/" + owner.id; - } else { - topicJson = jsonTopic + env->channel_id + "/" + owner.id; - } - LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str()); - publish(topicJson.c_str(), jsonString.c_str(), false); - } - } -#endif // ARCH_NRF52 NRF52_USE_JSON - mqttPool.release(env); + if (!moduleConfig.mqtt.json_enabled) + return; + + // handle json topic + const DecodedServiceEnvelope env(entry->envBytes.data(), entry->envBytes.size()); + if (!env.validDecode || env.packet == NULL || env.channel_id == NULL) + return; + + auto jsonString = MeshPacketSerializer::JsonSerialize(env.packet); + if (jsonString.length() == 0) + return; + + std::string topicJson; + if (env.packet->pki_encrypted) { + topicJson = jsonTopic + "PKI/" + owner.id; + } else { + topicJson = jsonTopic + env.channel_id + "/" + owner.id; } + LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str()); + publish(topicJson.c_str(), jsonString.c_str(), false); +#endif // ARCH_NRF52 NRF52_USE_JSON } void MQTT::onSend(const meshtastic_MeshPacket &mp_encrypted, const meshtastic_MeshPacket &mp_decoded, ChannelIndex chIndex) @@ -599,59 +594,56 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp_encrypted, const meshtastic_Me // Either encrypted packet (we couldn't decrypt) is marked as pki_encrypted, or we could decode the PKI encrypted packet bool isPKIEncrypted = mp_encrypted.pki_encrypted || mp_decoded.pki_encrypted; // If it was to a channel, check uplink enabled, else must be pki_encrypted - if ((ch.settings.uplink_enabled && !isPKIEncrypted) || isPKIEncrypted) { - const char *channelId = isPKIEncrypted ? "PKI" : channels.getGlobalId(chIndex); - - meshtastic_ServiceEnvelope *env = mqttPool.allocZeroed(); - env->channel_id = (char *)channelId; - env->gateway_id = owner.id; - - LOG_DEBUG("MQTT onSend - Publish "); - if (moduleConfig.mqtt.encryption_enabled) { - env->packet = (meshtastic_MeshPacket *)&mp_encrypted; - LOG_DEBUG("encrypted message"); - } else if (mp_decoded.which_payload_variant == meshtastic_MeshPacket_decoded_tag) { - env->packet = (meshtastic_MeshPacket *)&mp_decoded; - LOG_DEBUG("portnum %i message", env->packet->decoded.portnum); - } else { - LOG_DEBUG("nothing, pkt not decrypted"); - mqttPool.release(env); - return; // Don't upload a still-encrypted PKI packet if not encryption_enabled - } + if (!(ch.settings.uplink_enabled || isPKIEncrypted)) + return; + const char *channelId = isPKIEncrypted ? "PKI" : channels.getGlobalId(chIndex); + + LOG_DEBUG("MQTT onSend - Publish "); + const meshtastic_MeshPacket *p; + if (moduleConfig.mqtt.encryption_enabled) { + p = &mp_encrypted; + LOG_DEBUG("encrypted message"); + } else if (mp_decoded.which_payload_variant == meshtastic_MeshPacket_decoded_tag) { + p = &mp_decoded; + LOG_DEBUG("portnum %i message", mp_decoded.decoded.portnum); + } else { + LOG_DEBUG("nothing, pkt not decrypted"); + return; // Don't upload a still-encrypted PKI packet if not encryption_enabled + } - if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) { - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env); - std::string topic = cryptTopic + channelId + "/" + owner.id; - LOG_DEBUG("MQTT Publish %s, %u bytes", topic.c_str(), numBytes); + const meshtastic_ServiceEnvelope env = { + .packet = const_cast(p), .channel_id = const_cast(channelId), .gateway_id = owner.id}; + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, &env); + std::string topic = cryptTopic + channelId + "/" + owner.id; - publish(topic.c_str(), bytes, numBytes, false); + if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) { + LOG_DEBUG("MQTT Publish %s, %u bytes", topic.c_str(), numBytes); + publish(topic.c_str(), bytes, numBytes, false); #if !defined(ARCH_NRF52) || \ defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ### - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = MeshPacketSerializer::JsonSerialize((meshtastic_MeshPacket *)&mp_decoded); - if (jsonString.length() != 0) { - std::string topicJson = jsonTopic + channelId + "/" + owner.id; - LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), - jsonString.c_str()); - publish(topicJson.c_str(), jsonString.c_str(), false); - } - } + if (!moduleConfig.mqtt.json_enabled) + return; + // handle json topic + auto jsonString = MeshPacketSerializer::JsonSerialize(&mp_decoded); + if (jsonString.length() == 0) + return; + std::string topicJson = jsonTopic + channelId + "/" + owner.id; + LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str()); + publish(topicJson.c_str(), jsonString.c_str(), false); #endif // ARCH_NRF52 NRF52_USE_JSON + } else { + LOG_INFO("MQTT not connected, queue packet"); + QueueEntry *entry; + if (mqttQueue.numFree() == 0) { + LOG_WARN("MQTT queue is full, discard oldest"); + entry = mqttQueue.dequeuePtr(0); } else { - LOG_INFO("MQTT not connected, queue packet"); - if (mqttQueue.numFree() == 0) { - LOG_WARN("MQTT queue is full, discard oldest"); - meshtastic_ServiceEnvelope *d = mqttQueue.dequeuePtr(0); - if (d) - mqttPool.release(d); - } - // make a copy of serviceEnvelope and queue it - meshtastic_ServiceEnvelope *copied = mqttPool.allocCopy(*env); - assert(mqttQueue.enqueue(copied, 0)); + entry = new QueueEntry; } - mqttPool.release(env); + entry->topic = std::move(topic); + entry->envBytes.assign(bytes, numBytes); + assert(mqttQueue.enqueue(entry, 0)); } } @@ -660,73 +652,70 @@ void MQTT::perhapsReportToMap() if (!moduleConfig.mqtt.map_reporting_enabled || !(moduleConfig.mqtt.proxy_to_client_enabled || isConnectedDirectly())) return; - if (Throttle::isWithinTimespanMs(last_report_to_map, map_publish_interval_msecs)) { + if (Throttle::isWithinTimespanMs(last_report_to_map, map_publish_interval_msecs)) return; - } else { - if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) { - last_report_to_map = millis(); - if (map_position_precision == 0) - LOG_WARN("MQTT Map report enabled, but precision is 0"); - if (localPosition.latitude_i == 0 && localPosition.longitude_i == 0) - LOG_WARN("MQTT Map report enabled, but no position available"); - return; - } - // Allocate ServiceEnvelope and fill it - meshtastic_ServiceEnvelope *se = mqttPool.allocZeroed(); - se->channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()); // Use primary channel as the channel_id - se->gateway_id = owner.id; - - // Allocate MeshPacket and fill it - meshtastic_MeshPacket *mp = packetPool.allocZeroed(); - mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag; - mp->from = nodeDB->getNodeNum(); - mp->to = NODENUM_BROADCAST; - mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP; - - // Fill MapReport message - meshtastic_MapReport mapReport = meshtastic_MapReport_init_default; - memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name)); - memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name)); - mapReport.role = config.device.role; - mapReport.hw_model = owner.hw_model; - strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version)); - mapReport.region = config.lora.region; - mapReport.modem_preset = config.lora.modem_preset; - mapReport.has_default_channel = channels.hasDefaultChannel(); - - // Set position with precision (same as in PositionModule) - if (map_position_precision < 32 && map_position_precision > 0) { - mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision)); - mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision)); - mapReport.latitude_i += (1 << (31 - map_position_precision)); - mapReport.longitude_i += (1 << (31 - map_position_precision)); - } else { - mapReport.latitude_i = localPosition.latitude_i; - mapReport.longitude_i = localPosition.longitude_i; - } - mapReport.altitude = localPosition.altitude; - mapReport.position_precision = map_position_precision; + if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) { + last_report_to_map = millis(); + if (map_position_precision == 0) + LOG_WARN("MQTT Map report enabled, but precision is 0"); + if (localPosition.latitude_i == 0 && localPosition.longitude_i == 0) + LOG_WARN("MQTT Map report enabled, but no position available"); + return; + } - mapReport.num_online_local_nodes = nodeDB->getNumOnlineMeshNodes(true); + // Allocate MeshPacket and fill it + meshtastic_MeshPacket *mp = packetPool.allocZeroed(); + mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag; + mp->from = nodeDB->getNodeNum(); + mp->to = NODENUM_BROADCAST; + mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP; + + // Fill MapReport message + meshtastic_MapReport mapReport = meshtastic_MapReport_init_default; + memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name)); + memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name)); + mapReport.role = config.device.role; + mapReport.hw_model = owner.hw_model; + strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version)); + mapReport.region = config.lora.region; + mapReport.modem_preset = config.lora.modem_preset; + mapReport.has_default_channel = channels.hasDefaultChannel(); + + // Set position with precision (same as in PositionModule) + if (map_position_precision < 32 && map_position_precision > 0) { + mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision)); + mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision)); + mapReport.latitude_i += (1 << (31 - map_position_precision)); + mapReport.longitude_i += (1 << (31 - map_position_precision)); + } else { + mapReport.latitude_i = localPosition.latitude_i; + mapReport.longitude_i = localPosition.longitude_i; + } + mapReport.altitude = localPosition.altitude; + mapReport.position_precision = map_position_precision; - // Encode MapReport message and set it to MeshPacket in ServiceEnvelope - mp->decoded.payload.size = pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes), - &meshtastic_MapReport_msg, &mapReport); - se->packet = mp; + mapReport.num_online_local_nodes = nodeDB->getNumOnlineMeshNodes(true); - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, se); + // Encode MapReport message into the MeshPacket + mp->decoded.payload.size = + pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes), &meshtastic_MapReport_msg, &mapReport); - LOG_INFO("MQTT Publish map report to %s", mapTopic.c_str()); - publish(mapTopic.c_str(), bytes, numBytes, false); + // Encode the MeshPacket into a binary ServiceEnvelope and publish + const meshtastic_ServiceEnvelope se = { + .packet = mp, + .channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()), // Use primary channel as the channel_id + .gateway_id = owner.id}; + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, &se); - // Release the allocated memory for ServiceEnvelope and MeshPacket - mqttPool.release(se); - packetPool.release(mp); + LOG_INFO("MQTT Publish map report to %s", mapTopic.c_str()); + publish(mapTopic.c_str(), bytes, numBytes, false); - // Update the last report time - last_report_to_map = millis(); - } + // Release the allocated memory for MeshPacket + packetPool.release(mp); + + // Update the last report time + last_report_to_map = millis(); } bool MQTT::isPrivateIpAddress(const char address[]) diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index dc82c1a747..9db54ea4be 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -78,7 +78,11 @@ class MQTT : private concurrency::OSThread void start() { setIntervalFromNow(0); }; protected: - PointerQueue mqttQueue; + struct QueueEntry { + std::string topic; + std::basic_string envBytes; // binary/pb_encode_to_bytes ServiceEnvelope + }; + PointerQueue mqttQueue; int reconnectCount = 0;