Skip to content

Commit

Permalink
Fix clash of commits that resulted in some QoS2 packets being sent twice
Browse files Browse the repository at this point in the history
  • Loading branch information
icraggs committed Feb 4, 2022
1 parent 837e80f commit 017e0b4
Showing 1 changed file with 9 additions and 15 deletions.
24 changes: 9 additions & 15 deletions src/MQTTProtocolClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,11 @@ int MQTTProtocol_handlePublishes(void* pack, SOCKET sock)

if (publish->header.bits.qos == 1)
{
/* if we get a socket error from sending the puback, should we ignore the publication? */
Protocol_processPublication(publish, client, 1);

if (socketHasPendingWrites)
rc = MQTTProtocol_queueAck(client, PUBACK, publish->msgId);
else
/* send puback before processing the publications because a lot of return publications could fill up the socket buffer */
rc = MQTTPacket_send_puback(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
}
else if (publish->header.bits.qos == 2)
Expand Down Expand Up @@ -382,11 +380,6 @@ int MQTTProtocol_handlePublishes(void* pack, SOCKET sock)
} else
ListAppend(client->inboundMsgs, m, sizeof(Messages) + len);

if (socketHasPendingWrites)
rc = MQTTProtocol_queueAck(client, PUBREC, publish->msgId);
else
rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);

if (m->MQTTVersion >= MQTTVERSION_5 && already_received == 0)
{
Publish publish1;
Expand Down Expand Up @@ -416,7 +409,10 @@ int MQTTProtocol_handlePublishes(void* pack, SOCKET sock)
}
memcpy(m->publish->payload, temp, m->publish->payloadlen);
}
rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
if (socketHasPendingWrites)
rc = MQTTProtocol_queueAck(client, PUBREC, publish->msgId);
else
rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID);
publish->topic = NULL;
}
exit:
Expand Down Expand Up @@ -580,12 +576,6 @@ int MQTTProtocol_handlePubrels(void* pack, SOCKET sock)

memset(&publish, '\0', sizeof(publish));

/* send pubcomp before processing the publications because a lot of return publications could fill up the socket buffer */
if (!Socket_noPendingWrites(sock))
rc = MQTTProtocol_queueAck(client, PUBCOMP, pubrel->msgId);
else
rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID);

publish.header.bits.qos = m->qos;
publish.header.bits.retain = m->retain;
publish.msgId = m->msgid;
Expand All @@ -612,7 +602,11 @@ int MQTTProtocol_handlePubrels(void* pack, SOCKET sock)
ListRemove(&(state.publications), m->publish);
ListRemove(client->inboundMsgs, m);
++(state.msgs_received);
rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID);

if (!Socket_noPendingWrites(sock))
rc = MQTTProtocol_queueAck(client, PUBCOMP, pubrel->msgId);
else
rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID);
}
}
if (pubrel->MQTTVersion >= MQTTVERSION_5)
Expand Down

0 comments on commit 017e0b4

Please sign in to comment.