From 8f6a2836b8ad5f8ba63a257586e888beab8297fb Mon Sep 17 00:00:00 2001 From: GUVWAF <78759985+GUVWAF@users.noreply.github.com> Date: Sat, 20 Jan 2024 21:22:09 +0100 Subject: [PATCH] Mark packets received via MQTT and add option to ignore them (#3117) * Mark packets received via MQTT and add option to ignore them * Don't send packets received via MQTT back into MQTT Generate implicit ACK for packets we as an MQTT gateway sent --------- Co-authored-by: Ben Meadors --- src/mesh/NodeDB.cpp | 1 + src/mesh/RadioInterface.cpp | 15 +++++++-------- src/mesh/RadioInterface.h | 3 ++- src/mesh/RadioLibInterface.cpp | 3 ++- src/mesh/Router.cpp | 6 +++--- src/modules/TraceRouteModule.cpp | 2 +- src/mqtt/MQTT.cpp | 11 +++++++++++ 7 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/mesh/NodeDB.cpp b/src/mesh/NodeDB.cpp index 19ba7eb8b5..5649bfd7a5 100644 --- a/src/mesh/NodeDB.cpp +++ b/src/mesh/NodeDB.cpp @@ -173,6 +173,7 @@ void NodeDB::installDefaultConfig() config.lora.region = meshtastic_Config_LoRaConfig_RegionCode_UNSET; config.lora.modem_preset = meshtastic_Config_LoRaConfig_ModemPreset_LONG_FAST; config.lora.hop_limit = HOP_RELIABLE; + config.lora.ignore_mqtt = false; #ifdef PIN_GPS_EN config.position.gps_en_gpio = PIN_GPS_EN; #endif diff --git a/src/mesh/RadioInterface.cpp b/src/mesh/RadioInterface.cpp index 91bd93bc5f..fe39f9b554 100644 --- a/src/mesh/RadioInterface.cpp +++ b/src/mesh/RadioInterface.cpp @@ -287,15 +287,14 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p) out += " encrypted"; } - if (p->rx_time != 0) { + if (p->rx_time != 0) out += DEBUG_PORT.mt_sprintf(" rxtime=%u", p->rx_time); - } - if (p->rx_snr != 0.0) { + if (p->rx_snr != 0.0) out += DEBUG_PORT.mt_sprintf(" rxSNR=%g", p->rx_snr); - } - if (p->rx_rssi != 0) { + if (p->rx_rssi != 0) out += DEBUG_PORT.mt_sprintf(" rxRSSI=%i", p->rx_rssi); - } + if (p->via_mqtt != 0) + out += DEBUG_PORT.mt_sprintf(" via MQTT"); if (p->priority != 0) out += DEBUG_PORT.mt_sprintf(" priority=%d", p->priority); @@ -554,7 +553,7 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p) LOG_WARN("hop limit %d is too high, setting to %d\n", p->hop_limit, HOP_RELIABLE); p->hop_limit = HOP_RELIABLE; } - h->flags = p->hop_limit | (p->want_ack ? PACKET_FLAGS_WANT_ACK_MASK : 0); + h->flags = p->hop_limit | (p->want_ack ? PACKET_FLAGS_WANT_ACK_MASK : 0) | (p->via_mqtt ? PACKET_FLAGS_VIA_MQTT_MASK : 0); // if the sender nodenum is zero, that means uninitialized assert(h->from); @@ -563,4 +562,4 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p) sendingPacket = p; return p->encrypted.size + sizeof(PacketHeader); -} \ No newline at end of file +} diff --git a/src/mesh/RadioInterface.h b/src/mesh/RadioInterface.h index 85ce116dce..83c5dae645 100644 --- a/src/mesh/RadioInterface.h +++ b/src/mesh/RadioInterface.h @@ -12,6 +12,7 @@ #define PACKET_FLAGS_HOP_MASK 0x07 #define PACKET_FLAGS_WANT_ACK_MASK 0x08 +#define PACKET_FLAGS_VIA_MQTT_MASK 0x10 /** * This structure has to exactly match the wire layout when sent over the radio link. Used to keep compatibility @@ -223,4 +224,4 @@ class RadioInterface }; /// Debug printing for packets -void printPacket(const char *prefix, const meshtastic_MeshPacket *p); \ No newline at end of file +void printPacket(const char *prefix, const meshtastic_MeshPacket *p); diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 4f0c52e678..8a2bc53e5a 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -362,6 +362,7 @@ void RadioLibInterface::handleReceiveInterrupt() assert(HOP_MAX <= PACKET_FLAGS_HOP_MASK); // If hopmax changes, carefully check this code mp->hop_limit = h->flags & PACKET_FLAGS_HOP_MASK; mp->want_ack = !!(h->flags & PACKET_FLAGS_WANT_ACK_MASK); + mp->via_mqtt = !!(h->flags & PACKET_FLAGS_VIA_MQTT_MASK); addReceiveMetadata(mp); @@ -406,4 +407,4 @@ void RadioLibInterface::startSend(meshtastic_MeshPacket *txp) // bits enableInterrupt(isrTxLevel0); } -} \ No newline at end of file +} diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index ff657fd11f..977a1215a1 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -467,10 +467,10 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src) void Router::perhapsHandleReceived(meshtastic_MeshPacket *p) { // assert(radioConfig.has_preferences); - bool ignore = is_in_repeated(config.lora.ignore_incoming, p->from); + bool ignore = is_in_repeated(config.lora.ignore_incoming, p->from) || (config.lora.ignore_mqtt && p->via_mqtt); if (ignore) { - LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list\n", p->from); + LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list or came via MQTT\n", p->from); } else if (ignore |= shouldFilterReceived(p)) { LOG_DEBUG("Incoming message was filtered 0x%x\n", p->from); } @@ -481,4 +481,4 @@ void Router::perhapsHandleReceived(meshtastic_MeshPacket *p) handleReceived(p); packetPool.release(p); -} \ No newline at end of file +} diff --git a/src/modules/TraceRouteModule.cpp b/src/modules/TraceRouteModule.cpp index bf7eaa0cd8..311e211f3c 100644 --- a/src/modules/TraceRouteModule.cpp +++ b/src/modules/TraceRouteModule.cpp @@ -85,4 +85,4 @@ TraceRouteModule::TraceRouteModule() : ProtobufModule("traceroute", meshtastic_PortNum_TRACEROUTE_APP, &meshtastic_RouteDiscovery_msg) { ourPortNum = meshtastic_PortNum_TRACEROUTE_APP; -} +} \ No newline at end of file diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index ccde031474..c91bdef29d 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -7,6 +7,7 @@ #include "mesh/Router.h" #include "mesh/generated/meshtastic/mqtt.pb.h" #include "mesh/generated/meshtastic/telemetry.pb.h" +#include "modules/RoutingModule.h" #if defined(ARCH_ESP32) #include "../mesh/generated/meshtastic/paxcount.pb.h" #endif @@ -142,6 +143,7 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length) if (strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled) { LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length); meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet); + p->via_mqtt = true; // Mark that the packet was received via MQTT if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { p->channel = ch.index; @@ -463,6 +465,9 @@ void MQTT::publishQueuedMessages() void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) { + if (mp.via_mqtt) + return; // Don't send messages that came from MQTT back into MQTT + auto &ch = channels.getByIndex(chIndex); if (&mp.decoded && strcmp(moduleConfig.mqtt.address, default_mqtt_address) == 0 && @@ -501,6 +506,12 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex) publish(topicJson.c_str(), jsonString.c_str(), false); } } + + // Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message. + // We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node + // receives it when we're connected to the broker. Then we'll stop our retransmissions. + if (getFrom(&mp) == nodeDB.getNodeNum()) + routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(&mp), mp.id, chIndex); } else { LOG_INFO("MQTT not connected, queueing packet\n"); if (mqttQueue.numFree() == 0) {