in source/core_mqtt.c [715:787]
static int32_t recvExact( const MQTTContext_t * pContext,
size_t bytesToRecv )
{
uint8_t * pIndex = NULL;
size_t bytesRemaining = bytesToRecv;
int32_t totalBytesRecvd = 0, bytesRecvd;
uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
TransportRecv_t recvFunc = NULL;
MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
bool receiveError = false;
assert( pContext != NULL );
assert( bytesToRecv <= pContext->networkBuffer.size );
assert( pContext->getTime != NULL );
assert( pContext->transportInterface.recv != NULL );
assert( pContext->networkBuffer.pBuffer != NULL );
pIndex = pContext->networkBuffer.pBuffer;
recvFunc = pContext->transportInterface.recv;
getTimeStampMs = pContext->getTime;
/* Part of the MQTT packet has been read before calling this function. */
lastDataRecvTimeMs = getTimeStampMs();
while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
{
bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
pIndex,
bytesRemaining );
if( bytesRecvd < 0 )
{
LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
( long int ) bytesRecvd ) );
totalBytesRecvd = bytesRecvd;
receiveError = true;
}
else if( bytesRecvd > 0 )
{
/* Reset the starting time as we have received some data from the network. */
lastDataRecvTimeMs = getTimeStampMs();
/* It is a bug in the application's transport receive implementation
* if more bytes than expected are received. To avoid a possible
* overflow in converting bytesRemaining from unsigned to signed,
* this assert must exist after the check for bytesRecvd being
* negative. */
assert( ( size_t ) bytesRecvd <= bytesRemaining );
bytesRemaining -= ( size_t ) bytesRecvd;
totalBytesRecvd += ( int32_t ) bytesRecvd;
pIndex += bytesRecvd;
LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.",
( long int ) bytesRecvd,
( unsigned long ) bytesRemaining,
( long int ) totalBytesRecvd ) );
}
else
{
/* No bytes were read from the network. */
timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );
/* Check for timeout if we have been waiting to receive any byte on the network. */
if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
{
LogError( ( "Unable to receive packet: Timed out in transport recv." ) );
receiveError = true;
}
}
}
return totalBytesRecvd;
}