in ctsTraffic/ctsIOPattern.cpp [550:743]
// Builds the next IO task for this pattern instance.
//
// action       : ctsTaskAction::Send produces a Send task; any other value produces a Recv task.
// maxTransfer  : optional per-call ceiling on the buffer size; 0 means "no ceiling imposed by the protocol".
// returns      : a fully-populated ctsTask. For RIO sends, if no pre-pinned send buffer is currently
//                available, returns a default-constructed (no-op) ctsTask instead - the caller is
//                expected to retry later (presumably after an outstanding send completes - confirm).
//
// Side effects: mutates the pacing state (m_bytesSendingThisQuantum, m_quantumStartTimeMs, m_burstCount),
// the send pattern offset (m_sendPatternOffset), and pops from m_sendingRioBufferIds /
// m_recvBufferFreeList / m_receivingRioBufferIds as buffers are handed off to the returned task.
// noexcept: all failure paths are FAIL_FAST (process-terminating), never exceptions.
ctsTask ctsIoPattern::CreateNewTask(ctsTaskAction action, uint32_t maxTransfer) noexcept
{
//
// with TCP, we need to calculate the buffer size based off bytes remaining
// with UDP, we're always posting the same size buffer
//
// first: calculate the next buffer size assuming no max ceiling specified by the protocol
const auto remainingTransfer = m_patternState.GetRemainingTransfer();
const auto nextBufferSize = ctsConfig::GetBufferSize();
const auto minBufferSize = min<uint64_t>(remainingTransfer, nextBufferSize);
uint64_t newBufferSize = minBufferSize;
// second: if the protocol specified a ceiling, recalculate given their ceiling
// (maxTransfer == 0 is the sentinel for "no ceiling")
if (maxTransfer > 0 && maxTransfer < minBufferSize)
{
newBufferSize = maxTransfer;
}
// guard against hitting a 32-bit overflow
// (ctsTask buffer lengths are 32-bit; the 64-bit math above must fit before the narrowing cast)
FAIL_FAST_IF_MSG(
newBufferSize > MAXDWORD,
"ctsIOPattern internal error: next buffer size (%llu) is greater than MAXDWORD (%u)",
newBufferSize, MAXDWORD);
const auto verifiedNewBufferSize{static_cast<uint32_t>(newBufferSize)};
// build the next IO request with a properly calculated buffer size
// Send must specify the offset because we must align the patterns that we send
// Recv must not specify an offset because will always use the entire buffer for the recv
ctsTask returnTask;
if (ctsTaskAction::Send == action)
{
// with RIO, we have preallocated only so many pre-pinned buffers for data to keep in flight
// if that's exhausted, return no-IO yet
// (a default-constructed ctsTask signals "no IO to schedule right now" to the caller)
if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO) && m_sendingRioBufferIds.empty())
{
return ctsTask();
}
//
// check to see if the send needs to be deferred into the future
// two mutually-exclusive pacing mechanisms: bytes-per-second quantums, or burst count/delay
//
returnTask.m_timeOffsetMilliseconds = 0LL;
if (m_bytesSendingPerQuantum > 0)
{
// rate-limiting is enabled: time is divided into quantums of TcpBytesPerSecondPeriod ms,
// each allowed to carry at most m_bytesSendingPerQuantum bytes
const auto currentTimeMs(ctTimer::SnapQpcInMillis());
if (m_bytesSendingThisQuantum < m_bytesSendingPerQuantum)
{
// this quantum still has byte budget left - send immediately (m_timeOffsetMilliseconds stays 0)
// adjust bytes_sending_this_quantum
m_bytesSendingThisQuantum += verifiedNewBufferSize;
// no need to adjust quantum_start_time_ms unless we skipped into a new quantum
// (meaning the previous quantum had not filled the max bytes for that quantum)
// ReSharper disable once CppRedundantParentheses
if (currentTimeMs > (m_quantumStartTimeMs + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod))
{
// current time shows it's now beyond this quantum timeframe
// - once we see how many quantums we have skipped forward, move our quantum start time to the quantum we are actually in
// - then adjust the number of bytes we are to send this quantum by how many quantum we just skipped
const auto quantumsSkippedSinceLastSend = (currentTimeMs - m_quantumStartTimeMs) / ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;
m_quantumStartTimeMs += quantumsSkippedSinceLastSend * ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;
// we have to be careful making this adjustment since the remaining bytes this quantum could be very small
// - we only subtract out if the number of bytes to adjust is >= bytes currently tracked
// (clamping to zero instead of letting the unsigned subtraction wrap)
// ReSharper disable once CppTooWideScopeInitStatement
const auto bytesToAdjust = m_bytesSendingPerQuantum * quantumsSkippedSinceLastSend;
if (bytesToAdjust > m_bytesSendingThisQuantum)
{
m_bytesSendingThisQuantum = 0;
}
else
{
m_bytesSendingThisQuantum -= bytesToAdjust;
}
}
}
else
{
// we have sent more than required for this quantum
// - check if this fulfilled future quantums as well
const auto quantumAheadToSchedule = m_bytesSendingThisQuantum / m_bytesSendingPerQuantum;
// ms_for_quantums_to_skip = the # of quantum beyond the current quantum that will be skipped
// - when we have already sent at least 1 additional quantum of bytes
// (quantumAheadToSchedule is >= 1 here since m_bytesSendingThisQuantum >= m_bytesSendingPerQuantum)
const auto msForQuantumsToSkip = (quantumAheadToSchedule - 1) * ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;
// carry forward extra bytes from quantums that will be filled by the bytes we have already sent
// (including the current quantum)
// then adding the bytes we're about to send
m_bytesSendingThisQuantum -= m_bytesSendingPerQuantum * quantumAheadToSchedule;
m_bytesSendingThisQuantum += verifiedNewBufferSize;
// update the return task for when to schedule the send
// first, calculate the time to get to the end of this time quantum
// - only adjust if the current time isn't already outside this quantum
if (currentTimeMs < m_quantumStartTimeMs + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod)
{
returnTask.m_timeOffsetMilliseconds = m_quantumStartTimeMs + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod - currentTimeMs;
}
// then add in any quantum we need to skip
returnTask.m_timeOffsetMilliseconds += msForQuantumsToSkip;
// NOTE(review): m_timeOffsetMilliseconds is initialized from the signed literal 0LL but printed
// here with %llu - confirm the field type matches the format specifier
PRINT_DEBUG_INFO(L"\t\tctsIOPattern : delaying the next send due to RateLimit (%llu ms)\n", returnTask.m_timeOffsetMilliseconds);
// finally, adjust quantum_start_time_ms to the next quantum which IO will complete
m_quantumStartTimeMs += msForQuantumsToSkip + ctsConfig::g_configSettings->TcpBytesPerSecondPeriod;
}
}
else if (m_burstCount.has_value())
{
// burst pacing: send BurstCount sends back-to-back, then delay the next send by BurstDelay ms
if (m_burstCount.value() == 0)
{
// previous burst was exhausted - start a new burst window
m_burstCount = ctsConfig::g_configSettings->BurstCount;
}
// consume one send from the current burst
m_burstCount = m_burstCount.value() - 1;
if (m_burstCount.value() == 0)
{
// this send finishes the burst - the delay is applied to this task's schedule time
returnTask.m_timeOffsetMilliseconds = m_burstDelay.value();
PRINT_DEBUG_INFO(L"\t\tctsIOPattern : delaying the next send due to BurstDelay (%llu ms)\n", returnTask.m_timeOffsetMilliseconds);
}
else
{
PRINT_DEBUG_INFO(L"\t\tctsIOPattern : not delaying the next send due to BurstDelay\n");
}
}
// sends come from the static shared pattern buffer, offset so the on-the-wire byte pattern stays aligned
returnTask.m_ioAction = ctsTaskAction::Send;
returnTask.m_bufferType = ctsTask::BufferType::Static;
returnTask.m_bufferLength = verifiedNewBufferSize;
returnTask.m_bufferOffset = m_sendPatternOffset;
returnTask.m_expectedPatternOffset = 0;
returnTask.m_buffer = g_senderSharedBuffer;
// every RIOSend must have unique RIO buffer IDs - it can't reuse buffers ID's like WSASend can use the same m_buffer
if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO))
{
// the early-return above guarantees non-empty here; fail-fast guards against an internal logic change
FAIL_FAST_IF_MSG(
m_sendingRioBufferIds.empty(),
"m_sendingRioBufferIds is empty for a new Send task (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
returnTask.m_bufferType = ctsTask::BufferType::Dynamic;
// Release successfully hands off the RIO_BUFFERID - safe to pop this object now
returnTask.m_rioBufferid = m_sendingRioBufferIds.rbegin()->Release();
m_sendingRioBufferIds.pop_back();
}
// now that we are indicating this buffer to send, increment the offset for the next send request
// (offset wraps modulo the pattern size so consecutive sends walk through the shared pattern buffer)
m_sendPatternOffset += verifiedNewBufferSize;
m_sendPatternOffset %= c_bufferPatternSize;
FAIL_FAST_IF_MSG(
m_sendPatternOffset >= c_bufferPatternSize,
"pattern_offset being too large (larger than BufferPatternSize %lu) means we might walk off the end of our shared buffer (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)",
c_bufferPatternSize, this);
FAIL_FAST_IF_MSG(
returnTask.m_bufferLength + returnTask.m_bufferOffset > g_maximumBufferSize,
"return_task (%p) for a Send request is specifying a buffer that is larger than the static SharedBufferSize (%lu) (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)",
&returnTask, g_maximumBufferSize, this);
}
else
{
// Recv path: take a dedicated buffer from the free list; the received bytes will be verified
// against the expected pattern starting at m_recvPatternOffset
returnTask.m_ioAction = ctsTaskAction::Recv;
returnTask.m_bufferType = ctsTask::BufferType::Dynamic;
returnTask.m_bufferLength = verifiedNewBufferSize;
returnTask.m_bufferOffset = 0; // always recv to the beginning of the buffer
returnTask.m_expectedPatternOffset = m_recvPatternOffset;
FAIL_FAST_IF_MSG(
m_recvBufferFreeList.empty(),
"m_recvBufferFreeList is empty for a new Recv task (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
returnTask.m_buffer = *m_recvBufferFreeList.rbegin();
m_recvBufferFreeList.pop_back();
if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO))
{
FAIL_FAST_IF_MSG(
m_receivingRioBufferIds.empty(),
"m_receivingRioBufferIds is empty for a new Recv task (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
returnTask.m_rioBufferid = m_receivingRioBufferIds.rbegin()->m_bufferId;
// successfully handed off the RIO_BUFFERID - don't deregister it when we pop it
// (invalidating the stored id prevents the popped wrapper's destructor from releasing it - confirm
// against the RIO buffer wrapper's destructor semantics)
m_receivingRioBufferIds.rbegin()->m_bufferId = RIO_INVALID_BUFFERID;
m_receivingRioBufferIds.pop_back();
}
FAIL_FAST_IF_MSG(
m_recvPatternOffset >= c_bufferPatternSize,
"pattern_offset being too large means we might walk off the end of our shared buffer (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)", this);
FAIL_FAST_IF_MSG(
returnTask.m_bufferLength + returnTask.m_bufferOffset > verifiedNewBufferSize,
"return_task (%p) for a Recv request is specifying a buffer that is larger than buffer_size (%lu) (dt ctsTraffic!ctsTraffic::ctsIOPattern %p)",
&returnTask, verifiedNewBufferSize, this);
}
return returnTask;
}