in source/core_mqtt_agent.c [66:304]
static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
uint16_t packetId,
MQTTAgentCommand_t * pCommand );
/**
* @brief Retrieve an operation from the list of pending acks, and optionally
* remove it from the list.
*
* @param[in] pAgentContext Agent context for the MQTT connection.
* @param[in] incomingPacketId Packet ID of incoming ack.
*
* @return Pointer to stored information about the operation awaiting the ack.
* Returns NULL if the packet ID is zero or the original command does not exist.
*/
static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
uint16_t incomingPacketId );
/**
* @brief Populate the parameters of a #MQTTAgentCommand struct.
*
* @param[in] commandType Type of command. For example, publish or subscribe.
* @param[in] pMqttAgentContext Pointer to MQTT context to use for command.
* @param[in] pMqttInfoParam Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t.
* @param[in] commandCompleteCallback Callback for when command completes.
* @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
* @param[out] pCommand Pointer to initialized command.
*
* @return #MQTTSuccess if all necessary fields for the command are passed,
* else an enumerated error code.
*/
static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
const MQTTAgentContext_t * pMqttAgentContext,
void * pMqttInfoParam,
MQTTAgentCommandCallback_t commandCompleteCallback,
MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
MQTTAgentCommand_t * pCommand );
/**
* @brief Add a command to the global command queue.
*
* @param[in] pAgentContext Agent context for the MQTT connection.
* @param[in] pCommand Pointer to command to copy to queue.
* @param[in] blockTimeMs The maximum amount of time in milliseconds to wait in the
* Blocked state (so not consuming any CPU time) for the command to be posted to the
* queue should the queue already be full.
*
* @return #MQTTSuccess if the command was added to the queue, else an enumerated
* error code.
*/
static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
MQTTAgentCommand_t * pCommand,
uint32_t blockTimeMs );
/**
* @brief Process a #MQTTAgentCommand struct.
*
* @note This agent does not check existing subscriptions before sending a
* SUBSCRIBE or UNSUBSCRIBE packet. If a subscription already exists, then
* a SUBSCRIBE packet will be sent anyway, and if multiple tasks are subscribed
* to a topic filter, then they will all be unsubscribed after an UNSUBSCRIBE.
*
* @param[in] pMqttAgentContext Agent context for MQTT connection.
* @param[in] pCommand Pointer to command to process.
* @param[out] pEndLoop Whether the command loop should terminate.
*
* @return status of MQTT library API call.
*/
static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
MQTTAgentCommand_t * pCommand,
bool * pEndLoop );
/**
* @brief Dispatch incoming publishes and acks to their various handler functions.
*
* @param[in] pMqttContext MQTT Context
* @param[in] pPacketInfo Pointer to incoming packet.
* @param[in] pDeserializedInfo Pointer to deserialized information from
* the incoming packet.
*/
static void mqttEventCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo );
/**
* @brief Mark a command as complete after receiving an acknowledgment packet.
*
* @param[in] pAgentContext Agent context for the MQTT connection.
* @param[in] pPacketInfo Pointer to incoming packet.
* @param[in] pDeserializedInfo Pointer to deserialized information from
* the incoming packet.
* @param[in] pAckInfo Pointer to stored information for the original operation
* resulting in the received packet.
* @param[in] packetType The type of the incoming packet, either SUBACK, UNSUBACK,
* PUBACK, or PUBCOMP.
*/
static void handleAcks( const MQTTAgentContext_t * pAgentContext,
const MQTTPacketInfo_t * pPacketInfo,
const MQTTDeserializedInfo_t * pDeserializedInfo,
MQTTAgentAckInfo_t * pAckInfo,
uint8_t packetType );
/**
* @brief Retrieve a pointer to an agent context given an MQTT context.
*
* @param[in] pMQTTContext MQTT Context to search for.
*
* @return Pointer to agent context, or NULL.
*/
static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext );
/**
* @brief Helper function for creating a command and adding it to the command
* queue.
*
* @param[in] commandType Type of command.
* @param[in] pMqttAgentContext Handle of the MQTT connection to use.
* @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
* @param[in] commandCompleteCallback Callback for when command completes.
* @param[in] pMqttInfoParam Pointer to command argument.
* @param[in] blockTimeMs Maximum amount of time in milliseconds to wait (in the
* Blocked state, so not consuming any CPU time) for the command to be posted to the
* MQTT agent should the MQTT agent's event queue be full.
*
* @return #MQTTSuccess if the command was posted to the MQTT agent's event queue.
* Otherwise an enumerated error code.
*/
static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
const MQTTAgentContext_t * pMqttAgentContext,
void * pMqttInfoParam,
MQTTAgentCommandCallback_t commandCompleteCallback,
MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
uint32_t blockTimeMs );
/**
* @brief Helper function to mark a command as complete and invoke its callback.
* This function calls the releaseCommand callback.
*
* @param[in] pAgentContext Agent context for the MQTT connection.
* @param[in] pCommand Command to complete.
* @param[in] returnCode Return status of command.
* @param[in] pSubackCodes Pointer to suback array, if command is a SUBSCRIBE.
*/
static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
MQTTAgentCommand_t * pCommand,
MQTTStatus_t returnCode,
uint8_t * pSubackCodes );
/**
* @brief Resend QoS 1 and 2 publishes after resuming a session.
*
* @param[in] pMqttAgentContext Agent context for the MQTT connection.
*
* @return #MQTTSuccess if all publishes resent successfully, else error code
* from #MQTT_Publish.
*/
static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext );
/**
* @brief Clears the list of pending acknowledgments by invoking each callback
* with #MQTTRecvFailed either for ALL operations in the list OR only for
* Subscribe/Unsubscribe operations.
*
* @param[in] pMqttAgentContext Agent context of the MQTT connection.
* @param[in] clearOnlySubUnsubEntries Flag indicating whether all entries OR
* entries pertaining to only Subscribe/Unsubscribe operations should be cleaned
* from the list.
*/
static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
bool clearOnlySubUnsubEntries );
/**
* @brief Validate an #MQTTAgentContext_t and a #MQTTAgentCommandInfo_t from API
* functions.
*
* @param[in] pMqttAgentContext #MQTTAgentContext_t to validate.
* @param[in] pCommandInfo #MQTTAgentCommandInfo_t to validate.
*
* @return `true` if parameters are valid, else `false`.
*/
static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
const MQTTAgentCommandInfo_t * pCommandInfo );
/**
* @brief Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE
* or PUBLISH.
*
* @param[in] commandType CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH.
* @param[in] pParams Parameter structure to validate.
*
* @return `true` if parameter structure is valid, else `false`.
*/
static bool validateParams( MQTTAgentCommandType_t commandType,
const void * pParams );
/**
* @brief Called before accepting any PUBLISH or SUBSCRIBE messages to check
* there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
*
* @note Because the MQTT agent is inherently multi-threaded, and this function
* is called from the context of the application task and not the MQTT agent
* task, this function can only return a best effort result. It can definitely
* say if there is space for a new pending ACK when the function is called, but
* the case of space being exhausted when the agent executes a command that
* results in an ACK must still be handled.
*
* @param[in] pAgentContext Pointer to the context for the MQTT connection to
* which the PUBLISH or SUBSCRIBE message is to be sent.
*
* @return `true` if there is space in that MQTT connection's ACK list, otherwise
* `false`.
*/
static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext );
/*-----------------------------------------------------------*/
static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext )
{
    const MQTTAgentAckInfo_t * pAckSlots;
    size_t slotIndex = 0;
    bool hasFreeSlot = false;

    assert( pAgentContext != NULL );

    pAckSlots = pAgentContext->pPendingAcks;

    /* Walk the pending-ack array until a free entry turns up or every entry
     * has been inspected. An entry whose packetId equals
     * MQTT_PACKET_ID_INVALID is not in use. */
    while( ( slotIndex < MQTT_AGENT_MAX_OUTSTANDING_ACKS ) && ( hasFreeSlot == false ) )
    {
        hasFreeSlot = ( pAckSlots[ slotIndex ].packetId == MQTT_PACKET_ID_INVALID );
        slotIndex++;
    }

    return hasFreeSlot;
}