in device_firmware/libraries/c_sdk/standard/mqtt/src/iot_mqtt_api.c [1499:1749]
IotMqttError_t IotMqtt_Publish( IotMqttConnection_t mqttConnection,
const IotMqttPublishInfo_t * pPublishInfo,
uint32_t flags,
const IotMqttCallbackInfo_t * pCallbackInfo,
IotMqttOperation_t * pPublishOperation )
{
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );
_mqttOperation_t * pOperation = NULL;
uint8_t ** pPacketIdentifierHigh = NULL;
/* Default PUBLISH serializer function. */
IotMqttError_t ( * serializePublish )( const IotMqttPublishInfo_t *,
uint8_t **,
size_t *,
uint16_t *,
uint8_t ** ) = _IotMqtt_SerializePublish;
/* Check that the PUBLISH information is valid. */
if( _IotMqtt_ValidatePublish( mqttConnection->awsIotMqttMode,
pPublishInfo ) == false )
{
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that no notification is requested for a QoS 0 publish. */
if( pPublishInfo->qos == IOT_MQTT_QOS_0 )
{
if( pCallbackInfo != NULL )
{
IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
else if( ( flags & IOT_MQTT_FLAG_WAITABLE ) != 0 )
{
IotLogError( "QoS 0 PUBLISH should not have notification parameters set." );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
else
{
EMPTY_ELSE_MARKER;
}
if( pPublishOperation != NULL )
{
IotLogWarn( "Ignoring reference parameter for QoS 0 publish." );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check that a reference pointer is provided for a waitable operation. */
if( ( flags & IOT_MQTT_FLAG_WAITABLE ) == IOT_MQTT_FLAG_WAITABLE )
{
if( pPublishOperation == NULL )
{
IotLogError( "Reference must be provided for a waitable PUBLISH." );
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_BAD_PARAMETER );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Create a PUBLISH operation. */
status = _IotMqtt_CreateOperation( mqttConnection,
flags,
pCallbackInfo,
&pOperation );
if( status != IOT_MQTT_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check the PUBLISH operation data and set the operation type. */
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );
pOperation->u.operation.type = IOT_MQTT_PUBLISH_TO_SERVER;
/* Choose a PUBLISH serializer function. */
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
if( mqttConnection->pSerializer != NULL )
{
if( mqttConnection->pSerializer->serialize.publish != NULL )
{
serializePublish = mqttConnection->pSerializer->serialize.publish;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
/* In AWS IoT MQTT mode, a pointer to the packet identifier must be saved. */
if( mqttConnection->awsIotMqttMode == true )
{
pPacketIdentifierHigh = &( pOperation->u.operation.pPacketIdentifierHigh );
}
else
{
EMPTY_ELSE_MARKER;
}
/* Generate a PUBLISH packet from pPublishInfo. */
status = serializePublish( pPublishInfo,
&( pOperation->u.operation.pMqttPacket ),
&( pOperation->u.operation.packetSize ),
&( pOperation->u.operation.packetIdentifier ),
pPacketIdentifierHigh );
if( status != IOT_MQTT_SUCCESS )
{
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* Check the serialized MQTT packet. */
IotMqtt_Assert( pOperation->u.operation.pMqttPacket != NULL );
IotMqtt_Assert( pOperation->u.operation.packetSize > 0 );
/* Initialize PUBLISH retry if retryLimit is set. */
if( pPublishInfo->retryLimit > 0 )
{
/* A QoS 0 PUBLISH may not be retried. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;
pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Set the reference, if provided. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
if( pPublishOperation != NULL )
{
*pPublishOperation = pOperation;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
/* Add the PUBLISH operation to the send queue for network transmission. */
status = _IotMqtt_ScheduleOperation( pOperation,
_IotMqtt_ProcessSend,
0 );
if( status != IOT_MQTT_SUCCESS )
{
IotLogError( "(MQTT connection %p) Failed to enqueue PUBLISH for sending.",
mqttConnection );
/* Clear the previously set (and now invalid) reference. */
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )
{
if( pPublishOperation != NULL )
{
*pPublishOperation = IOT_MQTT_OPERATION_INITIALIZER;
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
EMPTY_ELSE_MARKER;
}
IOT_GOTO_CLEANUP();
}
else
{
EMPTY_ELSE_MARKER;
}
/* Clean up the PUBLISH operation if this function fails. Otherwise, set the
* appropriate return code based on QoS. */
IOT_FUNCTION_CLEANUP_BEGIN();
if( status != IOT_MQTT_SUCCESS )
{
if( pOperation != NULL )
{
_IotMqtt_DestroyOperation( pOperation );
}
else
{
EMPTY_ELSE_MARKER;
}
}
else
{
if( pPublishInfo->qos > IOT_MQTT_QOS_0 )
{
status = IOT_MQTT_STATUS_PENDING;
}
else
{
EMPTY_ELSE_MARKER;
}
IotLogInfo( "(MQTT connection %p) MQTT PUBLISH operation queued.",
mqttConnection );
}
IOT_FUNCTION_CLEANUP_END();
}