in source/core_mqtt_agent.c [795:834]
/**
 * @brief Re-send every publish that MQTT_PublishToResend() reports for the
 * agent's MQTT context, marking each one as a duplicate.
 *
 * For each packet ID yielded by MQTT_PublishToResend(), the matching
 * pending-ack record is looked up with getAwaitingOperation(). When a record
 * exists, the publish info stored as the original command's arguments has its
 * dup flag set and is passed to MQTT_Publish() with the same packet ID.
 * Packet IDs with no matching record are skipped.
 *
 * The first MQTT_Publish() failure ends the walk: the original command is
 * concluded with the failing status, the ack record is zeroed, and an error
 * is logged.
 *
 * @param[in] pMqttAgentContext Agent context; must not be NULL (asserted).
 *
 * @return MQTTSuccess if no resend failed (including when nothing was
 * resent); otherwise the status returned by the failing MQTT_Publish() call.
 */
static MQTTStatus_t resendPublishes( MQTTAgentContext_t * pMqttAgentContext )
{
    MQTTStatus_t publishStatus = MQTTSuccess;
    MQTTStateCursor_t resendCursor = MQTT_STATE_CURSOR_INITIALIZER;
    uint16_t resendId = MQTT_PACKET_ID_INVALID;
    MQTTContext_t * pContext = NULL;

    assert( pMqttAgentContext != NULL );
    pContext = &( pMqttAgentContext->mqttContext );

    /* Walk the packet IDs handed back by the MQTT library until it returns
     * the invalid-ID sentinel. */
    for( resendId = MQTT_PublishToResend( pContext, &resendCursor );
         resendId != MQTT_PACKET_ID_INVALID;
         resendId = MQTT_PublishToResend( pContext, &resendCursor ) )
    {
        MQTTPublishInfo_t * pPublish = NULL;

        /* Look up the pending-ack record for this packet ID; the record is
         * left in place. */
        MQTTAgentAckInfo_t * pAck = getAwaitingOperation( pMqttAgentContext, resendId );

        if( pAck == NULL )
        {
            /* Nothing is tracked for this ID, so move on to the next one. */
            continue;
        }

        /* The original command's arguments hold the publish info; flag it
         * as a duplicate before sending it again. */
        pPublish = ( MQTTPublishInfo_t * ) ( pAck->pOriginalCommand->pArgs );
        pPublish->dup = true;
        publishStatus = MQTT_Publish( pContext, pPublish, resendId );

        if( publishStatus == MQTTSuccess )
        {
            continue;
        }

        /* Resend failed: conclude the command with the error, clear the ack
         * record, and stop processing further packet IDs. */
        concludeCommand( pMqttAgentContext, pAck->pOriginalCommand, publishStatus, NULL );
        ( void ) memset( pAck, 0x00, sizeof( *pAck ) );
        LogError( ( "Failed to resend publishes. Error code=%s\n", MQTT_Status_strerror( publishStatus ) ) );
        break;
    }

    return publishStatus;
}