Skip to content

Commit

Permalink
Add opaque struct and functions to serialize it.
Browse files Browse the repository at this point in the history
  • Loading branch information
AniruddhaKanhere committed Oct 24, 2024
1 parent 2eda149 commit 6d7affb
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 265 deletions.
2 changes: 2 additions & 0 deletions .github/.cSpellWords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ DLIBRARY
DNDEBUG
DUNITTEST
DUNITY
getbytesinmqttvec
getpacketid
isystem
lcov
Expand All @@ -34,6 +35,7 @@ NONDET
pylint
pytest
pyyaml
serializemqttvec
sinclude
UNACKED
unpadded
Expand Down
100 changes: 47 additions & 53 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

/* Include config defaults header to get default values of configs. */
#include "core_mqtt_config_defaults.h"
#include "include/core_mqtt.h"

#ifndef MQTT_PRE_SEND_HOOK

Expand Down Expand Up @@ -92,7 +93,7 @@

struct MQTTVec
{
TransportOutVector_t pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
TransportOutVector_t pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
};

/*-----------------------------------------------------------*/
Expand Down Expand Up @@ -450,8 +451,7 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
*
* @param[in] pContext Initialized MQTT context.
*
* @return #MQTTPublishClearAllFailed if clearing all the copied publishes fails;
* #MQTTSuccess otherwise.
* @return #MQTTSuccess always otherwise.
*/
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext );

Expand Down Expand Up @@ -1608,10 +1608,9 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) )
{
if( ( status == MQTTSuccess ) &&
( pContext->clearFunction != NULL ) &&
( pContext->clearFunction( pContext, packetIdentifier ) != true ) )
( pContext->clearFunction != NULL ) )
{
LogWarn( ( "Failed to clear copied publish on receiving an ack.\n" ) );
pContext->clearFunction( pContext, packetIdentifier );
}
}

Expand Down Expand Up @@ -2222,10 +2221,14 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,

/* store a copy of the publish for retransmission purposes */
if( ( pPublishInfo->qos > MQTTQoS0 ) &&
( pContext->storeFunction != NULL ) &&
( pContext->storeFunction( pContext, packetId, pIoVector, ioVectorLength ) != true ) )
( pContext->storeFunction != NULL ) )
{
status = MQTTPublishStoreFailed;
MQTTVec_t * pMqttVec = ( MQTTVec_t * ) pIoVector;

if( pContext->storeFunction( pContext, packetId, pMqttVec, ioVectorLength ) != true )
{
status = MQTTPublishStoreFailed;
}
}

/* change the value of the dup flag to its original, if it was changed */
Expand Down Expand Up @@ -2524,9 +2527,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;
MQTTPublishState_t state = MQTTStateNull;
TransportOutVector_t * pIoVec, * pIoVectIterator;
size_t ioVecCount;
size_t totalMessageLength;
uint8_t * pMqttPacket;

assert( pContext != NULL );

Expand All @@ -2547,42 +2549,31 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

packetId = MQTT_PublishToResend( pContext, &cursor );

if( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true ) )
{
status = MQTTPublishRetrieveFailed;
}

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) )
do
{
totalMessageLength = 0;

for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
{
totalMessageLength += pIoVectIterator->iov_len;
}

MQTT_PRE_STATE_UPDATE_HOOK( pContext );
packetId = MQTT_PublishToResend( pContext, &cursor );

if( sendMessageVector( pContext, pIoVec, ioVecCount ) != ( int32_t ) totalMessageLength )
if( packetId != MQTT_PACKET_ID_INVALID )
{
status = MQTTSendFailed;
}
if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true )
{
status = MQTTPublishRetrieveFailed;
break;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

packetId = MQTT_PublishToResend( pContext, &cursor );
if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength )
{
status = MQTTSendFailed;
}

if( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true )
{
status = MQTTPublishRetrieveFailed;
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
}
} while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

return status;
Expand All @@ -2591,6 +2582,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTSuccess;
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;

assert( pContext != NULL );

Expand All @@ -2613,10 +2606,22 @@ static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
}

if( ( pContext->clearAllFunction != NULL ) &&
( pContext->clearAllFunction( pContext ) != true ) )
if( pContext->clearFunction != NULL )
{
status = MQTTPublishClearAllFailed;
cursor = MQTT_STATE_CURSOR_INITIALIZER;

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
do
{
packetId = MQTT_PublishToResend( pContext, &cursor );

if( packetId != MQTT_PACKET_ID_INVALID )
{
pContext->clearFunction( pContext, packetId );
}
} while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

return status;
Expand Down Expand Up @@ -2787,8 +2792,7 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
MQTTStorePacketForRetransmit storeFunction,
MQTTRetrievePacketForRetransmit retrieveFunction,
MQTTClearPacketForRetransmit clearFunction,
MQTTClearAllPacketsForRetransmit clearAllFunction )
MQTTClearPacketForRetransmit clearFunction )
{
MQTTStatus_t status = MQTTSuccess;

Expand All @@ -2813,17 +2817,11 @@ MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
LogError( ( "Invalid parameter: clearFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( clearAllFunction == NULL )
{
LogError( ( "Invalid parameter: clearAllFunction is NULL" ) );
status = MQTTBadParameter;
}
else
{
pContext->storeFunction = storeFunction;
pContext->retrieveFunction = retrieveFunction;
pContext->clearFunction = clearFunction;
pContext->clearAllFunction = clearAllFunction;
}

return status;
Expand Down Expand Up @@ -3718,10 +3716,6 @@ const char * MQTT_Status_strerror( MQTTStatus_t status )
str = "MQTTPublishRetrieveFailed";
break;

case MQTTPublishClearAllFailed:
str = "MQTTPublishClearAllFailed";
break;

default:
str = "Invalid MQTT Status code";
break;
Expand Down
78 changes: 31 additions & 47 deletions source/include/core_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ struct MQTTPubAckInfo;
struct MQTTContext;
struct MQTTDeserializedInfo;

/**
* @ingroup mqtt_struct_types
* @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit.
*/
typedef struct MQTTVec MQTTVec_t;

/**
* @ingroup mqtt_callback_types
* @brief Application provided function to query the time elapsed since a given
Expand Down Expand Up @@ -107,16 +113,18 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext,
*
* @param[in] pContext Initialised MQTT Context.
* @param[in] packetId Outgoing publish packet identifier.
* @param[in] pIoVec Pointer to the outgoing publish packet in form of array of Tansport Vectors.
* @param[in] ioVecCount Number of transport vectors in the pIoVec array.
* @param[in] pMqttVec Pointer to the opaque mqtt vector structure. Users should use MQTT_SerializeMQTTVec
* and MQTT_GetBytesInMQTTVec functions to get the memory required and to serialize the
* MQTTVec_t in the provided memory respectively.
* @param[in] mqttVecCount Number of transport vectors in the pIoVec array.
*
* @return True if the copy is successful else false.
*/
/* @[define_mqtt_retransmitstorepacket] */
typedef bool ( * MQTTStorePacketForRetransmit)( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t * pIoVec,
size_t ioVecCount );
MQTTVec_t * pMqttVec,
size_t mqttVecCount );
/* @[define_mqtt_retransmitstorepacket] */

/**
Expand All @@ -125,16 +133,19 @@ typedef bool ( * MQTTStorePacketForRetransmit)( struct MQTTContext * pContext,
*
* @param[in] pContext Initialised MQTT Context.
* @param[in] packetId Copied publish packet identifier.
* @param[out] pIoVec Output parameter to store the pointer to the copied publish packet form of array of Tansport Vectors.
* @param[out] ioVecCount Output parameter to store the number of transport vectors in the pIoVec array.
* @param[out] pSerializedMqttVec Output parameter to store the pointer to the serialized MQTTVec_t
* using MQTT_SerializeMQTTVec.
* @param[out] pSerializedMqttVecLen Output parameter to return the number of bytes used to store the
* MQTTVec_t. This value should be the same as the one received from MQTT_GetBytesInMQTTVec
* when storing the packet.
*
* @return True if the retreive is successful else false.
*/
/* @[define_mqtt_retransmitretrievepacket] */
typedef bool ( * MQTTRetrievePacketForRetransmit)( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t ** pIoVec,
size_t * ioVecCount );
uint8_t ** pSerializedMqttVec,
size_t * pSerializedMqttVecLen );
/* @[define_mqtt_retransmitretrievepacket] */

/**
Expand All @@ -147,22 +158,10 @@ typedef bool ( * MQTTRetrievePacketForRetransmit)( struct MQTTContext * pContext
* @return True if the clear is successful else false.
*/
/* @[define_mqtt_retransmitclearpacket] */
typedef bool (* MQTTClearPacketForRetransmit)( struct MQTTContext * pContext,
typedef void (* MQTTClearPacketForRetransmit)( struct MQTTContext * pContext,
uint16_t packetId );
/* @[define_mqtt_retransmitclearpacket] */

/**
* @brief User defined callback used to clear all copied publish packets. Used to
* when connecting with a clean session.
*
* @param[in] pContext Initialised MQTT Context.
*
* @return True if the clear all is successful else false.
*/
/* @[define_mqtt_retransmitclearallpackets] */
typedef bool (* MQTTClearAllPacketsForRetransmit)( struct MQTTContext * pContext );
/* @[define_mqtt_retransmitclearallpackets] */

/**
* @ingroup mqtt_enum_types
* @brief Values indicating if an MQTT connection exists.
Expand Down Expand Up @@ -324,11 +323,6 @@ typedef struct MQTTContext
* @brief User defined API used to clear a particular copied publish packet.
*/
MQTTClearPacketForRetransmit clearFunction;

/**
* @brief User defined API used to clear all copied publish packets.
*/
MQTTClearAllPacketsForRetransmit clearAllFunction;
} MQTTContext_t;

/**
Expand All @@ -343,12 +337,6 @@ typedef struct MQTTDeserializedInfo
MQTTStatus_t deserializationResult; /**< @brief Return code of deserialization. */
} MQTTDeserializedInfo_t;

/**
* @ingroup mqtt_struct_types
* @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit.
*/
typedef struct MQTTVec MQTTVec_t;

/**
* @brief Initialize an MQTT context.
*
Expand Down Expand Up @@ -512,7 +500,6 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
* @param[in] storeFunction User defined API used to store outgoing publishes.
* @param[in] retrieveFunction User defined API used to retreive a copied publish for resend operation.
* @param[in] clearFunction User defined API used to clear a particular copied publish packet.
* @param[in] clearAllFunction User defined API used to clear a particular copied publish packet.
*
* @return #MQTTBadParameter if invalid parameters are passed;
* #MQTTSuccess otherwise.
Expand All @@ -535,7 +522,7 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
* // User defined callback used to store outgoing publishes
* bool publishStoreCallback(struct MQTTContext* pContext,
* uint16_t packetId,
* TransportOutVector_t* pIoVec,
* MQTTVec_t* pIoVec,
* size_t ioVecCount);
* // User defined callback used to retreive a copied publish for resend operation
* bool publishRetrieveCallback(struct MQTTContext* pContext,
Expand Down Expand Up @@ -594,8 +581,7 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
MQTTStorePacketForRetransmit storeFunction,
MQTTRetrievePacketForRetransmit retrieveFunction,
MQTTClearPacketForRetransmit clearFunction,
MQTTClearAllPacketsForRetransmit clearAllFunction );
MQTTClearPacketForRetransmit clearFunction );
/* @[declare_mqtt_initretransmits] */

/**
Expand Down Expand Up @@ -657,10 +643,8 @@ MQTTStatus_t MQTT_CheckConnectStatus( MQTTContext_t * pContext );
* #MQTTStatusConnected if the connection is already established
* #MQTTStatusDisconnectPending if the user is expected to call MQTT_Disconnect
* before calling any other API
* MQTTPublishClearAllFailed if on a clean session connection, clearing all the
* previously copied publishes fails
* MQTTPublishRetrieveFailed if on an unclean session connection, the copied
* publishes are not retrieved successfuly for retransmission
* publishes are not retrieved successfully for retransmission
* #MQTTSuccess otherwise.
*
* @note This API may spend more time than provided in the timeoutMS parameters in
Expand Down Expand Up @@ -1247,24 +1231,24 @@ const char * MQTT_Status_strerror( MQTTStatus_t status );
/* @[declare_mqtt_status_strerror] */

/**
* @brief Get the bytes in an array of #MQTTVec_t which can store the whole array as a an MQTT packet when calling MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec, size_t len ) function.
* @brief Get the bytes in an array of #MQTTVec which can store the whole array as a an MQTT packet when calling MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec, size_t len ) function.
*
* @param[in] pVec The #MQTTVec_t array.
* @param[in] len The length of the #MQTTVec_t array.
* @param[in] pVec The #MQTTVec array.
* @param[in] len The length of the #MQTTVec array.
*
* @return The bytes in the provided MQTTVec_t array which can then be used to set aside memory to be used with MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec, size_t len ) function.
* @return The bytes in the provided #MQTTVec array which can then be used to set aside memory to be used with MQTT_SerializeMQTTVec( void * pAllocatedMem, MQTTVec_t *pVec, size_t len ) function.
*/
/* @[declare_mqtt_getbytesinmqttvec] */
size_t MQTT_GetBytesInMQTTVec( MQTTVec_t * pVec,
size_t len );
/* @[declare_mqtt_getbytesinmqttvec] */

/**
* @brief Serialize the bytes in an array of #MQTTVec_t in the provided \p pAllocatedMem
* @brief Serialize the bytes in an array of #MQTTVec in the provided \p pAllocatedMem
*
* @param[in] pAllocatedMem Memory in which to serialize the data in the #MQTTVec_t array. It must be of size provided by MQTT_GetBytesInMQTTVec( MQTTVec_t *pVec, size_t len ).
* @param[in] pVec The #MQTTVec_t array.
* @param[in] len The length of the #MQTTVec_t array.
* @param[in] pAllocatedMem Memory in which to serialize the data in the #MQTTVec array. It must be of size provided by MQTT_GetBytesInMQTTVec( MQTTVec_t *pVec, size_t len ).
* @param[in] pVec The #MQTTVec array.
* @param[in] len The length of the #MQTTVec array.
*/
/* @[declare_mqtt_serializemqttvec] */
void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem,
Expand Down
Loading

0 comments on commit 6d7affb

Please sign in to comment.