in source/core_mqtt_agent.c [531:593]
/**
 * @brief Run one command through its handler, then service the MQTT
 * process loop if the handler asked for it.
 *
 * The handler is looked up in the command function table by
 * pCommand->commandType. When pCommand is NULL, or its command type is not a
 * valid table index, the table's NONE entry is invoked with NULL arguments.
 *
 * If the handler succeeds and requests an acknowledgment for a valid packet
 * ID, the command is registered with addAwaitingOperation() and is not
 * concluded here. In every other case a non-NULL command is concluded
 * immediately with the resulting status.
 *
 * @param[in] pMqttAgentContext Agent context; must not be NULL.
 * @param[in] pCommand Command to process; may be NULL.
 * @param[out] pEndLoop Set to true when the handler requested the end of the
 * command loop or when the returned status is not MQTTSuccess; must not be
 * NULL.
 *
 * @return Status of the last operation performed: the handler, the
 * registration of the pending acknowledgment, or MQTT_ProcessLoop().
 */
static MQTTStatus_t processCommand( MQTTAgentContext_t * pMqttAgentContext,
                                    MQTTAgentCommand_t * pCommand,
                                    bool * pEndLoop )
{
    const MQTTAgentCommandFunc_t pCommandFunctionTable[ NUM_COMMANDS ] = MQTT_AGENT_FUNCTION_TABLE;
    MQTTStatus_t operationStatus = MQTTSuccess;
    bool ackAdded = false;
    MQTTAgentCommandFunc_t commandFunction = NULL;
    void * pCommandArgs = NULL;
    const uint32_t processLoopTimeoutMs = 0U;
    MQTTAgentCommandFuncReturns_t commandOutParams = { 0 };

    assert( pMqttAgentContext != NULL );
    assert( pEndLoop != NULL );

    /* Default to the NONE entry; it is replaced only by a command whose type
     * is a valid index into the table. */
    commandFunction = pCommandFunctionTable[ NONE ];

    if( pCommand != NULL )
    {
        assert( pCommand->commandType < NUM_COMMANDS );

        /* The assert above disappears when NDEBUG is defined, so the range is
         * checked again at run time to avoid reading past the end of the
         * table and calling through an invalid pointer. Comparing as unsigned
         * also rejects a negative value. */
        if( ( uint32_t ) pCommand->commandType < ( uint32_t ) NUM_COMMANDS )
        {
            commandFunction = pCommandFunctionTable[ pCommand->commandType ];
            pCommandArgs = pCommand->pArgs;
        }
    }

    operationStatus = commandFunction( pMqttAgentContext, pCommandArgs, &commandOutParams );

    /* A successful command that expects an acknowledgment is stored as a
     * pending operation instead of being concluded now. */
    if( ( operationStatus == MQTTSuccess ) &&
        commandOutParams.addAcknowledgment &&
        ( commandOutParams.packetId != MQTT_PACKET_ID_INVALID ) )
    {
        operationStatus = addAwaitingOperation( pMqttAgentContext, commandOutParams.packetId, pCommand );
        ackAdded = ( operationStatus == MQTTSuccess );
    }

    if( ( pCommand != NULL ) && ( ackAdded != true ) )
    {
        /* The command is complete, call the callback. */
        concludeCommand( pMqttAgentContext, pCommand, operationStatus, NULL );
    }

    /* Run the process loop if there were no errors and the MQTT connection
     * still exists. */
    if( ( operationStatus == MQTTSuccess ) && commandOutParams.runProcessLoop )
    {
        do
        {
            /* Cleared before each pass; the loop repeats only if the flag is
             * set again while MQTT_ProcessLoop() runs. */
            pMqttAgentContext->packetReceivedInLoop = false;

            if( ( operationStatus == MQTTSuccess ) &&
                ( pMqttAgentContext->mqttContext.connectStatus == MQTTConnected ) )
            {
                operationStatus = MQTT_ProcessLoop( &( pMqttAgentContext->mqttContext ), processLoopTimeoutMs );
            }
        } while( pMqttAgentContext->packetReceivedInLoop );
    }

    /* Set the flag to break from the command loop. */
    *pEndLoop = ( commandOutParams.endLoop || ( operationStatus != MQTTSuccess ) );

    return operationStatus;
}