static int32_t recvExact()

in source/core_mqtt.c [715:787]


/**
 * @brief Receive exactly @p bytesToRecv bytes from the transport into the
 * start of the context's network buffer.
 *
 * The transport receive function is called repeatedly until the requested
 * number of bytes has been read, the transport reports an error, or no data
 * has arrived for at least MQTT_RECV_POLLING_TIMEOUT_MS milliseconds. The
 * timeout window restarts every time at least one byte is received, so it
 * bounds the idle time between reads rather than the total call duration.
 *
 * @param[in] pContext MQTT context supplying the transport receive function,
 * the timestamp function and the network buffer that is written to.
 * @param[in] bytesToRecv Number of bytes to receive. Must not exceed the size
 * of the network buffer.
 *
 * @return The total number of bytes received, which is less than
 * @p bytesToRecv if the polling timeout expired first; the transport's
 * negative return code if the transport reported an error; or -1 if the
 * transport claimed to have received more bytes than were requested.
 */
static int32_t recvExact( const MQTTContext_t * pContext,
                          size_t bytesToRecv )
{
    uint8_t * pIndex = NULL;
    size_t bytesRemaining = bytesToRecv;
    int32_t totalBytesRecvd = 0, bytesRecvd;
    uint32_t lastDataRecvTimeMs = 0U, timeSinceLastRecvMs = 0U;
    TransportRecv_t recvFunc = NULL;
    MQTTGetCurrentTimeFunc_t getTimeStampMs = NULL;
    bool receiveError = false;

    assert( pContext != NULL );
    assert( bytesToRecv <= pContext->networkBuffer.size );
    assert( pContext->getTime != NULL );
    assert( pContext->transportInterface.recv != NULL );
    assert( pContext->networkBuffer.pBuffer != NULL );

    pIndex = pContext->networkBuffer.pBuffer;
    recvFunc = pContext->transportInterface.recv;
    getTimeStampMs = pContext->getTime;

    /* Part of the MQTT packet has been read before calling this function,
     * so the idle-timeout window starts now. */
    lastDataRecvTimeMs = getTimeStampMs();

    while( ( bytesRemaining > 0U ) && ( receiveError == false ) )
    {
        bytesRecvd = recvFunc( pContext->transportInterface.pNetworkContext,
                               pIndex,
                               bytesRemaining );

        if( bytesRecvd < 0 )
        {
            LogError( ( "Network error while receiving packet: ReturnCode=%ld.",
                        ( long int ) bytesRecvd ) );

            /* Propagate the transport's error code to the caller. */
            totalBytesRecvd = bytesRecvd;
            receiveError = true;
        }
        else if( bytesRecvd > 0 )
        {
            /* Reset the starting time as we have received some data from the network. */
            lastDataRecvTimeMs = getTimeStampMs();

            /* It is a bug in the application's transport receive implementation
             * if more bytes than expected are received. To avoid a possible
             * overflow in converting bytesRemaining from unsigned to signed,
             * this assert must exist after the check for bytesRecvd being
             * negative. */
            assert( ( size_t ) bytesRecvd <= bytesRemaining );

            if( ( size_t ) bytesRecvd > bytesRemaining )
            {
                /* The assert above is compiled out when NDEBUG is defined.
                 * Without this runtime check, bytesRemaining would wrap
                 * around and pIndex would advance past the requested region,
                 * so subsequent reads would be issued out of bounds. */
                LogError( ( "Transport recv returned more bytes than requested: "
                            "BytesReceived=%ld, BytesRequested=%lu.",
                            ( long int ) bytesRecvd,
                            ( unsigned long ) bytesRemaining ) );
                totalBytesRecvd = -1;
                receiveError = true;
            }
            else
            {
                bytesRemaining -= ( size_t ) bytesRecvd;
                totalBytesRecvd += bytesRecvd;
                pIndex += bytesRecvd;
                LogDebug( ( "BytesReceived=%ld, BytesRemaining=%lu, TotalBytesReceived=%ld.",
                            ( long int ) bytesRecvd,
                            ( unsigned long ) bytesRemaining,
                            ( long int ) totalBytesRecvd ) );
            }
        }
        else
        {
            /* No bytes were read from the network. */
            timeSinceLastRecvMs = calculateElapsedTime( getTimeStampMs(), lastDataRecvTimeMs );

            /* Check for timeout if we have been waiting to receive any byte on the network. */
            if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
            {
                LogError( ( "Unable to receive packet: Timed out in transport recv." ) );

                /* totalBytesRecvd is left as the partial count so the caller
                 * can see how much data arrived before the timeout. */
                receiveError = true;
            }
        }
    }

    return totalBytesRecvd;
}