Skip to content

Commit

Permalink
Merge pull request #2094 from meshtastic/yank-mqtt-queue
Browse files Browse the repository at this point in the history
Yank mqtt service envelope queue
  • Loading branch information
thebentern authored Jan 3, 2023
2 parents cab5fcf + 1cfc7b8 commit 8914d1a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 56 deletions.
61 changes: 9 additions & 52 deletions src/mqtt/MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ String statusTopic = "msh/2/stat/";
String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID
String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID

static MemoryDynamic<ServiceEnvelope> staticMqttPool;

Allocator<ServiceEnvelope> &mqttPool = staticMqttPool;

void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
{
mqtt->onPublish(topic, payload, length);
Expand Down Expand Up @@ -126,7 +122,7 @@ void mqttInit()
new MQTT();
}

MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE)
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
{
if(moduleConfig.mqtt.enabled) {

Expand Down Expand Up @@ -253,36 +249,9 @@ int32_t MQTT::runOnce()
if (!pubSub.loop()) {
if (wantConnection) {
reconnect();

// If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
if (pubSub.connected()) {
if (!mqttQueue.isEmpty()) {
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
static uint8_t bytes[MeshPacket_size + 64];
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);

String topic = cryptTopic + env->channel_id + "/" + owner.id;
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);


pubSub.publish(topic.c_str(), bytes, numBytes, false);

if (moduleConfig.mqtt.json_enabled) {
// handle json topic
auto jsonString = this->downstreamPacketToJson(env->packet);
if (jsonString.length() != 0) {
String topicJson = jsonTopic + env->channel_id + "/" + owner.id;
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
}
}
mqttPool.release(env);
}
return 20;
} else {
return 30000;
}

// If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
return pubSub.connected() ? 20 : 30000;
} else
return 5000; // If we don't want connection now, check again in 5 secs
} else {
Expand All @@ -304,17 +273,17 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
if (ch.settings.uplink_enabled) {
const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel

ServiceEnvelope *env = mqttPool.allocZeroed();
env->channel_id = (char *)channelId;
env->gateway_id = owner.id;
env->packet = (MeshPacket *)&mp;
ServiceEnvelope env = ServiceEnvelope_init_default;
env.channel_id = (char *)channelId;
env.gateway_id = owner.id;
env.packet = (MeshPacket *)&mp;

// don't bother sending if not connected...
if (pubSub.connected()) {

// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
static uint8_t bytes[MeshPacket_size + 64];
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env);

String topic = cryptTopic + channelId + "/" + owner.id;
LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes);
Expand All @@ -330,19 +299,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
}
}
} else {
LOG_INFO("MQTT not connected, queueing packet\n");
if (mqttQueue.numFree() == 0) {
LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n");
ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
if (d)
mqttPool.release(d);
}
// make a copy of serviceEnvelope and queue it
ServiceEnvelope *copied = mqttPool.allocCopy(*env);
assert(mqttQueue.enqueue(copied, 0));
}
mqttPool.release(env);
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/mqtt/MQTT.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
#include <EthernetClient.h>
#endif

#define MAX_MQTT_QUEUE 32

/**
* Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from
* the two components that use it: MQTTPlugin and MQTTSimInterface.
Expand Down Expand Up @@ -55,10 +53,9 @@ class MQTT : private concurrency::OSThread
bool connected();

protected:
PointerQueue<ServiceEnvelope> mqttQueue;

int reconnectCount = 0;

virtual int32_t runOnce() override;

private:
Expand Down

0 comments on commit 8914d1a

Please sign in to comment.