in source/core_mqtt.c [1038:1148]
static MQTTStatus_t handleIncomingPublish( MQTTContext_t * pContext,
MQTTPacketInfo_t * pIncomingPacket )
{
MQTTStatus_t status = MQTTBadParameter;
MQTTPublishState_t publishRecordState = MQTTStateNull;
uint16_t packetIdentifier = 0U;
MQTTPublishInfo_t publishInfo;
MQTTDeserializedInfo_t deserializedInfo;
bool duplicatePublish = false;
assert( pContext != NULL );
assert( pIncomingPacket != NULL );
assert( pContext->appCallback != NULL );
status = MQTT_DeserializePublish( pIncomingPacket, &packetIdentifier, &publishInfo );
LogInfo( ( "De-serialized incoming PUBLISH packet: DeserializerResult=%s.",
MQTT_Status_strerror( status ) ) );
if( status == MQTTSuccess )
{
status = MQTT_UpdateStatePublish( pContext,
packetIdentifier,
MQTT_RECEIVE,
publishInfo.qos,
&publishRecordState );
if( status == MQTTSuccess )
{
LogInfo( ( "State record updated. New state=%s.",
MQTT_State_strerror( publishRecordState ) ) );
}
/* Different cases in which an incoming publish with duplicate flag is
* handled are as listed below.
* 1. No collision - This is the first instance of the incoming publish
* packet received or an earlier received packet state is lost. This
* will be handled as a new incoming publish for both QoS1 and QoS2
* publishes.
* 2. Collision - The incoming packet was received before and a state
* record is present in the state engine. For QoS1 and QoS2 publishes
* this case can happen at 2 different cases and handling is
* different.
* a. QoS1 - If a PUBACK is not successfully sent for the incoming
* publish due to a connection issue, it can result in broker
* sending out a duplicate publish with dup flag set, when a
* session is reestablished. It can result in a collision in
* state engine. This will be handled by processing the incoming
* publish as a new publish ignoring the
* #MQTTStateCollision status from the state engine. The publish
* data is not passed to the application.
* b. QoS2 - If a PUBREC is not successfully sent for the incoming
* publish or the PUBREC sent is not successfully received by the
* broker due to a connection issue, it can result in broker
* sending out a duplicate publish with dup flag set, when a
* session is reestablished. It can result in a collision in
* state engine. This will be handled by ignoring the
* #MQTTStateCollision status from the state engine. The publish
* data is not passed to the application. */
else if( status == MQTTStateCollision )
{
status = MQTTSuccess;
duplicatePublish = true;
/* Calculate the state for the ack packet that needs to be sent out
* for the duplicate incoming publish. */
publishRecordState = MQTT_CalculateStatePublish( MQTT_RECEIVE,
publishInfo.qos );
LogDebug( ( "Incoming publish packet with packet id %hu already exists.",
( unsigned short ) packetIdentifier ) );
if( publishInfo.dup == false )
{
LogError( ( "DUP flag is 0 for duplicate packet (MQTT-3.3.1.-1)." ) );
}
}
else
{
LogError( ( "Error in updating publish state for incoming publish with packet id %hu."
" Error is %s",
( unsigned short ) packetIdentifier,
MQTT_Status_strerror( status ) ) );
}
}
if( status == MQTTSuccess )
{
/* Set fields of deserialized struct. */
deserializedInfo.packetIdentifier = packetIdentifier;
deserializedInfo.pPublishInfo = &publishInfo;
deserializedInfo.deserializationResult = status;
/* Invoke application callback to hand the buffer over to application
* before sending acks.
* Application callback will be invoked for all publishes, except for
* duplicate incoming publishes. */
if( duplicatePublish == false )
{
pContext->appCallback( pContext,
pIncomingPacket,
&deserializedInfo );
}
/* Send PUBACK or PUBREC if necessary. */
status = sendPublishAcks( pContext,
packetIdentifier,
publishRecordState );
}
return status;
}