in ctsTraffic/ctsIOPattern.cpp [367:534]
//
// Processes the completion of a task that was previously handed out by this pattern.
//
// Parameters:
// - originalTask:    the task that has completed
// - currentTransfer: the number of bytes reported for this completion; it is added to the
//                    sent/received byte counters and to the recv pattern offset
// - statusCode:      the result of the operation; NO_ERROR is treated as success
//
// Returns the value of GetCurrentStatus() after every update below has been applied.
//
// The steps run in a fixed order:
//   1. return dynamically allocated recv buffers / RIO buffer ids to their free lists
//   2. feed the completion to the pattern state machine (and optionally verify received data)
//   3. account transferred bytes and notify the derived pattern
//   4. if the state machine reports completion, finalize the status and the statistics
//
ctsIoStatus ctsIoPattern::CompleteIo(const ctsTask& originalTask, uint32_t currentTransfer, uint32_t statusCode) noexcept // NOLINT(bugprone-exception-escape)
{
    // capture whether the pattern state was "more IO" *before* m_patternState.CompletedTask() is called below;
    // this decides later whether the derived pattern is notified of the completion
    const bool wasIoRequestedFromPattern = m_patternState.IsCurrentStateMoreIo();

    // add the recv buffer back if it was one of our dynamically allocated recv buffers
    // add back the RIO BufferId if it was a RIO request
    if (ctsTask::BufferType::Dynamic == originalTask.m_bufferType)
    {
        if (originalTask.m_ioAction == ctsTaskAction::Recv)
        {
            m_recvBufferFreeList.push_back(originalTask.m_buffer);
        }

        if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO))
        {
            // every action other than Send returns its id to the receiving list
            if (originalTask.m_ioAction == ctsTaskAction::Send)
            {
                m_sendingRioBufferIds.emplace_back(originalTask.m_rioBufferid);
            }
            else
            {
                m_receivingRioBufferIds.emplace_back(originalTask.m_rioBufferid);
            }
        }
    }

    switch (originalTask.m_ioAction)
    {
        case ctsTaskAction::None:
            // ignore completions for tasks on None
            break;

        case ctsTaskAction::FatalAbort:
            PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing a FatalAbort\n");
            UpdateLastError(c_statusErrorNotAllDataTransferred);
            break;

        case ctsTaskAction::Abort:
            PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing an Abort\n");
            break;

        // shutdown completions are processed exactly like send or recv IO
        case ctsTaskAction::GracefulShutdown:
        case ctsTaskAction::HardShutdown:
        case ctsTaskAction::Recv:
        case ctsTaskAction::Send:
        {
            // log which kind of shutdown completed
            // - the labels are shared (rather than falling through one another) so that a
            //   GracefulShutdown is not also reported as a HardShutdown
            if (ctsTaskAction::GracefulShutdown == originalTask.m_ioAction)
            {
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing a GracefulShutdown\n");
            }
            else if (ctsTaskAction::HardShutdown == originalTask.m_ioAction)
            {
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing a HardShutdown\n");
            }

            auto verifyIo = true;
            if (ctsTask::BufferType::TcpConnectionId == originalTask.m_bufferType ||
                ctsTask::BufferType::CompletionMessage == originalTask.m_bufferType)
            {
                // not verifying the buffer since it's the connection Id - but must complete the task to update the protocol
                verifyIo = false;
                if (statusCode != NO_ERROR)
                {
                    UpdateLastError(statusCode);
                }
                else
                {
                    // process the TCP protocol state machine in pattern_state after receiving the connection id
                    UpdateLastPatternError(m_patternState.CompletedTask(originalTask, currentTransfer));
                }
            }
            else if (statusCode != NO_ERROR)
            {
                //
                // if the IO task failed, the entire IO pattern is now failed
                // - unless this is an extra recv that was canceled once we completed the transfer
                //
                if (ctsTaskAction::Recv == originalTask.m_ioAction && m_patternState.IsCompleted())
                {
                    // NOTE(review): verifyIo stays true here, so the failed recv is still handed to
                    // m_patternState.CompletedTask() below - presumably harmless once completed; confirm
                    PRINT_DEBUG_INFO(L"\t\tctsIOPattern : Recv failed after the pattern completed (error %u)\n", statusCode);
                }
                else
                {
                    // ReSharper disable once CppTooWideScopeInitStatement
                    const auto currentStatus = UpdateLastError(statusCode);
                    if (currentStatus != c_statusIoRunning)
                    {
                        // this branch is reached for failed sends and shutdowns as well as recvs,
                        // so the message does not name a specific action
                        PRINT_DEBUG_INFO(L"\t\tctsIOPattern : IO failed before the pattern completed (error %u, current status %u)\n", statusCode, currentStatus);
                        verifyIo = false;
                    }
                }
            }

            if (verifyIo)
            {
                //
                // update state machine with the completed task if this task had IO
                //
                const auto patternStatus = m_patternState.CompletedTask(originalTask, currentTransfer);
                // update the last_error if the pattern_state detected an error
                UpdateLastPatternError(patternStatus);

                //
                // if this is a TCP receive completion
                //   and no IO or protocol errors
                //   and the user requested to verify buffers
                // then actually validate the received completion
                //
                if (ctsConfig::g_configSettings->Protocol == ctsConfig::ProtocolType::TCP &&
                    ctsConfig::g_configSettings->ShouldVerifyBuffers &&
                    originalTask.m_ioAction == ctsTaskAction::Recv &&
                    originalTask.m_trackIo &&
                    (ctsIoPatternError::SuccessfullyCompleted == patternStatus || ctsIoPatternError::NoError == patternStatus))
                {
                    // the offset recorded in the task must match the offset tracked here - otherwise fail fast
                    FAIL_FAST_IF_MSG(
                        originalTask.m_expectedPatternOffset != m_recvPatternOffset,
                        "ctsIOPattern::complete_io() : ctsIOTask (%p) expected_pattern_offset (%lu) does not match the current pattern_offset (%lu)",
                        &originalTask, originalTask.m_expectedPatternOffset, m_recvPatternOffset);

                    if (!VerifyBuffer(originalTask, currentTransfer))
                    {
                        UpdateLastError(c_statusErrorDataDidNotMatchBitPattern);
                    }

                    // advance the expected offset, wrapping at the size of the pattern buffer
                    m_recvPatternOffset += currentTransfer;
                    m_recvPatternOffset %= c_bufferPatternSize;
                }
            }
            break;
        }
    }

    //
    // Notify the derived interface that the task completed
    // - if this wasn't a task on None
    // - if there wasn't an error with the IO
    // If the derived interface returns an error,
    // - update the last_error status
    //
    if (originalTask.m_ioAction != ctsTaskAction::None &&
        NO_ERROR == statusCode)
    {
        // NOTE(review): every action other than Send is counted as received bytes, including
        // abort and shutdown completions - presumably those report a zero transfer; confirm
        if (ctsTaskAction::Send == originalTask.m_ioAction)
        {
            ctsConfig::g_configSettings->TcpStatusDetails.m_bytesSent.Add(currentTransfer);
        }
        else
        {
            ctsConfig::g_configSettings->TcpStatusDetails.m_bytesRecv.Add(currentTransfer);
        }

        // only complete tasks that were requested
        if (wasIoRequestedFromPattern)
        {
            UpdateLastPatternError(CompleteTaskBackToPattern(originalTask, currentTransfer));
        }
    }

    //
    // If the state machine has verified the connection has completed,
    // - set the last error to zero in case it was not already set to an error
    //   but do this *after* the other possible failure points were checked
    //
    if (m_patternState.IsCompleted())
    {
        UpdateLastError(NO_ERROR);
        EndStatistics();
    }

    return GetCurrentStatus();
}