diff --git a/src/MQTTProtocolClient.c b/src/MQTTProtocolClient.c index 3e834ecb..5cc4eafd 100644 --- a/src/MQTTProtocolClient.c +++ b/src/MQTTProtocolClient.c @@ -338,13 +338,11 @@ int MQTTProtocol_handlePublishes(void* pack, SOCKET sock) if (publish->header.bits.qos == 1) { - /* if we get a socket error from sending the puback, should we ignore the publication? */ Protocol_processPublication(publish, client, 1); if (socketHasPendingWrites) rc = MQTTProtocol_queueAck(client, PUBACK, publish->msgId); else - /* send puback before processing the publications because a lot of return publications could fill up the socket buffer */ rc = MQTTPacket_send_puback(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); } else if (publish->header.bits.qos == 2) @@ -382,11 +380,6 @@ int MQTTProtocol_handlePublishes(void* pack, SOCKET sock) } else ListAppend(client->inboundMsgs, m, sizeof(Messages) + len); - if (socketHasPendingWrites) - rc = MQTTProtocol_queueAck(client, PUBREC, publish->msgId); - else - rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); - if (m->MQTTVersion >= MQTTVERSION_5 && already_received == 0) { Publish publish1; @@ -416,7 +409,10 @@ int MQTTProtocol_handlePublishes(void* pack, SOCKET sock) } memcpy(m->publish->payload, temp, m->publish->payloadlen); } - rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); + if (socketHasPendingWrites) + rc = MQTTProtocol_queueAck(client, PUBREC, publish->msgId); + else + rc = MQTTPacket_send_pubrec(publish->MQTTVersion, publish->msgId, &client->net, client->clientID); publish->topic = NULL; } exit: @@ -580,12 +576,6 @@ int MQTTProtocol_handlePubrels(void* pack, SOCKET sock) memset(&publish, '\0', sizeof(publish)); - /* send pubcomp before processing the publications because a lot of return publications could fill up the socket buffer */ - if (!Socket_noPendingWrites(sock)) - rc = MQTTProtocol_queueAck(client, PUBCOMP, pubrel->msgId); - else - rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID); - publish.header.bits.qos = m->qos; publish.header.bits.retain = m->retain; publish.msgId = m->msgid; @@ -612,7 +602,11 @@ int MQTTProtocol_handlePubrels(void* pack, SOCKET sock) ListRemove(&(state.publications), m->publish); ListRemove(client->inboundMsgs, m); ++(state.msgs_received); - rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID); + + if (!Socket_noPendingWrites(sock)) + rc = MQTTProtocol_queueAck(client, PUBCOMP, pubrel->msgId); + else + rc = MQTTPacket_send_pubcomp(pubrel->MQTTVersion, pubrel->msgId, &client->net, client->clientID); } } if (pubrel->MQTTVersion >= MQTTVERSION_5)