in ctsTraffic/ctsReadWriteIocp.cpp [112:268]
//
// Drives the socket's IO pattern using overlapped WriteFile/ReadFile calls on the socket handle.
//
// weakSocket: weak reference to the ctsSocket to run IO on. If it can no longer be locked,
//             or the locked socket exposes no IO pattern, the function returns without doing anything.
//
// Repeatedly asks the pattern for its next task (InitiateIo) and acts on it until one of:
//  - the pattern returns ctsTaskAction::None (no more IO to start right now)
//  - a CompleteIo call returns something other than ctsIoStatus::ContinueIo
//  - an error is left in ioError
// Successfully pended IO is not completed here; the callback handed to new_request
// (ctsReadWriteIocpIoCompletionCallback) is responsible for that.
//
// On exit, CompleteState(ioError) is called only if the most recent IncrementIo/DecrementIo
// result stored in ioCount is zero.
//
// Declared noexcept: the calls expected to throw are wrapped in the try/catch below.
//
void ctsReadWriteIocp(const std::weak_ptr<ctsSocket>& weakSocket) noexcept
{
// must get a reference to the socket and the IO pattern
const auto sharedSocket(weakSocket.lock());
if (!sharedSocket)
{
// the ctsSocket is no longer available - nothing to do
return;
}
// take the socket lock; the pattern and the SOCKET handle are both read through it
const auto lockedSocket = sharedSocket->AcquireSocketLock();
const auto lockedPattern = lockedSocket.GetPattern();
if (!lockedPattern)
{
// no IO pattern available from the locked socket - nothing to do
return;
}
// can't initialize to zero - a zero value at the end of this function triggers CompleteState()
long ioCount = -1;
uint32_t ioError = NO_ERROR;
// ReSharper disable once CppTooWideScopeInitStatement
auto socket = lockedSocket.GetSocket();
if (socket != INVALID_SOCKET)
{
auto ioDone = false;
// loop until failure or InitiateIo returns None
while (!ioDone && NO_ERROR == ioError)
{
// each loop requests the next task
ctsTask nextIo = lockedPattern->InitiateIo();
if (ctsTaskAction::None == nextIo.m_ioAction)
{
// nothing failed, just no more IO right now
ioDone = true;
continue;
}
if (ctsTaskAction::GracefulShutdown == nextIo.m_ioAction)
{
// shut down the send direction only; a failure is recorded in ioError
if (0 != shutdown(socket, SD_SEND))
{
ioError = WSAGetLastError();
}
// report the result (zero bytes transferred) to the pattern;
// keep looping only if it asks for more IO (and ioError is still NO_ERROR)
ioDone = lockedPattern->CompleteIo(nextIo, 0, ioError) != ctsIoStatus::ContinueIo;
continue;
}
if (ctsTaskAction::HardShutdown == nextIo.m_ioAction)
{
// pass through -1 to force an RST with the closesocket
ioError = sharedSocket->CloseSocket(static_cast<uint32_t>(SOCKET_ERROR));
// the local copy of the handle must no longer be treated as a valid socket
socket = INVALID_SOCKET;
// report the result (zero bytes transferred) to the pattern;
// keep looping only if it asks for more IO (and ioError is still NO_ERROR)
ioDone = lockedPattern->CompleteIo(nextIo, 0, ioError) != ctsIoStatus::ContinueIo;
continue;
}
// else we need to initiate another IO
// add-ref the IO about to start
ioCount = sharedSocket->IncrementIo();
std::shared_ptr<ctl::ctThreadIocp> ioThreadPool;
OVERLAPPED* pOverlapped = nullptr;
try
{
// these are the only calls which can throw in this function
ioThreadPool = sharedSocket->GetIocpThreadpool();
// the callback captures the weak socket reference and a copy of the task by value
pOverlapped = ioThreadPool->new_request(
[weakSocket, nextIo](OVERLAPPED* pCallbackOverlapped) noexcept { ctsReadWriteIocpIoCompletionCallback(pCallbackOverlapped, weakSocket, nextIo); });
}
catch (...)
{
// convert the exception into an error code for the checks below
ioError = ctsConfig::PrintThrownException();
}
// if an exception prevented this IO from initiating,
// undo the add-ref taken above and report the failure to the pattern
if (ioError != NO_ERROR)
{
ioCount = sharedSocket->DecrementIo();
ioDone = lockedPattern->CompleteIo(nextIo, 0, ioError) != ctsIoStatus::ContinueIo;
continue;
}
// the IO starts at the task's offset into its buffer
char* ioBuffer = nextIo.m_buffer + nextIo.m_bufferOffset;
// Send tasks are issued with WriteFile; every other remaining action is issued with ReadFile
if (ctsTaskAction::Send == nextIo.m_ioAction)
{
if (!WriteFile(reinterpret_cast<HANDLE>(socket), ioBuffer, nextIo.m_bufferLength, nullptr, pOverlapped)) // NOLINT(performance-no-int-to-ptr)
{
ioError = GetLastError();
}
}
else
{
if (!ReadFile(reinterpret_cast<HANDLE>(socket), ioBuffer, nextIo.m_bufferLength, nullptr, pOverlapped)) // NOLINT(performance-no-int-to-ptr)
{
ioError = GetLastError();
}
}
//
// not calling CompleteIo on success, since the IO completion will handle that in the callback
//
// ERROR_IO_PENDING is not a failure: the overlapped IO was pended
if (ERROR_IO_PENDING == ioError)
{
ioError = NO_ERROR;
}
if (ioError != NO_ERROR)
{
// must cancel the IOCP TP if the IO call fails
ioThreadPool->cancel_request(pOverlapped);
// decrement the IO count since it was not pended
ioCount = sharedSocket->DecrementIo();
const char* functionName = ctsTaskAction::Send == nextIo.m_ioAction ? "WriteFile" : "ReadFile";
PRINT_DEBUG_INFO(L"\t\tIO Failed: %hs (%u) [ctsReadWriteIocp]\n", functionName, ioError);
// call back to the pattern that the IO failed to see if it wants more IO
switch (const ctsIoStatus protocolStatus = lockedPattern->CompleteIo(nextIo, 0, ioError))
{
case ctsIoStatus::ContinueIo:
// the protocol wants to ignore the error and send more data
ioError = NO_ERROR;
ioDone = false;
break;
case ctsIoStatus::CompletedIo:
// the protocol wants to ignore the error but is done with IO
ioError = NO_ERROR;
ioDone = true;
break;
case ctsIoStatus::FailedIo:
// print the error on failure
ctsConfig::PrintErrorIfFailed(functionName, ioError);
// the protocol acknowledged the failure - socket is done with IO
// from here on ioError carries the pattern's last error rather than the Win32 error
ioError = static_cast<int>(lockedPattern->GetLastPatternError());
ioDone = true;
break;
default:
FAIL_FAST_MSG("ctsReadWriteIocp: unknown ctsSocket::IOStatus - %d\n", protocolStatus);
}
}
}
}
else
{
// the socket handle was already invalid when this function was entered
// NOTE(review): ioCount is still -1 on this path, so CompleteState below is not called
// with this error from here - confirm that is intended
ioError = WSAECONNABORTED;
}
if (0 == ioCount)
{
// complete the ctsSocket if we have no IO pended
sharedSocket->CompleteState(ioError);
}
}