static MQTTStatus_t addAwaitingOperation()

in source/core_mqtt_agent.c [66:304]


static MQTTStatus_t addAwaitingOperation( MQTTAgentContext_t * pAgentContext,
                                          uint16_t packetId,
                                          MQTTAgentCommand_t * pCommand );

/**
 * @brief Retrieve an operation from the list of pending acks, and optionally
 * remove it from the list.
 *
 * @param[in] pAgentContext Agent context for the MQTT connection.
 * @param[in] incomingPacketId Packet ID of incoming ack.
 *
 * @return Pointer to stored information about the operation awaiting the ack.
 * Returns NULL if the packet ID is zero or original command does not exist.
 */
static MQTTAgentAckInfo_t * getAwaitingOperation( MQTTAgentContext_t * pAgentContext,
                                                  uint16_t incomingPacketId );

/**
 * @brief Populate the parameters of a #MQTTAgentCommand struct.
 *
 * @param[in] commandType Type of command.  For example, publish or subscribe.
 * @param[in] pMqttAgentContext Pointer to MQTT context to use for command.
 * @param[in] pMqttInfoParam Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t.
 * @param[in] commandCompleteCallback Callback for when command completes.
 * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
 * @param[out] pCommand Pointer to initialized command.
 *
 * @return #MQTTSuccess if all necessary fields for the command are passed,
 * else an enumerated error code.
 */
static MQTTStatus_t createCommand( MQTTAgentCommandType_t commandType,
                                   const MQTTAgentContext_t * pMqttAgentContext,
                                   void * pMqttInfoParam,
                                   MQTTAgentCommandCallback_t commandCompleteCallback,
                                   MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
                                   MQTTAgentCommand_t * pCommand );

/**
 * @brief Add a command to the global command queue.
 *
 * @param[in] pAgentContext Agent context for the MQTT connection.
 * @param[in] pCommand Pointer to command to copy to queue.
 * @param[in] blockTimeMs The maximum amount of time to milliseconds to wait in the
 * Blocked state (so not consuming any CPU time) for the command to be posted to the
 * queue should the queue already be full.
 *
 * @return #MQTTSuccess if the command was added to the queue, else an enumerated
 * error code.
 */
static MQTTStatus_t addCommandToQueue( const MQTTAgentContext_t * pAgentContext,
                                       MQTTAgentCommand_t * pCommand,
                                       uint32_t blockTimeMs );

/**
 * @brief Process a #MQTTAgentCommand struct.
 *
 * @note This agent does not check existing subscriptions before sending a
 * SUBSCRIBE or UNSUBSCRIBE packet. If a subscription already exists, then
 * a SUBSCRIBE packet will be sent anyway, and if multiple tasks are subscribed
 * to a topic filter, then they will all be unsubscribed after an UNSUBSCRIBE.
 *
 * @param[in] pMqttAgentContext Agent context for MQTT connection.
 * @param[in] pCommand Pointer to command to process.
 * @param[out] pEndLoop Whether the command loop should terminate.
 *
 * @return status of MQTT library API call.
 */
static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
                                    MQTTAgentCommand_t * pCommand,
                                    bool * pEndLoop );

/**
 * @brief Dispatch incoming publishes and acks to their various handler functions.
 *
 * @param[in] pMqttContext MQTT Context
 * @param[in] pPacketInfo Pointer to incoming packet.
 * @param[in] pDeserializedInfo Pointer to deserialized information from
 * the incoming packet.
 */
static void mqttEventCallback( MQTTContext_t * pMqttContext,
                               MQTTPacketInfo_t * pPacketInfo,
                               MQTTDeserializedInfo_t * pDeserializedInfo );

/**
 * @brief Mark a command as complete after receiving an acknowledgment packet.
 *
 * @param[in] pAgentContext Agent context for the MQTT connection.
 * @param[in] pPacketInfo Pointer to incoming packet.
 * @param[in] pDeserializedInfo Pointer to deserialized information from
 * the incoming packet.
 * @param[in] pAckInfo Pointer to stored information for the original operation
 * resulting in the received packet.
 * @param[in] packetType The type of the incoming packet, either SUBACK, UNSUBACK,
 * PUBACK, or PUBCOMP.
 */
static void handleAcks( const MQTTAgentContext_t * pAgentContext,
                        const MQTTPacketInfo_t * pPacketInfo,
                        const MQTTDeserializedInfo_t * pDeserializedInfo,
                        MQTTAgentAckInfo_t * pAckInfo,
                        uint8_t packetType );

/**
 * @brief Retrieve a pointer to an agent context given an MQTT context.
 *
 * @param[in] pMQTTContext MQTT Context to search for.
 *
 * @return Pointer to agent context, or NULL.
 */
static MQTTAgentContext_t * getAgentFromMQTTContext( MQTTContext_t * pMQTTContext );

/**
 * @brief Helper function for creating a command and adding it to the command
 * queue.
 *
 * @param[in] commandType Type of command.
 * @param[in] pMqttAgentContext Handle of the MQTT connection to use.
 * @param[in] pCommandCompleteCallbackContext Context and necessary structs for command.
 * @param[in] commandCompleteCallback Callback for when command completes.
 * @param[in] pMqttInfoParam Pointer to command argument.
 * @param[in] blockTimeMs Maximum amount of time in milliseconds to wait (in the
 * Blocked state, so not consuming any CPU time) for the command to be posted to the
 * MQTT agent should the MQTT agent's event queue be full.
 *
 * @return #MQTTSuccess if the command was posted to the MQTT agent's event queue.
 * Otherwise an enumerated error code.
 */
static MQTTStatus_t createAndAddCommand( MQTTAgentCommandType_t commandType,
                                         const MQTTAgentContext_t * pMqttAgentContext,
                                         void * pMqttInfoParam,
                                         MQTTAgentCommandCallback_t commandCompleteCallback,
                                         MQTTAgentCommandContext_t * pCommandCompleteCallbackContext,
                                         uint32_t blockTimeMs );

/**
 * @brief Helper function to mark a command as complete and invoke its callback.
 * This function calls the releaseCommand callback.
 *
 * @param[in] pAgentContext Agent context for the MQTT connection.
 * @param[in] pCommand Command to complete.
 * @param[in] returnCode Return status of command.
 * @param[in] pSubackCodes Pointer to suback array, if command is a SUBSCRIBE.
 */
static void concludeCommand( const MQTTAgentContext_t * pAgentContext,
                             MQTTAgentCommand_t * pCommand,
                             MQTTStatus_t returnCode,
                             uint8_t * pSubackCodes );

/**
 * @brief Resend QoS 1 and 2 publishes after resuming a session.
 *
 * @param[in] pMqttAgentContext Agent context for the MQTT connection.
 *
 * @return #MQTTSuccess if all publishes resent successfully, else error code
 * from #MQTT_Publish.
 */
static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext );

/**
 * @brief Clears the list of pending acknowledgments by invoking each callback
 * with #MQTTRecvFailed either for ALL operations in the list OR only for
 * Subscribe/Unsubscribe operations.
 *
 * @param[in] pMqttAgentContext Agent context of the MQTT connection.
 * @param[in] clearOnlySubUnsubEntries Flag indicating whether all entries OR
 * entries pertaining to only Subscribe/Unsubscribe operations should be cleaned
 * from the list.
 */
static void clearPendingAcknowledgments( MQTTAgentContext_t * pMqttAgentContext,
                                         bool clearOnlySubUnsubEntries );

/**
 * @brief Validate an #MQTTAgentContext_t and a #MQTTAgentCommandInfo_t from API
 * functions.
 *
 * @param[in] pMqttAgentContext #MQTTAgentContext_t to validate.
 * @param[in] pCommandInfo #MQTTAgentCommandInfo_t to validate.
 *
 * @return `true` if parameters are valid, else `false`.
 */
static bool validateStruct( const MQTTAgentContext_t * pMqttAgentContext,
                            const MQTTAgentCommandInfo_t * pCommandInfo );

/**
 * @brief Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE
 * or PUBLISH.
 *
 * @param[in] commandType CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH.
 * @param[in] pParams Parameter structure to validate.
 *
 * @return `true` if parameter structure is valid, else `false`.
 */
static bool validateParams( MQTTAgentCommandType_t commandType,
                            const void * pParams );

/**
 * @brief Called before accepting any PUBLISH or SUBSCRIBE messages to check
 * there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
 *
 * @note Because the MQTT agent is inherently multi threaded, and this function
 * is called from the context of the application task and not the MQTT agent
 * task, this function can only return a best effort result.  It can definitely
 * say if there is space for a new pending ACK when the function is called, but
 * the case of space being exhausted when the agent executes a command that
 * results in an ACK must still be handled.
 *
 * @param[in] pAgentContext Pointer to the context for the MQTT connection to
 * which the PUBLISH or SUBSCRIBE message is to be sent.
 *
 * @return true if there is space in that MQTT connection's ACK list, otherwise
 * false;
 */
static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext );

/*-----------------------------------------------------------*/

static bool isSpaceInPendingAckList( const MQTTAgentContext_t * pAgentContext )
{
    const MQTTAgentAckInfo_t * pendingAcks;
    bool spaceFound = false;
    size_t i;

    assert( pAgentContext != NULL );

    pendingAcks = pAgentContext->pPendingAcks;

    /* Are there any open slots? */
    for( i = 0; i < MQTT_AGENT_MAX_OUTSTANDING_ACKS; i++ )
    {
        /* If the packetId is MQTT_PACKET_ID_INVALID then the array space is
         * not in use. */
        if( pendingAcks[ i ].packetId == MQTT_PACKET_ID_INVALID )
        {
            spaceFound = true;
            break;
        }
    }

    return spaceFound;
}