Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enabling Unclean Session Publish Re-Transmits #308

Merged
merged 38 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f79d06b
Update the APIs to check the connected flag and be thread safe
Sep 6, 2024
1e2cd7a
Add an intermediate connection state to handle disconnects due to net…
Sep 10, 2024
19fd967
Rectify Unit Tests Errors
Sep 11, 2024
5ead3f8
Add a new macro to detect transport error in core_mqtt_serializer.h
Sep 11, 2024
431b34f
Resolve Spellcheck and Doxygen Tests
Sep 11, 2024
f67688e
Resolve Formatting Test
Sep 11, 2024
15c6cbb
Fix Unit Tests
DakshitBabbar Sep 11, 2024
0eda275
Uncrustify: triggered by comment.
actions-user Sep 11, 2024
eb68f0c
Add new unit tests for 100% code coverage
DakshitBabbar Sep 12, 2024
022fcd1
Merge branch 'main' into coreMQTTThreadSafe
DakshitBabbar Sep 12, 2024
0d7dade
Resolve Formatting and PR Comments
Sep 12, 2024
5c3524e
Implement critical section without mutexTaken flag
DakshitBabbar Sep 13, 2024
2382195
Rectify conflicts
DakshitBabbar Sep 13, 2024
b485375
Resolve issues with failing unit tests on Mac and Memory Statistics
Sep 13, 2024
febdaa0
Merge branch 'FreeRTOS:main' into coreMQTTThreadSafe
DakshitBabbar Sep 19, 2024
8ca97ab
Resolve Formatting
Sep 19, 2024
dbef1fd
Enable the library to re-transmit publishes on unclean session connec…
Sep 23, 2024
ad42032
Add changes for new return status codes and make retransmit thread safe
Sep 24, 2024
02d3456
Update the case when re-transmits fail after connack is received
Sep 24, 2024
9e8c309
Uncrustify: triggered by comment.
actions-user Sep 24, 2024
8fa59a1
Update unit tests
Sep 24, 2024
86c6b9f
Updated existing unit tests
Sep 24, 2024
4c86ac2
Implement new unit tests for the changes related to publish retransmit
Sep 26, 2024
9413942
Merge branch 'FreeRTOS:main' into reTransmit
DakshitBabbar Sep 26, 2024
853b6f4
Merge main into reTransmit
Sep 27, 2024
e5834d8
Clear publish copy after state update on receiving ACK
Sep 30, 2024
0753992
Uncrustify: triggered by comment.
actions-user Sep 30, 2024
43aed2a
Resolve memory stats, spell check, doxygen example and comments
Sep 30, 2024
80b5931
Resolve formating issues
Oct 1, 2024
8e04527
Resolve PR comments
Oct 3, 2024
eba9592
Update doxygen comments with new return status codes
Oct 3, 2024
451e63c
Update new return codes
Oct 3, 2024
4905c90
Add opaque struct and functions to serialize it.
AniruddhaKanhere Oct 22, 2024
7808d02
Uncrustify: triggered by comment.
actions-user Oct 22, 2024
d3b3c8e
Add Doxygen comment
AniruddhaKanhere Oct 22, 2024
2eda149
Merge pull request #1 from AniruddhaKanhere/reTransmit
DakshitBabbar Oct 22, 2024
a643f7f
Add opaque struct and functions to serialize it.
AniruddhaKanhere Oct 22, 2024
74fa43f
Move the length of vector in the opaque struct
AniruddhaKanhere Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/doxygen/include/size_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
</tr>
<tr>
<td>core_mqtt.c</td>
<td><center>4.4K</center></td>
<td><center>3.8K</center></td>
<td><center>4.9K</center></td>
<td><center>4.2K</center></td>
</tr>
<tr>
<td>core_mqtt_state.c</td>
Expand All @@ -19,12 +19,12 @@
</tr>
<tr>
<td>core_mqtt_serializer.c</td>
<td><center>2.8K</center></td>
<td><center>2.2K</center></td>
<td><center>2.9K</center></td>
<td><center>2.3K</center></td>
</tr>
<tr>
<td><b>Total estimates</b></td>
<td><b><center>8.9K</center></b></td>
<td><b><center>7.3K</center></b></td>
<td><b><center>9.5K</center></b></td>
<td><b><center>7.8K</center></b></td>
</tr>
</table>
171 changes: 164 additions & 7 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,11 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
* @brief Clears existing state records for a clean session.
*
* @param[in] pContext Initialized MQTT context.
*
* @return #MQTTPublishClearAllFailed if clearing all the copied publishes fails;
* #MQTTSuccess otherwise.
*/
static void handleCleanSession( MQTTContext_t * pContext );
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext );

/**
* @brief Send the publish packet without copying the topic string and payload in
Expand All @@ -463,7 +466,7 @@ static void handleCleanSession( MQTTContext_t * pContext );
*/
static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
const uint8_t * pMqttHeader,
uint8_t * pMqttHeader,
size_t headerSize,
uint16_t packetId );

Expand Down Expand Up @@ -1597,6 +1600,16 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
}
}

if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) )
{
if( ( status == MQTTSuccess ) &&
( pContext->clearFunction != NULL ) &&
( pContext->clearFunction( pContext, packetIdentifier ) != true ) )
{
LogWarn( ( "Failed to clear copied publish on receiving an ack.\n" ) );
}
}

if( status == MQTTSuccess )
{
/* Set fields of deserialized struct. */
Expand Down Expand Up @@ -2133,13 +2146,14 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,

static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
const uint8_t * pMqttHeader,
uint8_t * pMqttHeader,
size_t headerSize,
uint16_t packetId )
{
MQTTStatus_t status = MQTTSuccess;
size_t ioVectorLength;
size_t totalMessageLength;
bool dupFlagChanged = false;

/* Bytes required to encode the packet ID in an MQTT header according to
* the MQTT specification. */
Expand Down Expand Up @@ -2190,7 +2204,35 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
totalMessageLength += pPublishInfo->payloadLength;
}

if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength )
/* If not already set, set the dup flag before storing a copy of the publish
* this is because on retrieving back this copy we will get it in the form of an
* array of TransportOutVector_t that holds the data in a const pointer which cannot be
* changed after retrieving. */
if( pPublishInfo->dup != true )
{
MQTT_UpdateDuplicatePublishFlag( pMqttHeader, true );

dupFlagChanged = true;
}

/* store a copy of the publish for retransmission purposes */
if( ( pPublishInfo->qos > MQTTQoS0 ) &&
( pContext->storeFunction != NULL ) &&
( pContext->storeFunction( pContext, packetId, pIoVector, ioVectorLength ) != true ) )
{
status = MQTTPublishStoreFailed;
}

/* change the value of the dup flag to its original, if it was changed */
if( dupFlagChanged )
{
MQTT_UpdateDuplicatePublishFlag( pMqttHeader, false );

dupFlagChanged = false;
}

if( ( status == MQTTSuccess ) &&
( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) )
{
status = MQTTSendFailed;
}
Expand Down Expand Up @@ -2477,6 +2519,9 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;
MQTTPublishState_t state = MQTTStateNull;
TransportOutVector_t * pIoVec, * pIoVectIterator;
size_t ioVecCount;
size_t totalMessageLength;

assert( pContext != NULL );

Expand All @@ -2492,11 +2537,56 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
}

if( ( status == MQTTSuccess ) &&
( pContext->retrieveFunction != NULL ) )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

packetId = MQTT_PublishToResend( pContext, &cursor );

if( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true ) )
{
status = MQTTPublishRetrieveFailed;
}

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) )
{
totalMessageLength = 0;

for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
{
totalMessageLength += pIoVectIterator->iov_len;
}

MQTT_PRE_STATE_UPDATE_HOOK( pContext );

if( sendMessageVector( pContext, pIoVec, ioVecCount ) != ( int32_t ) totalMessageLength )
{
status = MQTTSendFailed;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );

packetId = MQTT_PublishToResend( pContext, &cursor );

if( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true )
{
status = MQTTPublishRetrieveFailed;
}
}
}

return status;
}

static void handleCleanSession( MQTTContext_t * pContext )
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTSuccess;

assert( pContext != NULL );

/* Reset the index and clear the buffer when a new session is established. */
Expand All @@ -2517,6 +2607,14 @@ static void handleCleanSession( MQTTContext_t * pContext )
0x00,
pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
}

if( ( pContext->clearAllFunction != NULL ) &&
( pContext->clearAllFunction( pContext ) != true ) )
{
status = MQTTPublishClearAllFailed;
}

return status;
}

static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
Expand Down Expand Up @@ -2681,6 +2779,53 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
MQTTStorePacketForRetransmit storeFunction,
MQTTRetrievePacketForRetransmit retrieveFunction,
MQTTClearPacketForRetransmit clearFunction,
MQTTClearAllPacketsForRetransmit clearAllFunction )
AniruddhaKanhere marked this conversation as resolved.
Show resolved Hide resolved
{
MQTTStatus_t status = MQTTSuccess;

if( pContext == NULL )
{
LogError( ( "Argument cannot be NULL: pContext=%p\n",
( void * ) pContext ) );
status = MQTTBadParameter;
}
else if( storeFunction == NULL )
{
LogError( ( "Invalid parameter: storeFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( retrieveFunction == NULL )
{
LogError( ( "Invalid parameter: retrieveFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( clearFunction == NULL )
{
LogError( ( "Invalid parameter: clearFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( clearAllFunction == NULL )
{
LogError( ( "Invalid parameter: clearAllFunction is NULL" ) );
status = MQTTBadParameter;
}
else
{
pContext->storeFunction = storeFunction;
pContext->retrieveFunction = retrieveFunction;
pContext->clearFunction = clearFunction;
pContext->clearAllFunction = clearAllFunction;
}

return status;
}

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
uint16_t packetId )
{
Expand Down Expand Up @@ -2820,7 +2965,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,

if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) )
{
handleCleanSession( pContext );
status = handleCleanSession( pContext );
}

if( status == MQTTSuccess )
Expand All @@ -2837,7 +2982,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,

if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
{
/* Resend PUBRELs when reestablishing a session */
/* Resend PUBRELs and PUBLISHES when reestablishing a session */
status = handleUncleanSessionResumption( pContext );
}

Expand Down Expand Up @@ -3560,6 +3705,18 @@ const char * MQTT_Status_strerror( MQTTStatus_t status )
str = "MQTTStatusDisconnectPending";
break;

case MQTTPublishStoreFailed:
str = "MQTTPublishStoreFailed";
break;

case MQTTPublishRetrieveFailed:
str = "MQTTPublishRetrieveFailed";
break;

case MQTTPublishClearAllFailed:
str = "MQTTPublishClearAllFailed";
break;

default:
str = "Invalid MQTT Status code";
break;
Expand Down
32 changes: 32 additions & 0 deletions source/core_mqtt_serializer.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@
*/
#define UINT8_SET_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) | ( 0x01U << ( position ) ) ) )

/**
* @brief Clear a bit in an 8-bit unsigned integer.
*/
#define UINT8_CLEAR_BIT( x, position ) ( ( x ) = ( uint8_t ) ( ( x ) & ( ~( 0x01U << ( position ) ) ) ) )

/**
* @brief Macro for checking if a bit is set in a 1-byte unsigned int.
*
Expand Down Expand Up @@ -2623,6 +2628,33 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc,

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_UpdateDuplicatePublishFlag( uint8_t * pHeader,
bool set )
{
MQTTStatus_t status = MQTTSuccess;

if( pHeader == NULL )
{
status = MQTTBadParameter;
}
else if( ( ( *pHeader ) & MQTT_PACKET_TYPE_PUBLISH ) == 0 )
AniruddhaKanhere marked this conversation as resolved.
Show resolved Hide resolved
{
status = MQTTBadParameter;
}
else if( set == true )
{
UINT8_SET_BIT( *pHeader, MQTT_PUBLISH_FLAG_DUP );
}
else
{
UINT8_CLEAR_BIT( *pHeader, MQTT_PUBLISH_FLAG_DUP );
}

return status;
}

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_ProcessIncomingPacketTypeAndLength( const uint8_t * pBuffer,
const size_t * pIndex,
MQTTPacketInfo_t * pIncomingPacket )
Expand Down
Loading
Loading