in source/core_mqtt.c [53:421]
static int32_t sendPacket( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
size_t bytesToSend );
/**
* @brief Calculate the interval between two millisecond timestamps, including
* when the later value has overflowed.
*
* @note In C, the operands are promoted to signed integers in subtraction.
* Using this function avoids the need to cast the result of subtractions back
* to uint32_t.
*
* @param[in] later The later time stamp, in milliseconds.
* @param[in] start The earlier time stamp, in milliseconds.
*
* @return later - start.
*/
static uint32_t calculateElapsedTime( uint32_t later,
uint32_t start );
/**
* @brief Convert a byte indicating a publish ack type to an #MQTTPubAckType_t.
*
* @param[in] packetType First byte of fixed header.
*
* @return Type of ack.
*/
static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType );
/**
* @brief Receive bytes into the network buffer.
*
* @param[in] pContext Initialized MQTT Context.
* @param[in] bytesToRecv Number of bytes to receive.
*
* @note This operation calls the transport receive function
* repeatedly to read bytes from the network until either:
* 1. The requested number of bytes @a bytesToRecv are read.
* OR
* 2. No data is received from the network for MQTT_RECV_POLLING_TIMEOUT_MS duration.
*
* OR
* 3. There is an error in reading from the network.
*
*
* @return Number of bytes received, or negative number on network error.
*/
static int32_t recvExact( const MQTTContext_t * pContext,
size_t bytesToRecv );
/**
* @brief Discard a packet from the transport interface.
*
* @param[in] pContext MQTT Connection context.
* @param[in] remainingLength Remaining length of the packet to dump.
* @param[in] timeoutMs Time remaining to discard the packet.
*
* @return #MQTTRecvFailed or #MQTTNoDataAvailable.
*/
static MQTTStatus_t discardPacket( const MQTTContext_t * pContext,
size_t remainingLength,
uint32_t timeoutMs );
/**
* @brief Receive a packet from the transport interface.
*
* @param[in] pContext MQTT Connection context.
* @param[in] incomingPacket packet struct with remaining length.
* @param[in] remainingTimeMs Time remaining to receive the packet.
*
* @return #MQTTSuccess or #MQTTRecvFailed.
*/
static MQTTStatus_t receivePacket( const MQTTContext_t * pContext,
MQTTPacketInfo_t incomingPacket,
uint32_t remainingTimeMs );
/**
* @brief Get the correct ack type to send.
*
* @param[in] state Current state of publish.
*
* @return Packet Type byte of PUBACK, PUBREC, PUBREL, or PUBCOMP if one of
* those should be sent, else 0.
*/
static uint8_t getAckTypeToSend( MQTTPublishState_t state );
/**
* @brief Send acks for received QoS 1/2 publishes.
*
* @param[in] pContext MQTT Connection context.
* @param[in] packetId packet ID of original PUBLISH.
* @param[in] publishState Current publish state in record.
*
* @return #MQTTSuccess, #MQTTIllegalState or #MQTTSendFailed.
*/
static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext,
uint16_t packetId,
MQTTPublishState_t publishState );
/**
* @brief Send a keep alive PINGREQ if the keep alive interval has elapsed.
*
* @param[in] pContext Initialized MQTT Context.
*
* @return #MQTTKeepAliveTimeout if a PINGRESP is not received in time,
* #MQTTSendFailed if the PINGREQ cannot be sent, or #MQTTSuccess.
*/
static MQTTStatus_t handleKeepAlive( MQTTContext_t * pContext );
/**
* @brief Handle received MQTT PUBLISH packet.
*
* @param[in] pContext MQTT Connection context.
* @param[in] pIncomingPacket Incoming packet.
*
* @return MQTTSuccess, MQTTIllegalState or deserialization error.
*/
static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
MQTTPacketInfo_t * pIncomingPacket );
/**
* @brief Handle received MQTT publish acks.
*
* @param[in] pContext MQTT Connection context.
* @param[in] pIncomingPacket Incoming packet.
*
* @return MQTTSuccess, MQTTIllegalState, or deserialization error.
*/
static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
MQTTPacketInfo_t * pIncomingPacket );
/**
* @brief Handle received MQTT ack.
*
* @param[in] pContext MQTT Connection context.
* @param[in] pIncomingPacket Incoming packet.
* @param[in] manageKeepAlive Flag indicating if PINGRESPs should not be given
* to the application
*
* @return MQTTSuccess, MQTTIllegalState, or deserialization error.
*/
static MQTTStatus_t handleIncomingAck( MQTTContext_t * pContext,
MQTTPacketInfo_t * pIncomingPacket,
bool manageKeepAlive );
/**
* @brief Run a single iteration of the receive loop.
*
* @param[in] pContext MQTT Connection context.
* @param[in] remainingTimeMs Remaining time for the loop in milliseconds.
* @param[in] manageKeepAlive Flag indicating if keep alive should be handled.
*
* @return #MQTTRecvFailed if a network error occurs during reception;
* #MQTTSendFailed if a network error occurs while sending an ACK or PINGREQ;
* #MQTTBadResponse if an invalid packet is received;
* #MQTTKeepAliveTimeout if the server has not sent a PINGRESP before
* #MQTT_PINGRESP_TIMEOUT_MS milliseconds;
* #MQTTIllegalState if an incoming QoS 1/2 publish or ack causes an
* invalid transition for the internal state machine;
* #MQTTSuccess on success.
*/
static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext,
uint32_t remainingTimeMs,
bool manageKeepAlive );
/**
* @brief Validates parameters of #MQTT_Subscribe or #MQTT_Unsubscribe.
*
* @param[in] pContext Initialized MQTT context.
* @param[in] pSubscriptionList List of MQTT subscription info.
* @param[in] subscriptionCount The number of elements in pSubscriptionList.
* @param[in] packetId Packet identifier.
*
* @return #MQTTBadParameter if invalid parameters are passed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pContext,
const MQTTSubscribeInfo_t * pSubscriptionList,
size_t subscriptionCount,
uint16_t packetId );
/**
* @brief Send serialized publish packet using transport send.
*
* @brief param[in] pContext Initialized MQTT context.
* @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
* @brief param[in] headerSize Header size of the PUBLISH packet.
*
* @return #MQTTSendFailed if transport write failed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
size_t headerSize );
/**
* @brief Receives a CONNACK MQTT packet.
*
* @param[in] pContext Initialized MQTT context.
* @param[in] timeoutMs Timeout for waiting for CONNACK packet.
* @param[in] cleanSession Clean session flag set by application.
* @param[out] pIncomingPacket List of MQTT subscription info.
* @param[out] pSessionPresent Whether a previous session was present.
* Only relevant if not establishing a clean session.
*
* @return #MQTTBadResponse if a bad response is received;
* #MQTTNoDataAvailable if no data available for transport recv;
* ##MQTTRecvFailed if transport recv failed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext,
uint32_t timeoutMs,
bool cleanSession,
MQTTPacketInfo_t * pIncomingPacket,
bool * pSessionPresent );
/**
* @brief Resends pending acks for a re-established MQTT session, or
* clears existing state records for a clean session.
*
* @param[in] pContext Initialized MQTT context.
* @param[in] sessionPresent Session present flag received from the MQTT broker.
*
* @return #MQTTSendFailed if transport send during resend failed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext,
bool sessionPresent );
/**
* @brief Serializes a PUBLISH message.
*
* @brief param[in] pContext Initialized MQTT context.
* @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
* @brief param[in] packetId Packet Id of the publish packet.
* @brief param[out] pHeaderSize Size of the serialized PUBLISH header.
*
* @return #MQTTNoMemory if pBuffer is too small to hold the MQTT packet;
* #MQTTBadParameter if invalid parameters are passed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t serializePublish( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId,
size_t * const pHeaderSize );
/**
* @brief Function to validate #MQTT_Publish parameters.
*
* @brief param[in] pContext Initialized MQTT context.
* @brief param[in] pPublishInfo MQTT PUBLISH packet parameters.
* @brief param[in] packetId Packet Id for the MQTT PUBLISH packet.
*
* @return #MQTTBadParameter if invalid parameters are passed;
* #MQTTSuccess otherwise.
*/
static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId );
/**
* @brief Performs matching for special cases when a topic filter ends
* with a wildcard character.
*
* When the topic name has been consumed but there are remaining characters to
* to match in topic filter, this function handles the following 2 cases:
* - When the topic filter ends with "/+" or "/#" characters, but the topic
* name only ends with '/'.
* - When the topic filter ends with "/#" characters, but the topic name
* ends at the parent level.
*
* @note This function ASSUMES that the topic name been consumed in linear
* matching with the topic filer, but the topic filter has remaining characters
* to be matched.
*
* @param[in] pTopicFilter The topic filter containing the wildcard.
* @param[in] topicFilterLength Length of the topic filter being examined.
* @param[in] filterIndex Index of the topic filter being examined.
*
* @return Returns whether the topic filter and the topic name match.
*/
static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
uint16_t topicFilterLength,
uint16_t filterIndex );
/**
* @brief Attempt to match topic name with a topic filter starting with a wildcard.
*
* If the topic filter starts with a '+' (single-level) wildcard, the function
* advances the @a pNameIndex by a level in the topic name.
* If the topic filter starts with a '#' (multi-level) wildcard, the function
* concludes that both the topic name and topic filter match.
*
* @param[in] pTopicName The topic name to match.
* @param[in] topicNameLength Length of the topic name.
* @param[in] pTopicFilter The topic filter to match.
* @param[in] topicFilterLength Length of the topic filter.
* @param[in,out] pNameIndex Current index in the topic name being examined. It is
* advanced by one level for `+` wildcards.
* @param[in, out] pFilterIndex Current index in the topic filter being examined.
* It is advanced to position of '/' level separator for '+' wildcard.
* @param[out] pMatch Whether the topic filter and topic name match.
*
* @return `true` if the caller of this function should exit; `false` if the
* caller should continue parsing the topics.
*/
static bool matchWildcards( const char * pTopicName,
uint16_t topicNameLength,
const char * pTopicFilter,
uint16_t topicFilterLength,
uint16_t * pNameIndex,
uint16_t * pFilterIndex,
bool * pMatch );
/**
* @brief Match a topic name and topic filter allowing the use of wildcards.
*
* @param[in] pTopicName The topic name to check.
* @param[in] topicNameLength Length of the topic name.
* @param[in] pTopicFilter The topic filter to check.
* @param[in] topicFilterLength Length of topic filter.
*
* @return `true` if the topic name and topic filter match; `false` otherwise.
*/
static bool matchTopicFilter( const char * pTopicName,
uint16_t topicNameLength,
const char * pTopicFilter,
uint16_t topicFilterLength );
/*-----------------------------------------------------------*/
static bool matchEndWildcardsSpecialCases( const char * pTopicFilter,
uint16_t topicFilterLength,
uint16_t filterIndex )
{
bool matchFound = false;
assert( pTopicFilter != NULL );
assert( topicFilterLength != 0 );
/* Check if the topic filter has 2 remaining characters and it ends in
* "/#". This check handles the case to match filter "sport/#" with topic
* "sport". The reason is that the '#' wildcard represents the parent and
* any number of child levels in the topic name.*/
if( ( topicFilterLength >= 3U ) &&
( filterIndex == ( topicFilterLength - 3U ) ) &&
( pTopicFilter[ filterIndex + 1U ] == '/' ) &&
( pTopicFilter[ filterIndex + 2U ] == '#' ) )
{
matchFound = true;
}
/* Check if the next character is "#" or "+" and the topic filter ends in
* "/#" or "/+". This check handles the cases to match:
*
* - Topic filter "sport/+" with topic "sport/".
* - Topic filter "sport/#" with topic "sport/".
*/
if( ( filterIndex == ( topicFilterLength - 2U ) ) &&
( pTopicFilter[ filterIndex ] == '/' ) )
{
/* Check that the last character is a wildcard. */
matchFound = ( ( pTopicFilter[ filterIndex + 1U ] == '+' ) ||
( pTopicFilter[ filterIndex + 1U ] == '#' ) ) ? true : false;
}
return matchFound;
}