Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More comprehensive client proxy queue guards #3414

Merged
merged 13 commits into from
Mar 15, 2024
16 changes: 16 additions & 0 deletions src/mesh/Channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <assert.h>

#include "mqtt/MQTT.h"

/// 16 bytes of random PSK for our _public_ default channel that all devices power up on (AES128)
static const uint8_t defaultpsk[] = {0xd4, 0xf1, 0xbb, 0x3a, 0x20, 0x29, 0x07, 0x59,
0xf0, 0xbc, 0xff, 0xab, 0xcf, 0x4e, 0x69, 0x01};
Expand Down Expand Up @@ -193,6 +195,10 @@ void Channels::onConfigChanged()
if (ch.role == meshtastic_Channel_Role_PRIMARY)
primaryIndex = i;
}
if (channels.anyMqttEnabled() && mqtt && !mqtt->isEnabled()) {
LOG_DEBUG("MQTT is enabled on at least one channel, so set MQTT thread to run immediately\n");
mqtt->start();
}
}

meshtastic_Channel &Channels::getByIndex(ChannelIndex chIndex)
Expand Down Expand Up @@ -237,6 +243,16 @@ void Channels::setChannel(const meshtastic_Channel &c)
old = c; // slam in the new settings/role
}

bool Channels::anyMqttEnabled()
{
for (int i = 0; i < getNumChannels(); i++)
if (channelFile.channels[i].role != meshtastic_Channel_Role_DISABLED && channelFile.channels[i].has_settings &&
(channelFile.channels[i].settings.downlink_enabled || channelFile.channels[i].settings.uplink_enabled))
return true;

return false;
}

const char *Channels::getName(size_t chIndex)
{
// Convert the short "" representation for Default into a usable string
Expand Down
3 changes: 3 additions & 0 deletions src/mesh/Channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class Channels
// Returns true if we can be reached via a channel with the default settings given a region and modem preset
bool hasDefaultChannel();

// Returns true if any of our channels have enabled MQTT uplink or downlink
bool anyMqttEnabled();

private:
/** Given a channel index, change to use the crypto key specified by that index
*
Expand Down
6 changes: 5 additions & 1 deletion src/mesh/PhoneAPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ bool PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
break;
case meshtastic_ToRadio_mqttClientProxyMessage_tag:
LOG_INFO("Got MqttClientProxy message\n");
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled) {
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled && moduleConfig.mqtt.enabled &&
(channels.anyMqttEnabled() || moduleConfig.mqtt.map_reporting_enabled)) {
mqtt->onClientProxyReceive(toRadioScratch.mqttClientProxyMessage);
} else {
LOG_WARN("MqttClientProxy received but proxy is not enabled, no channels have up/downlink, or map reporting "
"not enabled\n");
}
break;
default:
Expand Down
21 changes: 4 additions & 17 deletions src/mqtt/MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,9 @@ void MQTT::sendSubscriptions()

bool MQTT::wantsLink() const
{
bool hasChannelorMapReport = false;
bool hasChannelorMapReport =
moduleConfig.mqtt.enabled && (moduleConfig.mqtt.map_reporting_enabled || channels.anyMqttEnabled());

if (moduleConfig.mqtt.enabled) {
hasChannelorMapReport = moduleConfig.mqtt.map_reporting_enabled;
if (!hasChannelorMapReport) {
// No need for link if no channel needed it
size_t numChan = channels.getNumChannels();
for (size_t i = 0; i < numChan; i++) {
const auto &ch = channels.getByIndex(i);
if (ch.settings.uplink_enabled || ch.settings.downlink_enabled) {
hasChannelorMapReport = true;
break;
}
}
}
}
if (hasChannelorMapReport && moduleConfig.mqtt.proxy_to_client_enabled)
return true;

Expand All @@ -401,7 +388,7 @@ bool MQTT::wantsLink() const

int32_t MQTT::runOnce()
{
if (!moduleConfig.mqtt.enabled)
if (!moduleConfig.mqtt.enabled || !(moduleConfig.mqtt.map_reporting_enabled || channels.anyMqttEnabled()))
return disable();

bool wantConnection = wantsLink();
Expand Down Expand Up @@ -915,4 +902,4 @@ bool MQTT::isValidJsonEnvelope(JSONObject &json)
(json["from"]->AsNumber() == nodeDB.getNodeNum()) && // only accept message if the "from" is us
(json.find("type") != json.end()) && json["type"]->IsString() && // should specify a type
(json.find("payload") != json.end()); // should have a payload
}
}
4 changes: 4 additions & 0 deletions src/mqtt/MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class MQTT : private concurrency::OSThread

void onClientProxyReceive(meshtastic_MqttClientProxyMessage msg);

bool isEnabled() { return this->enabled; };

void start() { setIntervalFromNow(0); };

protected:
PointerQueue<meshtastic_ServiceEnvelope> mqttQueue;

Expand Down
Loading