Skip to content

Commit

Permalink
Mark packets received via MQTT and add option to ignore them (#3117)
Browse files Browse the repository at this point in the history
* Mark packets received via MQTT and add option to ignore them

* Don't send packets received via MQTT back into MQTT
Generate implicit ACK for packets we as an MQTT gateway sent

---------

Co-authored-by: Ben Meadors <[email protected]>
  • Loading branch information
GUVWAF and thebentern authored Jan 20, 2024
1 parent 4f76239 commit 8f6a283
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/mesh/NodeDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ void NodeDB::installDefaultConfig()
config.lora.region = meshtastic_Config_LoRaConfig_RegionCode_UNSET;
config.lora.modem_preset = meshtastic_Config_LoRaConfig_ModemPreset_LONG_FAST;
config.lora.hop_limit = HOP_RELIABLE;
config.lora.ignore_mqtt = false;
#ifdef PIN_GPS_EN
config.position.gps_en_gpio = PIN_GPS_EN;
#endif
Expand Down
15 changes: 7 additions & 8 deletions src/mesh/RadioInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,15 +287,14 @@ void printPacket(const char *prefix, const meshtastic_MeshPacket *p)
out += " encrypted";
}

if (p->rx_time != 0) {
if (p->rx_time != 0)
out += DEBUG_PORT.mt_sprintf(" rxtime=%u", p->rx_time);
}
if (p->rx_snr != 0.0) {
if (p->rx_snr != 0.0)
out += DEBUG_PORT.mt_sprintf(" rxSNR=%g", p->rx_snr);
}
if (p->rx_rssi != 0) {
if (p->rx_rssi != 0)
out += DEBUG_PORT.mt_sprintf(" rxRSSI=%i", p->rx_rssi);
}
if (p->via_mqtt != 0)
out += DEBUG_PORT.mt_sprintf(" via MQTT");
if (p->priority != 0)
out += DEBUG_PORT.mt_sprintf(" priority=%d", p->priority);

Expand Down Expand Up @@ -554,7 +553,7 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p)
LOG_WARN("hop limit %d is too high, setting to %d\n", p->hop_limit, HOP_RELIABLE);
p->hop_limit = HOP_RELIABLE;
}
h->flags = p->hop_limit | (p->want_ack ? PACKET_FLAGS_WANT_ACK_MASK : 0);
h->flags = p->hop_limit | (p->want_ack ? PACKET_FLAGS_WANT_ACK_MASK : 0) | (p->via_mqtt ? PACKET_FLAGS_VIA_MQTT_MASK : 0);

// if the sender nodenum is zero, that means uninitialized
assert(h->from);
Expand All @@ -563,4 +562,4 @@ size_t RadioInterface::beginSending(meshtastic_MeshPacket *p)

sendingPacket = p;
return p->encrypted.size + sizeof(PacketHeader);
}
}
3 changes: 2 additions & 1 deletion src/mesh/RadioInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#define PACKET_FLAGS_HOP_MASK 0x07
#define PACKET_FLAGS_WANT_ACK_MASK 0x08
#define PACKET_FLAGS_VIA_MQTT_MASK 0x10

/**
* This structure has to exactly match the wire layout when sent over the radio link. Used to keep compatibility
Expand Down Expand Up @@ -223,4 +224,4 @@ class RadioInterface
};

/// Debug printing for packets
void printPacket(const char *prefix, const meshtastic_MeshPacket *p);
void printPacket(const char *prefix, const meshtastic_MeshPacket *p);
3 changes: 2 additions & 1 deletion src/mesh/RadioLibInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ void RadioLibInterface::handleReceiveInterrupt()
assert(HOP_MAX <= PACKET_FLAGS_HOP_MASK); // If hopmax changes, carefully check this code
mp->hop_limit = h->flags & PACKET_FLAGS_HOP_MASK;
mp->want_ack = !!(h->flags & PACKET_FLAGS_WANT_ACK_MASK);
mp->via_mqtt = !!(h->flags & PACKET_FLAGS_VIA_MQTT_MASK);

addReceiveMetadata(mp);

Expand Down Expand Up @@ -406,4 +407,4 @@ void RadioLibInterface::startSend(meshtastic_MeshPacket *txp)
// bits
enableInterrupt(isrTxLevel0);
}
}
}
6 changes: 3 additions & 3 deletions src/mesh/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,10 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src)
void Router::perhapsHandleReceived(meshtastic_MeshPacket *p)
{
// assert(radioConfig.has_preferences);
bool ignore = is_in_repeated(config.lora.ignore_incoming, p->from);
bool ignore = is_in_repeated(config.lora.ignore_incoming, p->from) || (config.lora.ignore_mqtt && p->via_mqtt);

if (ignore) {
LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list\n", p->from);
LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list or came via MQTT\n", p->from);
} else if (ignore |= shouldFilterReceived(p)) {
LOG_DEBUG("Incoming message was filtered 0x%x\n", p->from);
}
Expand All @@ -481,4 +481,4 @@ void Router::perhapsHandleReceived(meshtastic_MeshPacket *p)
handleReceived(p);

packetPool.release(p);
}
}
2 changes: 1 addition & 1 deletion src/modules/TraceRouteModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ TraceRouteModule::TraceRouteModule()
: ProtobufModule("traceroute", meshtastic_PortNum_TRACEROUTE_APP, &meshtastic_RouteDiscovery_msg)
{
ourPortNum = meshtastic_PortNum_TRACEROUTE_APP;
}
}
11 changes: 11 additions & 0 deletions src/mqtt/MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "mesh/Router.h"
#include "mesh/generated/meshtastic/mqtt.pb.h"
#include "mesh/generated/meshtastic/telemetry.pb.h"
#include "modules/RoutingModule.h"
#if defined(ARCH_ESP32)
#include "../mesh/generated/meshtastic/paxcount.pb.h"
#endif
Expand Down Expand Up @@ -142,6 +143,7 @@ void MQTT::onReceive(char *topic, byte *payload, size_t length)
if (strcmp(e.channel_id, channels.getGlobalId(ch.index)) == 0 && e.packet && ch.settings.downlink_enabled) {
LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length);
meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet);
p->via_mqtt = true; // Mark that the packet was received via MQTT

if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
p->channel = ch.index;
Expand Down Expand Up @@ -463,6 +465,9 @@ void MQTT::publishQueuedMessages()

void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex)
{
if (mp.via_mqtt)
return; // Don't send messages that came from MQTT back into MQTT

auto &ch = channels.getByIndex(chIndex);

if (&mp.decoded && strcmp(moduleConfig.mqtt.address, default_mqtt_address) == 0 &&
Expand Down Expand Up @@ -501,6 +506,12 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex)
publish(topicJson.c_str(), jsonString.c_str(), false);
}
}

// Generate an implicit ACK towards ourselves (handled and processed only locally!) for this message.
// We do this because packets are not rebroadcasted back into MQTT anymore and we assume that at least one node
// receives it when we're connected to the broker. Then we'll stop our retransmissions.
if (getFrom(&mp) == nodeDB.getNodeNum())
routingModule->sendAckNak(meshtastic_Routing_Error_NONE, getFrom(&mp), mp.id, chIndex);
} else {
LOG_INFO("MQTT not connected, queueing packet\n");
if (mqttQueue.numFree() == 0) {
Expand Down

0 comments on commit 8f6a283

Please sign in to comment.