in source/core_mqtt.c [1916:1986]
MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
uint16_t packetId )
{
size_t headerSize = 0UL;
MQTTPublishState_t publishStatus = MQTTStateNull;
/* Validate arguments. */
MQTTStatus_t status = validatePublishParams( pContext, pPublishInfo, packetId );
if( status == MQTTSuccess )
{
/* Serialize PUBLISH packet. */
status = serializePublish( pContext,
pPublishInfo,
packetId,
&headerSize );
}
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Reserve state for publish message. Only to be done for QoS1 or QoS2. */
status = MQTT_ReserveState( pContext,
packetId,
pPublishInfo->qos );
/* State already exists for a duplicate packet.
* If a state doesn't exist, it will be handled as a new publish in
* state engine. */
if( ( status == MQTTStateCollision ) && ( pPublishInfo->dup == true ) )
{
status = MQTTSuccess;
}
}
if( status == MQTTSuccess )
{
/* Sends the serialized publish packet over network. */
status = sendPublish( pContext,
pPublishInfo,
headerSize );
}
if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) )
{
/* Update state machine after PUBLISH is sent.
* Only to be done for QoS1 or QoS2. */
status = MQTT_UpdateStatePublish( pContext,
packetId,
MQTT_SEND,
pPublishInfo->qos,
&publishStatus );
if( status != MQTTSuccess )
{
LogError( ( "Update state for publish failed with status %s."
" However PUBLISH packet was sent to the broker."
" Any further handling of ACKs for the packet Id"
" will fail.",
MQTT_Status_strerror( status ) ) );
}
}
if( status != MQTTSuccess )
{
LogError( ( "MQTT PUBLISH failed with status %s.",
MQTT_Status_strerror( status ) ) );
}
return status;
}