in source/core_mqtt_agent.c [631:698]
static void mqttEventCallback( MQTTContext_t * pMqttContext,
MQTTPacketInfo_t * pPacketInfo,
MQTTDeserializedInfo_t * pDeserializedInfo )
{
MQTTAgentAckInfo_t * pAckInfo;
uint16_t packetIdentifier = pDeserializedInfo->packetIdentifier;
MQTTAgentContext_t * pAgentContext;
const uint8_t upperNibble = ( uint8_t ) 0xF0;
assert( pMqttContext != NULL );
assert( pPacketInfo != NULL );
pAgentContext = getAgentFromMQTTContext( pMqttContext );
/* This callback executes from within MQTT_ProcessLoop(). Setting this flag
* indicates that the callback executed so the caller of MQTT_ProcessLoop() knows
* it should call it again as there may be more data to process. */
pAgentContext->packetReceivedInLoop = true;
/* Handle incoming publish. The lower 4 bits of the publish packet type is used
* for the dup, QoS, and retain flags. Hence masking out the lower bits to check
* if the packet is publish. */
if( ( pPacketInfo->type & upperNibble ) == MQTT_PACKET_TYPE_PUBLISH )
{
pAgentContext->pIncomingCallback( pAgentContext, packetIdentifier, pDeserializedInfo->pPublishInfo );
}
else
{
/* Handle other packets. */
switch( pPacketInfo->type )
{
case MQTT_PACKET_TYPE_PUBACK:
case MQTT_PACKET_TYPE_PUBCOMP:
case MQTT_PACKET_TYPE_SUBACK:
case MQTT_PACKET_TYPE_UNSUBACK:
pAckInfo = getAwaitingOperation( pAgentContext, packetIdentifier );
if( pAckInfo != NULL )
{
/* This function will also clear the memory associated with
* the ack list entry. */
handleAcks( pAgentContext,
pPacketInfo,
pDeserializedInfo,
pAckInfo,
pPacketInfo->type );
}
else
{
LogError( ( "No operation found matching packet id %u.\n", packetIdentifier ) );
}
break;
/* Nothing to do for these packets since they don't indicate command completion. */
case MQTT_PACKET_TYPE_PUBREC:
case MQTT_PACKET_TYPE_PUBREL:
break;
/* Any other packet type is invalid. */
case MQTT_PACKET_TYPE_PINGRESP:
default:
LogError( ( "Unknown packet type received:(%02x).\n",
pPacketInfo->type ) );
break;
}
}
}