Skip to content

Commit

Permalink
Use encoded ServiceEnvelope in mqttQueue (#5619)
Browse files Browse the repository at this point in the history
  • Loading branch information
esev authored Dec 20, 2024
1 parent e1de439 commit 658459a
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 137 deletions.
261 changes: 125 additions & 136 deletions src/mqtt/MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,14 @@
#include <Throttle.h>
#include <assert.h>
#include <pb_decode.h>
#include <utility>

MQTT *mqtt;

namespace
{
constexpr int reconnectMax = 5;

static MemoryDynamic<meshtastic_ServiceEnvelope> staticMqttPool;

Allocator<meshtastic_ServiceEnvelope> &mqttPool = staticMqttPool;

// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
static uint8_t bytes[meshtastic_MqttClientProxyMessage_size + 30]; // 12 for channel name and 16 for nodeid

Expand Down Expand Up @@ -528,39 +525,37 @@ void MQTT::publishNodeInfo()
}
void MQTT::publishQueuedMessages()
{
if (!mqttQueue.isEmpty()) {
LOG_DEBUG("Publish enqueued MQTT message");
meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
std::string topic;
if (env->packet->pki_encrypted) {
topic = cryptTopic + "PKI/" + owner.id;
} else {
topic = cryptTopic + env->channel_id + "/" + owner.id;
}
LOG_INFO("publish %s, %u bytes from queue", topic.c_str(), numBytes);
if (mqttQueue.isEmpty())
return;

publish(topic.c_str(), bytes, numBytes, false);
LOG_DEBUG("Publish enqueued MQTT message");
const std::unique_ptr<QueueEntry> entry(mqttQueue.dequeuePtr(0));
LOG_INFO("publish %s, %u bytes from queue", entry->topic.c_str(), entry->envBytes.size());
publish(entry->topic.c_str(), entry->envBytes.data(), entry->envBytes.size(), false);

#if !defined(ARCH_NRF52) || \
defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ###
if (moduleConfig.mqtt.json_enabled) {
// handle json topic
auto jsonString = MeshPacketSerializer::JsonSerialize(env->packet);
if (jsonString.length() != 0) {
std::string topicJson;
if (env->packet->pki_encrypted) {
topicJson = jsonTopic + "PKI/" + owner.id;
} else {
topicJson = jsonTopic + env->channel_id + "/" + owner.id;
}
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str());
publish(topicJson.c_str(), jsonString.c_str(), false);
}
}
#endif // ARCH_NRF52 NRF52_USE_JSON
mqttPool.release(env);
if (!moduleConfig.mqtt.json_enabled)
return;

// handle json topic
const DecodedServiceEnvelope env(entry->envBytes.data(), entry->envBytes.size());
if (!env.validDecode || env.packet == NULL || env.channel_id == NULL)
return;

auto jsonString = MeshPacketSerializer::JsonSerialize(env.packet);
if (jsonString.length() == 0)
return;

std::string topicJson;
if (env.packet->pki_encrypted) {
topicJson = jsonTopic + "PKI/" + owner.id;
} else {
topicJson = jsonTopic + env.channel_id + "/" + owner.id;
}
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str());
publish(topicJson.c_str(), jsonString.c_str(), false);
#endif // ARCH_NRF52 NRF52_USE_JSON
}

void MQTT::onSend(const meshtastic_MeshPacket &mp_encrypted, const meshtastic_MeshPacket &mp_decoded, ChannelIndex chIndex)
Expand Down Expand Up @@ -599,59 +594,56 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp_encrypted, const meshtastic_Me
// Either encrypted packet (we couldn't decrypt) is marked as pki_encrypted, or we could decode the PKI encrypted packet
bool isPKIEncrypted = mp_encrypted.pki_encrypted || mp_decoded.pki_encrypted;
// If it was to a channel, check uplink enabled, else must be pki_encrypted
if ((ch.settings.uplink_enabled && !isPKIEncrypted) || isPKIEncrypted) {
const char *channelId = isPKIEncrypted ? "PKI" : channels.getGlobalId(chIndex);

meshtastic_ServiceEnvelope *env = mqttPool.allocZeroed();
env->channel_id = (char *)channelId;
env->gateway_id = owner.id;

LOG_DEBUG("MQTT onSend - Publish ");
if (moduleConfig.mqtt.encryption_enabled) {
env->packet = (meshtastic_MeshPacket *)&mp_encrypted;
LOG_DEBUG("encrypted message");
} else if (mp_decoded.which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
env->packet = (meshtastic_MeshPacket *)&mp_decoded;
LOG_DEBUG("portnum %i message", env->packet->decoded.portnum);
} else {
LOG_DEBUG("nothing, pkt not decrypted");
mqttPool.release(env);
return; // Don't upload a still-encrypted PKI packet if not encryption_enabled
}
if (!(ch.settings.uplink_enabled || isPKIEncrypted))
return;
const char *channelId = isPKIEncrypted ? "PKI" : channels.getGlobalId(chIndex);

LOG_DEBUG("MQTT onSend - Publish ");
const meshtastic_MeshPacket *p;
if (moduleConfig.mqtt.encryption_enabled) {
p = &mp_encrypted;
LOG_DEBUG("encrypted message");
} else if (mp_decoded.which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
p = &mp_decoded;
LOG_DEBUG("portnum %i message", mp_decoded.decoded.portnum);
} else {
LOG_DEBUG("nothing, pkt not decrypted");
return; // Don't upload a still-encrypted PKI packet if not encryption_enabled
}

if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
std::string topic = cryptTopic + channelId + "/" + owner.id;
LOG_DEBUG("MQTT Publish %s, %u bytes", topic.c_str(), numBytes);
const meshtastic_ServiceEnvelope env = {
.packet = const_cast<meshtastic_MeshPacket *>(p), .channel_id = const_cast<char *>(channelId), .gateway_id = owner.id};
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, &env);
std::string topic = cryptTopic + channelId + "/" + owner.id;

publish(topic.c_str(), bytes, numBytes, false);
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
LOG_DEBUG("MQTT Publish %s, %u bytes", topic.c_str(), numBytes);
publish(topic.c_str(), bytes, numBytes, false);

#if !defined(ARCH_NRF52) || \
defined(NRF52_USE_JSON) // JSON is not supported on nRF52, see issue #2804 ### Fixed by using ArduinoJson ###
if (moduleConfig.mqtt.json_enabled) {
// handle json topic
auto jsonString = MeshPacketSerializer::JsonSerialize((meshtastic_MeshPacket *)&mp_decoded);
if (jsonString.length() != 0) {
std::string topicJson = jsonTopic + channelId + "/" + owner.id;
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(),
jsonString.c_str());
publish(topicJson.c_str(), jsonString.c_str(), false);
}
}
if (!moduleConfig.mqtt.json_enabled)
return;
// handle json topic
auto jsonString = MeshPacketSerializer::JsonSerialize(&mp_decoded);
if (jsonString.length() == 0)
return;
std::string topicJson = jsonTopic + channelId + "/" + owner.id;
LOG_INFO("JSON publish message to %s, %u bytes: %s", topicJson.c_str(), jsonString.length(), jsonString.c_str());
publish(topicJson.c_str(), jsonString.c_str(), false);
#endif // ARCH_NRF52 NRF52_USE_JSON
} else {
LOG_INFO("MQTT not connected, queue packet");
QueueEntry *entry;
if (mqttQueue.numFree() == 0) {
LOG_WARN("MQTT queue is full, discard oldest");
entry = mqttQueue.dequeuePtr(0);
} else {
LOG_INFO("MQTT not connected, queue packet");
if (mqttQueue.numFree() == 0) {
LOG_WARN("MQTT queue is full, discard oldest");
meshtastic_ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
if (d)
mqttPool.release(d);
}
// make a copy of serviceEnvelope and queue it
meshtastic_ServiceEnvelope *copied = mqttPool.allocCopy(*env);
assert(mqttQueue.enqueue(copied, 0));
entry = new QueueEntry;
}
mqttPool.release(env);
entry->topic = std::move(topic);
entry->envBytes.assign(bytes, numBytes);
assert(mqttQueue.enqueue(entry, 0));
}
}

Expand All @@ -660,73 +652,70 @@ void MQTT::perhapsReportToMap()
if (!moduleConfig.mqtt.map_reporting_enabled || !(moduleConfig.mqtt.proxy_to_client_enabled || isConnectedDirectly()))
return;

if (Throttle::isWithinTimespanMs(last_report_to_map, map_publish_interval_msecs)) {
if (Throttle::isWithinTimespanMs(last_report_to_map, map_publish_interval_msecs))
return;
} else {
if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) {
last_report_to_map = millis();
if (map_position_precision == 0)
LOG_WARN("MQTT Map report enabled, but precision is 0");
if (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)
LOG_WARN("MQTT Map report enabled, but no position available");
return;
}

// Allocate ServiceEnvelope and fill it
meshtastic_ServiceEnvelope *se = mqttPool.allocZeroed();
se->channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()); // Use primary channel as the channel_id
se->gateway_id = owner.id;

// Allocate MeshPacket and fill it
meshtastic_MeshPacket *mp = packetPool.allocZeroed();
mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag;
mp->from = nodeDB->getNodeNum();
mp->to = NODENUM_BROADCAST;
mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP;

// Fill MapReport message
meshtastic_MapReport mapReport = meshtastic_MapReport_init_default;
memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name));
memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name));
mapReport.role = config.device.role;
mapReport.hw_model = owner.hw_model;
strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version));
mapReport.region = config.lora.region;
mapReport.modem_preset = config.lora.modem_preset;
mapReport.has_default_channel = channels.hasDefaultChannel();

// Set position with precision (same as in PositionModule)
if (map_position_precision < 32 && map_position_precision > 0) {
mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision));
mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision));
mapReport.latitude_i += (1 << (31 - map_position_precision));
mapReport.longitude_i += (1 << (31 - map_position_precision));
} else {
mapReport.latitude_i = localPosition.latitude_i;
mapReport.longitude_i = localPosition.longitude_i;
}
mapReport.altitude = localPosition.altitude;
mapReport.position_precision = map_position_precision;
if (map_position_precision == 0 || (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)) {
last_report_to_map = millis();
if (map_position_precision == 0)
LOG_WARN("MQTT Map report enabled, but precision is 0");
if (localPosition.latitude_i == 0 && localPosition.longitude_i == 0)
LOG_WARN("MQTT Map report enabled, but no position available");
return;
}

mapReport.num_online_local_nodes = nodeDB->getNumOnlineMeshNodes(true);
// Allocate MeshPacket and fill it
meshtastic_MeshPacket *mp = packetPool.allocZeroed();
mp->which_payload_variant = meshtastic_MeshPacket_decoded_tag;
mp->from = nodeDB->getNodeNum();
mp->to = NODENUM_BROADCAST;
mp->decoded.portnum = meshtastic_PortNum_MAP_REPORT_APP;

// Fill MapReport message
meshtastic_MapReport mapReport = meshtastic_MapReport_init_default;
memcpy(mapReport.long_name, owner.long_name, sizeof(owner.long_name));
memcpy(mapReport.short_name, owner.short_name, sizeof(owner.short_name));
mapReport.role = config.device.role;
mapReport.hw_model = owner.hw_model;
strncpy(mapReport.firmware_version, optstr(APP_VERSION), sizeof(mapReport.firmware_version));
mapReport.region = config.lora.region;
mapReport.modem_preset = config.lora.modem_preset;
mapReport.has_default_channel = channels.hasDefaultChannel();

// Set position with precision (same as in PositionModule)
if (map_position_precision < 32 && map_position_precision > 0) {
mapReport.latitude_i = localPosition.latitude_i & (UINT32_MAX << (32 - map_position_precision));
mapReport.longitude_i = localPosition.longitude_i & (UINT32_MAX << (32 - map_position_precision));
mapReport.latitude_i += (1 << (31 - map_position_precision));
mapReport.longitude_i += (1 << (31 - map_position_precision));
} else {
mapReport.latitude_i = localPosition.latitude_i;
mapReport.longitude_i = localPosition.longitude_i;
}
mapReport.altitude = localPosition.altitude;
mapReport.position_precision = map_position_precision;

// Encode MapReport message and set it to MeshPacket in ServiceEnvelope
mp->decoded.payload.size = pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes),
&meshtastic_MapReport_msg, &mapReport);
se->packet = mp;
mapReport.num_online_local_nodes = nodeDB->getNumOnlineMeshNodes(true);

size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, se);
// Encode MapReport message into the MeshPacket
mp->decoded.payload.size =
pb_encode_to_bytes(mp->decoded.payload.bytes, sizeof(mp->decoded.payload.bytes), &meshtastic_MapReport_msg, &mapReport);

LOG_INFO("MQTT Publish map report to %s", mapTopic.c_str());
publish(mapTopic.c_str(), bytes, numBytes, false);
// Encode the MeshPacket into a binary ServiceEnvelope and publish
const meshtastic_ServiceEnvelope se = {
.packet = mp,
.channel_id = (char *)channels.getGlobalId(channels.getPrimaryIndex()), // Use primary channel as the channel_id
.gateway_id = owner.id};
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, &se);

// Release the allocated memory for ServiceEnvelope and MeshPacket
mqttPool.release(se);
packetPool.release(mp);
LOG_INFO("MQTT Publish map report to %s", mapTopic.c_str());
publish(mapTopic.c_str(), bytes, numBytes, false);

// Update the last report time
last_report_to_map = millis();
}
// Release the allocated memory for MeshPacket
packetPool.release(mp);

// Update the last report time
last_report_to_map = millis();
}

bool MQTT::isPrivateIpAddress(const char address[])
Expand Down
6 changes: 5 additions & 1 deletion src/mqtt/MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ class MQTT : private concurrency::OSThread
void start() { setIntervalFromNow(0); };

protected:
PointerQueue<meshtastic_ServiceEnvelope> mqttQueue;
struct QueueEntry {
std::string topic;
std::basic_string<uint8_t> envBytes; // binary/pb_encode_to_bytes ServiceEnvelope
};
PointerQueue<QueueEntry> mqttQueue;

int reconnectCount = 0;

Expand Down

0 comments on commit 658459a

Please sign in to comment.