ctsTask ctsIoPattern::CreateNewTask()

in ctsTraffic/ctsIOPattern.cpp [550:743]


ctsTask ctsIoPattern::CreateNewTask(ctsTaskAction action, uint32_t maxTransfer) noexcept
{
    //
    // with TCP, we need to calculate the buffer size based off bytes remaining
    // with UDP, we're always posting the same size buffer
    //

    // first: calculate the next buffer size assuming no max ceiling specified by the protocol
    const auto remainingTransfer = m_patternState.GetRemainingTransfer();
    const auto nextBufferSize = ctsConfig::GetBufferSize();
    const auto minBufferSize = min<uint64_t>(remainingTransfer, nextBufferSize);
    uint64_t newBufferSize = minBufferSize;

    // second: if the protocol specified a ceiling, recalculate given their ceiling
    if (maxTransfer > 0 && maxTransfer < minBufferSize)
    {
        newBufferSize = maxTransfer;
    }

    // guard against hitting a 32-bit overflow
    FAIL_FAST_IF_MSG(
        newBufferSize > MAXDWORD,
        "ctsIOPattern internal error: next buffer size (%llu) is greater than MAXDWORD (%u)",
        newBufferSize, MAXDWORD);
    const auto verifiedNewBufferSize{static_cast<uint32_t>(newBufferSize)};

    // build the next IO request with a properly calculated buffer size
    // Send must specify the offset because we must align the patterns that we send
    // Recv must not specify an offset because will always use the entire buffer for the recv
    ctsTask returnTask;
    if (ctsTaskAction::Send == action)
    {
        // with RIO, we have preallocated only so many pre-pinned buffers for data to keep in flight
        // if that's exhausted, return no-IO yet
        if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO) && m_sendingRioBufferIds.empty())
        {
            return ctsTask();
        }

        //
        // check to see if the send needs to be deferred into the future
        //
        returnTask.m_timeOffsetMilliseconds = 0LL;
        if (m_bytesSendingPerQuantum > 0)
        {
            const auto currentTimeMs(ctTimer::SnapQpcInMillis());
            if (m_bytesSendingThisQuantum < m_bytesSendingPerQuantum)
            {
                // adjust bytes_sending_this_quantum
                m_bytesSendingThisQuantum += verifiedNewBufferSize;

                // no need to adjust quantum_start_time_ms unless we skipped into a new quantum
                // (meaning the previous quantum had not filled the max bytes for that quantum)
                // ReSharper disable once CppRedundantParentheses
                if (currentTimeMs > (m_quantumStartTimeMs + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod))
                {
                    // current time shows it's now beyond this quantum timeframe
                    // - once we see how many quantums we have skipped forward, move our quantum start time to the quantum we are actually in
                    // - then adjust the number of bytes we are to send this quantum by how many quantum we just skipped
                    const auto quantumsSkippedSinceLastSend = (currentTimeMs - m_quantumStartTimeMs) / ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;
                    m_quantumStartTimeMs += quantumsSkippedSinceLastSend * ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;

                    // we have to be careful making this adjustment since the remainingbytes this quantum could be very small
                    // - we only subtract out if the number of bytes skipped is >= bytes actually skipped
                    // ReSharper disable once CppTooWideScopeInitStatement
                    const auto bytesToAdjust = m_bytesSendingPerQuantum * quantumsSkippedSinceLastSend;
                    if (bytesToAdjust > m_bytesSendingThisQuantum)
                    {
                        m_bytesSendingThisQuantum = 0;
                    }
                    else
                    {
                        m_bytesSendingThisQuantum -= bytesToAdjust;
                    }
                }
            }
            else
            {
                // we have sent more than required for this quantum
                // - check if this fullfilled future quantums as well
                const auto quantumAheadToSchedule = m_bytesSendingThisQuantum / m_bytesSendingPerQuantum;

                // ms_for_quantums_to_skip = the # of quantum beyond the current quantum that will be skipped
                // - when we have already sent at least 1 additional quantum of bytes
                const auto msForQuantumsToSkip = (quantumAheadToSchedule - 1) * ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;

                // carry forward extra bytes from quantums that will be filled by the bytes we have already sent
                // (including the current quantum)
                // then adding the bytes we're about to send
                m_bytesSendingThisQuantum -= m_bytesSendingPerQuantum * quantumAheadToSchedule;
                m_bytesSendingThisQuantum += verifiedNewBufferSize;

                // update the return task for when to schedule the send
                // first, calculate the time to get to the end of this time quantum
                // - only adjust if the current time isn't already outside this quantum
                if (currentTimeMs < m_quantumStartTimeMs + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod)
                {
                    returnTask.m_timeOffsetMilliseconds = m_quantumStartTimeMs + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod - currentTimeMs;
                }
                // then add in any quantum we need to skip
                returnTask.m_timeOffsetMilliseconds += msForQuantumsToSkip;
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : delaying the next send due to RateLimit (%llu ms)\n", returnTask.m_timeOffsetMilliseconds);

                // finally, adjust quantum_start_time_ms to the next quantum which IO will complete
                m_quantumStartTimeMs += msForQuantumsToSkip + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;
            }
        }
        else if (m_burstCount.has_value())
        {
            if (m_burstCount.value() == 0)
            {
                m_burstCount = ctsConfig::g_configSettings->BurstCount;
            }

            m_burstCount = m_burstCount.value() - 1;
            if (m_burstCount.value() == 0)
            {
                returnTask.m_timeOffsetMilliseconds = m_burstDelay.value();
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : delaying the next send due to BurstDelay (%llu ms)\n", returnTask.m_timeOffsetMilliseconds);
            }
            else
            {
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : not delaying the next send due to BurstDelay\n");
            }
        }

        returnTask.m_ioAction = ctsTaskAction::Send;
        returnTask.m_bufferType = ctsTask::BufferType::Static;
        returnTask.m_bufferLength = verifiedNewBufferSize;
        returnTask.m_bufferOffset = m_sendPatternOffset;
        returnTask.m_expectedPatternOffset = 0;
        returnTask.m_buffer = g_senderSharedBuffer;

        // every RIOSend must have unique RIO buffer IDs - it can't reuse buffers ID's like WSASend can use the same m_buffer
        if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO))
        {
            FAIL_FAST_IF_MSG(
                m_sendingRioBufferIds.empty(),
                "m_sendingRioBufferIds is empty for a new Send task  (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
            returnTask.m_bufferType = ctsTask::BufferType::Dynamic;
            // Release successfully hands off the RIO_BUFFERID - safe to pop this object now
            returnTask.m_rioBufferid = m_sendingRioBufferIds.rbegin()->Release();
            m_sendingRioBufferIds.pop_back();
        }

        // now that we are indicating this buffer to send, increment the offset for the next send request
        m_sendPatternOffset += verifiedNewBufferSize;
        m_sendPatternOffset %= c_bufferPatternSize;

        FAIL_FAST_IF_MSG(
            m_sendPatternOffset >= c_bufferPatternSize,
            "pattern_offset being too large (larger than BufferPatternSize %lu) means we might walk off the end of our shared buffer (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)",
            c_bufferPatternSize, this);
        FAIL_FAST_IF_MSG(
            returnTask.m_bufferLength + returnTask.m_bufferOffset > g_maximumBufferSize,
            "return_task (%p) for a Send request is specifying a buffer that is larger than the static SharedBufferSize (%lu) (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)",
            &returnTask, g_maximumBufferSize, this);
    }
    else
    {
        returnTask.m_ioAction = ctsTaskAction::Recv;
        returnTask.m_bufferType = ctsTask::BufferType::Dynamic;
        returnTask.m_bufferLength = verifiedNewBufferSize;
        returnTask.m_bufferOffset = 0; // always recv to the beginning of the buffer
        returnTask.m_expectedPatternOffset = m_recvPatternOffset;

        FAIL_FAST_IF_MSG(
            m_recvBufferFreeList.empty(),
            "m_recvBufferFreeList is empty for a new Recv task  (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
        returnTask.m_buffer = *m_recvBufferFreeList.rbegin();
        m_recvBufferFreeList.pop_back();

        if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO))
        {
            FAIL_FAST_IF_MSG(
                m_receivingRioBufferIds.empty(),
                "m_receivingRioBufferIds is empty for a new Recv task  (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
            returnTask.m_rioBufferid = m_receivingRioBufferIds.rbegin()->m_bufferId;
            // successfully handed off the RIO_BUFFERID - don't deregister it when we pop it
            m_receivingRioBufferIds.rbegin()->m_bufferId = RIO_INVALID_BUFFERID;
            m_receivingRioBufferIds.pop_back();
        }

        FAIL_FAST_IF_MSG(
            m_recvPatternOffset >= c_bufferPatternSize,
            "pattern_offset being too large means we might walk off the end of our shared buffer (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
        FAIL_FAST_IF_MSG(
            returnTask.m_bufferLength + returnTask.m_bufferOffset > verifiedNewBufferSize,
            "return_task (%p) for a Recv request is specifying a buffer that is larger than buffer_size (%lu) (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)",
            &returnTask, verifiedNewBufferSize, this);
    }

    return returnTask;
}