Skip to content

Commit

Permalink
More comprehensive client proxy queue guards (#3414)
Browse files Browse the repository at this point in the history
* More comprehensive MQTT thread and queue guards

* Consolidate logic

* Remove channel check

* Check for map_reporting_enabled as well

* Update message

* Remove channel check from here as well

* One liner

* Start the mqtt thread back up when channels change and we want mqtt
  • Loading branch information
thebentern authored Mar 15, 2024
1 parent 4d0d82f commit 52cfec2
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 18 deletions.
16 changes: 16 additions & 0 deletions src/mesh/Channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <assert.h>

#include "mqtt/MQTT.h"

/// 16 bytes of random PSK for our _public_ default channel that all devices power up on (AES128)
static const uint8_t defaultpsk[] = {0xd4, 0xf1, 0xbb, 0x3a, 0x20, 0x29, 0x07, 0x59,
0xf0, 0xbc, 0xff, 0xab, 0xcf, 0x4e, 0x69, 0x01};
Expand Down Expand Up @@ -193,6 +195,10 @@ void Channels::onConfigChanged()
if (ch.role == meshtastic_Channel_Role_PRIMARY)
primaryIndex = i;
}
if (channels.anyMqttEnabled() && mqtt && !mqtt->isEnabled()) {
LOG_DEBUG("MQTT is enabled on at least one channel, so set MQTT thread to run immediately\n");
mqtt->start();
}
}

meshtastic_Channel &Channels::getByIndex(ChannelIndex chIndex)
Expand Down Expand Up @@ -237,6 +243,16 @@ void Channels::setChannel(const meshtastic_Channel &c)
old = c; // slam in the new settings/role
}

bool Channels::anyMqttEnabled()
{
for (int i = 0; i < getNumChannels(); i++)
if (channelFile.channels[i].role != meshtastic_Channel_Role_DISABLED && channelFile.channels[i].has_settings &&
(channelFile.channels[i].settings.downlink_enabled || channelFile.channels[i].settings.uplink_enabled))
return true;

return false;
}

const char *Channels::getName(size_t chIndex)
{
// Convert the short "" representation for Default into a usable string
Expand Down
3 changes: 3 additions & 0 deletions src/mesh/Channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class Channels
// Returns true if we can be reached via a channel with the default settings given a region and modem preset
bool hasDefaultChannel();

// Returns true if any of our channels have enabled MQTT uplink or downlink
bool anyMqttEnabled();

private:
/** Given a channel index, change to use the crypto key specified by that index
*
Expand Down
6 changes: 5 additions & 1 deletion src/mesh/PhoneAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ bool PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
break;
case meshtastic_ToRadio_mqttClientProxyMessage_tag:
LOG_INFO("Got MqttClientProxy message\n");
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled) {
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled && moduleConfig.mqtt.enabled &&
(channels.anyMqttEnabled() || moduleConfig.mqtt.map_reporting_enabled)) {
mqtt->onClientProxyReceive(toRadioScratch.mqttClientProxyMessage);
} else {
LOG_WARN("MqttClientProxy received but proxy is not enabled, no channels have up/downlink, or map reporting "
"not enabled\n");
}
break;
default:
Expand Down
21 changes: 4 additions & 17 deletions src/mqtt/MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,9 @@ void MQTT::sendSubscriptions()

bool MQTT::wantsLink() const
{
bool hasChannelorMapReport = false;
bool hasChannelorMapReport =
moduleConfig.mqtt.enabled && (moduleConfig.mqtt.map_reporting_enabled || channels.anyMqttEnabled());

if (moduleConfig.mqtt.enabled) {
hasChannelorMapReport = moduleConfig.mqtt.map_reporting_enabled;
if (!hasChannelorMapReport) {
// No need for link if no channel needed it
size_t numChan = channels.getNumChannels();
for (size_t i = 0; i < numChan; i++) {
const auto &ch = channels.getByIndex(i);
if (ch.settings.uplink_enabled || ch.settings.downlink_enabled) {
hasChannelorMapReport = true;
break;
}
}
}
}
if (hasChannelorMapReport && moduleConfig.mqtt.proxy_to_client_enabled)
return true;

Expand All @@ -401,7 +388,7 @@ bool MQTT::wantsLink() const

int32_t MQTT::runOnce()
{
if (!moduleConfig.mqtt.enabled)
if (!moduleConfig.mqtt.enabled || !(moduleConfig.mqtt.map_reporting_enabled || channels.anyMqttEnabled()))
return disable();

bool wantConnection = wantsLink();
Expand Down Expand Up @@ -915,4 +902,4 @@ bool MQTT::isValidJsonEnvelope(JSONObject &json)
(json["from"]->AsNumber() == nodeDB.getNodeNum()) && // only accept message if the "from" is us
(json.find("type") != json.end()) && json["type"]->IsString() && // should specify a type
(json.find("payload") != json.end()); // should have a payload
}
}
4 changes: 4 additions & 0 deletions src/mqtt/MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class MQTT : private concurrency::OSThread

void onClientProxyReceive(meshtastic_MqttClientProxyMessage msg);

bool isEnabled() { return this->enabled; };

void start() { setIntervalFromNow(0); };

protected:
PointerQueue<meshtastic_ServiceEnvelope> mqttQueue;

Expand Down

0 comments on commit 52cfec2

Please sign in to comment.