Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StoreForward updates #3194

Merged
merged 11 commits into from
Feb 14, 2024
93 changes: 51 additions & 42 deletions src/modules/esp32/StoreForwardModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,11 @@ int32_t StoreForwardModule::runOnce()
// Only send packets if the channel is less than 25% utilized.
if (airTime->isTxAllowedChannelUtil(true)) {
storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index);
if (this->packetHistoryTXQueue_index == packetHistoryTXQueue_size) {
// Tell the client we're done sending
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_PING;
storeForwardModule->sendMessage(this->busyTo, sf);
LOG_INFO("*** S&F - Done. (ROUTER_PING)\n");
if (this->packetHistoryTXQueue_index < packetHistoryTXQueue_size - 1) {
this->packetHistoryTXQueue_index++;
} else {
this->packetHistoryTXQueue_index = 0;
this->busy = false;
} else {
this->packetHistoryTXQueue_index++;
}
}
} else if ((millis() - lastHeartbeat > (heartbeatInterval * 1000)) && airTime->isTxAllowedChannelUtil(true)) {
Expand All @@ -56,7 +51,7 @@ int32_t StoreForwardModule::runOnce()
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_HEARTBEAT;
sf.which_variant = meshtastic_StoreAndForward_heartbeat_tag;
sf.variant.heartbeat.period = 300;
sf.variant.heartbeat.period = heartbeatInterval;
sf.variant.heartbeat.secondary = 0; // TODO we always have one primary router for now
storeForwardModule->sendMessage(NODENUM_BROADCAST, sf);
}
Expand Down Expand Up @@ -101,10 +96,11 @@ void StoreForwardModule::populatePSRAM()
*
* @param msAgo The number of milliseconds ago from which to start sending messages.
* @param to The recipient ID to send the messages to.
* @param last_request_index The index in the packet history of the last request from this node.
*/
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index)
{
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to);
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to, &last_request_index);

if (queueSize) {
LOG_INFO("*** S&F - Sending %u message(s)\n", queueSize);
Expand All @@ -118,39 +114,38 @@ void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
sf.which_variant = meshtastic_StoreAndForward_history_tag;
sf.variant.history.history_messages = queueSize;
sf.variant.history.window = msAgo;
sf.variant.history.last_request = last_request_index;
storeForwardModule->sendMessage(to, sf);
}

/**
* Creates a new history queue with messages that were received within the specified time frame.
*
* @param msAgo The number of milliseconds ago to start the history queue.
* @param to The maximum number of messages to include in the history queue.
* @param to The NodeNum of the recipient.
* @param last_request_index The index in the packet history of the last request from this node.
* @return The ID of the newly created history queue.
*/
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index)
{

this->packetHistoryTXQueue_size = 0;
// If our history was cleared, ignore what the client is telling us
uint32_t last_index = *last_request_index >= this->packetHistoryCurrent ? 0 : *last_request_index;

for (int i = 0; i < this->packetHistoryCurrent; i++) {
for (int i = last_index; i < this->packetHistoryCurrent; i++) {
/*
LOG_DEBUG("SF historyQueueCreate\n");
LOG_DEBUG("SF historyQueueCreate - time %d\n", this->packetHistory[i].time);
LOG_DEBUG("SF historyQueueCreate - millis %d\n", millis());
LOG_DEBUG("SF historyQueueCreate - math %d\n", (millis() - msAgo));
*/
if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) {
LOG_DEBUG("*** SF historyQueueCreate - Time matches - ok\n");
/*
Copy the messages that were received by the router in the last msAgo
/* Copy the messages that were received by the router in the last msAgo
to the packetHistoryTXQueue structure.

TODO: The condition (this->packetHistory[i].to & NODENUM_BROADCAST) == to) is not tested since
I don't have an easy way to target a specific user. Will need to do this soon.
*/
if ((this->packetHistory[i].to & NODENUM_BROADCAST) == NODENUM_BROADCAST ||
((this->packetHistory[i].to & NODENUM_BROADCAST) == to)) {
Client not interested in packets from itself and only in broadcast packets or packets towards it. */
if (this->packetHistory[i].from != to &&
(this->packetHistory[i].to == NODENUM_BROADCAST || this->packetHistory[i].to == to)) {
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from;
Expand All @@ -159,9 +154,10 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
memcpy(this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].payload, this->packetHistory[i].payload,
meshtastic_Constants_DATA_PAYLOAD_LEN);
this->packetHistoryTXQueue_size++;
*last_request_index = i + 1; // Set to one higher such that we don't send the same message again

LOG_DEBUG("*** PacketHistoryStruct time=%d\n", this->packetHistory[i].time);
LOG_DEBUG("*** PacketHistoryStruct msg=%s\n", this->packetHistory[i].payload);
LOG_DEBUG("*** PacketHistoryStruct time=%d, msg=%s\n", this->packetHistory[i].time,
this->packetHistory[i].payload);
}
}
}
Expand All @@ -177,15 +173,20 @@ void StoreForwardModule::historyAdd(const meshtastic_MeshPacket &mp)
{
const auto &p = mp.decoded;

this->packetHistory[this->packetHistoryCurrent].time = millis();
this->packetHistory[this->packetHistoryCurrent].to = mp.to;
this->packetHistory[this->packetHistoryCurrent].channel = mp.channel;
this->packetHistory[this->packetHistoryCurrent].from = mp.from;
this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size;
memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN);
if (this->packetHistoryCurrent < this->records) {
this->packetHistory[this->packetHistoryCurrent].time = millis();
this->packetHistory[this->packetHistoryCurrent].to = mp.to;
this->packetHistory[this->packetHistoryCurrent].channel = mp.channel;
this->packetHistory[this->packetHistoryCurrent].from = mp.from;
this->packetHistory[this->packetHistoryCurrent].payload_size = p.payload.size;
memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, meshtastic_Constants_DATA_PAYLOAD_LEN);

this->packetHistoryCurrent++;
this->packetHistoryMax++;
this->packetHistoryCurrent++;
this->packetHistoryMax++;
} else {
// TODO: Overwrite the oldest message in the history buffer when it is full.
LOG_WARN("*** S&F - PSRAM Full. Packet is not added to the history.\n");
}
}

meshtastic_MeshPacket *StoreForwardModule::allocReply()
Expand Down Expand Up @@ -213,10 +214,19 @@ void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index)
// TODO: Make this configurable.
p->want_ack = false;

p->decoded.payload.size =
this->packetHistoryTXQueue[packetHistory_index].payload_size; // You must specify how many bytes are in the reply
memcpy(p->decoded.payload.bytes, this->packetHistoryTXQueue[packetHistory_index].payload,
meshtastic_StoreAndForward sf = meshtastic_StoreAndForward_init_zero;
sf.which_variant = meshtastic_StoreAndForward_text_tag;
sf.variant.text.size = this->packetHistoryTXQueue[packetHistory_index].payload_size;
memcpy(sf.variant.text.bytes, this->packetHistoryTXQueue[packetHistory_index].payload,
this->packetHistoryTXQueue[packetHistory_index].payload_size);
if (this->packetHistoryTXQueue[packetHistory_index].to == NODENUM_BROADCAST) {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_BROADCAST;
} else {
sf.rr = meshtastic_StoreAndForward_RequestResponse_ROUTER_TEXT_DIRECT;
}

p->decoded.payload.size =
pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes), &meshtastic_StoreAndForward_msg, &sf);

service.sendToMesh(p);
}
Expand Down Expand Up @@ -387,7 +397,9 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
LOG_INFO("*** S&F - Busy. Try again shortly.\n");
} else {
if ((p->which_variant == meshtastic_StoreAndForward_history_tag) && (p->variant.history.window > 0)) {
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp)); // window is in minutes
// window is in minutes
storeForwardModule->historySend(p->variant.history.window * 60000, getFrom(&mp),
p->variant.history.last_request);
} else {
storeForwardModule->historySend(historyReturnWindow * 60000, getFrom(&mp)); // defaults to 4 hours
}
Expand All @@ -406,8 +418,7 @@ bool StoreForwardModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp,
case meshtastic_StoreAndForward_RequestResponse_CLIENT_PONG:
if (is_server) {
LOG_INFO("*** StoreAndForward_RequestResponse_CLIENT_PONG\n");
// The Client is alive, update NodeDB
nodeDB.updateFrom(mp);
// NodeDB is already updated
}
break;

Expand Down Expand Up @@ -546,9 +557,7 @@ StoreForwardModule::StoreForwardModule()
}

// Client
}
if ((config.device.role == meshtastic_Config_DeviceConfig_Role_CLIENT) ||
(config.device.role == meshtastic_Config_DeviceConfig_Role_ROUTER_CLIENT)) {
} else {
is_client = true;
LOG_INFO("*** Initializing Store & Forward Module in Client mode\n");
}
Expand Down
21 changes: 10 additions & 11 deletions src/modules/esp32/StoreForwardModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ struct PacketHistoryStruct {
uint32_t to;
uint32_t from;
uint8_t channel;
bool ack;
uint8_t payload[meshtastic_Constants_DATA_PAYLOAD_LEN];
pb_size_t payload_size;
};
Expand All @@ -32,7 +31,7 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
uint32_t packetHistoryTXQueue_size = 0;
uint32_t packetHistoryTXQueue_index = 0;

uint32_t packetTimeMax = 5000;
uint32_t packetTimeMax = 5000; // Interval between sending history packets as a server.

bool is_client = false;
bool is_server = false;
Expand All @@ -41,17 +40,17 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
StoreForwardModule();

unsigned long lastHeartbeat = 0;
uint32_t heartbeatInterval = 300;
uint32_t heartbeatInterval = default_broadcast_interval_secs;

/**
Update our local reference of when we last saw that node.
@return 0 if we have never seen that node before otherwise return the last time we saw the node.
*/
void historyAdd(const meshtastic_MeshPacket &mp);
void statsSend(uint32_t to);
void historySend(uint32_t msAgo, uint32_t to);
void historySend(uint32_t msAgo, uint32_t to, uint32_t last_request_index = 0);

uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to);
uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to, uint32_t *last_request_index);

/**
* Send our payload into the mesh
Expand Down Expand Up @@ -79,16 +78,16 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
void populatePSRAM();

// S&F Defaults
uint32_t historyReturnMax = 250; // 250 records
uint32_t historyReturnWindow = 240; // 4 hours
uint32_t historyReturnMax = 25; // Return maximum of 25 records by default.
uint32_t historyReturnWindow = 240; // Return history of last 4 hours by default.
uint32_t records = 0; // Calculated
bool heartbeat = false; // No heartbeat.

// stats
uint32_t requests = 0;
uint32_t requests_history = 0;
uint32_t requests = 0; // Number of times any client sent a request to the S&F.
uint32_t requests_history = 0; // Number of times the history was requested.

uint32_t retry_delay = 0;
uint32_t retry_delay = 0; // If server is busy, retry after this delay (in ms).

protected:
virtual int32_t runOnce() override;
Expand All @@ -102,4 +101,4 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
virtual bool handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreAndForward *p);
};

extern StoreForwardModule *storeForwardModule;
extern StoreForwardModule *storeForwardModule;
Loading