in ctsTraffic/ctsSendRecvIocp.cpp [130:262]
// Executes the single task described by nextIo against the given socket and reports the
// outcome in a ctsSendRecvStatus:
//   m_ioErrorcode - error code associated with this request
//   m_ioStarted   - true only when a WSASend/WSARecv was issued and its completion is left
//                   to the IOCP callback (the call pended, or it succeeded while
//                   HandleInlineIocp is not set)
//   m_ioDone      - true when no further IO should follow: the socket was invalid, or the
//                   pattern's CompleteIo returned something other than ContinueIo
//
// Every path on which m_ioStarted ends up false reports the task to
// sharedPattern->CompleteIo exactly once before returning.
static ctsSendRecvStatus ctsSendRecvProcessTask(SOCKET socket, const std::shared_ptr<ctsSocket>& sharedSocket, const std::shared_ptr<ctsIoPattern>& sharedPattern, const ctsTask& nextIo) noexcept
{
    ctsSendRecvStatus status;

    // Common epilogue for the paths that never transfer data: report zero bytes along with
    // the current error code to the pattern and let its answer decide whether IO is done.
    const auto completeWithoutTransfer = [&] {
        status.m_ioDone = sharedPattern->CompleteIo(nextIo, 0, status.m_ioErrorcode) != ctsIoStatus::ContinueIo;
        status.m_ioStarted = false;
    };

    // Guard: the socket is already gone. The pattern is still owed a completion for this task.
    if (INVALID_SOCKET == socket)
    {
        status.m_ioErrorcode = WSAECONNABORTED;
        status.m_ioStarted = false;
        status.m_ioDone = true;
        sharedPattern->CompleteIo(nextIo, 0, status.m_ioErrorcode);
        return status;
    }

    // Guard: graceful shutdown only closes the send direction.
    if (ctsTaskAction::GracefulShutdown == nextIo.m_ioAction)
    {
        if (shutdown(socket, SD_SEND) != 0)
        {
            status.m_ioErrorcode = WSAGetLastError();
        }
        completeWithoutTransfer();
        return status;
    }

    // Guard: hard shutdown. SOCKET_ERROR (-1) is handed to CloseSocket; per the original
    // author's note this forces an RST when the socket is closed.
    if (ctsTaskAction::HardShutdown == nextIo.m_ioAction)
    {
        status.m_ioErrorcode = sharedSocket->CloseSocket(static_cast<uint32_t>(SOCKET_ERROR));
        completeWithoutTransfer();
        return status;
    }

    // Anything else is a data transfer: Send goes to WSASend, every other action to WSARecv.
    try
    {
        // Reserve an OVERLAPPED from the socket's IOCP thread pool. The callback only holds
        // a weak reference to the socket plus its own copy of the task.
        const std::shared_ptr<ctl::ctThreadIocp>& iocpPool(sharedSocket->GetIocpThreadpool());
        OVERLAPPED* const overlapped = iocpPool->new_request(
            [weakSocket = std::weak_ptr(sharedSocket), nextIo](OVERLAPPED* completedOverlapped) noexcept {
                ctsSendRecvCompletionCallback(completedOverlapped, weakSocket, nextIo);
            });

        WSABUF transferBuffer{};
        transferBuffer.len = nextIo.m_bufferLength;
        transferBuffer.buf = nextIo.m_buffer + nextIo.m_bufferOffset;

        const bool isSend = ctsTaskAction::Send == nextIo.m_ioAction;
        // Name of the API being invoked; only used for diagnostics further down.
        PCSTR apiName = isSend ? "WSASend" : "WSARecv";

        if (isSend)
        {
            if (WSASend(socket, &transferBuffer, 1, nullptr, 0, overlapped, nullptr) != 0)
            {
                status.m_ioErrorcode = WSAGetLastError();
            }
        }
        else
        {
            DWORD recvFlags = 0;
            if (ctsConfig::g_configSettings->Options & ctsConfig::OptionType::MsgWaitAll)
            {
                recvFlags = MSG_WAITALL;
            }
            if (WSARecv(socket, &transferBuffer, 1, nullptr, &recvFlags, overlapped, nullptr) != 0)
            {
                status.m_ioErrorcode = WSAGetLastError();
            }
        }

        // The completion is left to the IOCP callback when the request pended, or when it
        // succeeded immediately but inline completions are not being handled here.
        // In both cases CompleteIo is NOT called from this function.
        const bool completionDeferredToCallback =
            WSA_IO_PENDING == status.m_ioErrorcode ||
            // ReSharper disable once CppRedundantParentheses
            (NO_ERROR == status.m_ioErrorcode && !(ctsConfig::g_configSettings->Options & ctsConfig::OptionType::HandleInlineIocp));
        if (completionDeferredToCallback)
        {
            status.m_ioErrorcode = NO_ERROR;
            status.m_ioStarted = true;
            status.m_ioDone = false;
            return status;
        }

        // From here on the completion is processed inline: either the API call failed, or
        // it succeeded and inline completions are enabled.
        status.m_ioStarted = false;

        // Only a successful call can have moved bytes; ask the OVERLAPPED how many.
        DWORD transferred = 0;
        if (NO_ERROR == status.m_ioErrorcode)
        {
            DWORD resultFlags{};
            if (!WSAGetOverlappedResult(socket, overlapped, &transferred, FALSE, &resultFlags))
            {
                FAIL_FAST_MSG(
                    "WSAGetOverlappedResult failed (%d) after the IO request (%hs) succeeded", WSAGetLastError(), apiName);
            }
        }

        // The IO did not pend, so the thread-pool request has to be cancelled explicitly.
        iocpPool->cancel_request(overlapped);

        // Hand the result to the pattern and let it decide what happens next.
        const ctsIoStatus protocolStatus = sharedPattern->CompleteIo(nextIo, transferred, status.m_ioErrorcode);
        switch (protocolStatus)
        {
            case ctsIoStatus::FailedIo:
                // The pattern acknowledged the failure: log it, surface the pattern's
                // error code, and stop issuing IO on this socket.
                ctsConfig::PrintErrorIfFailed(apiName, sharedPattern->GetLastPatternError());
                status.m_ioErrorcode = sharedPattern->GetLastPatternError();
                status.m_ioDone = true;
                break;

            case ctsIoStatus::ContinueIo:
            case ctsIoStatus::CompletedIo:
                // Either more data is expected (ContinueIo) or the connection finished all
                // of its IO (CompletedIo). In both cases any error from the call above is
                // one the pattern chose to ignore, so it is cleared.
                status.m_ioErrorcode = NO_ERROR;
                status.m_ioDone = ctsIoStatus::CompletedIo == protocolStatus;
                break;

            default:
                FAIL_FAST_MSG("ctsSendRecvIocp: unknown ctsSocket::IOStatus - %d\n", protocolStatus);
        }
    }
    catch (...)
    {
        // Something in the block above threw: record the error code produced by
        // PrintThrownException and still complete the task with the pattern.
        status.m_ioErrorcode = ctsConfig::PrintThrownException();
        completeWithoutTransfer();
    }

    return status;
}