void ctsReadWriteIocp()

in ctsTraffic/ctsReadWriteIocp.cpp [112:268]


//
// Drives the IO pattern of a ctsSocket using overlapped ReadFile/WriteFile calls whose
// completions are delivered through the socket's IOCP thread pool.
//
// weakSocket: the socket to drive. If it can no longer be locked into a shared_ptr,
//             or the locked socket has no IO pattern, the function returns without doing anything.
//
// The function repeatedly asks the pattern for its next task (InitiateIo) and acts on it:
// - None:             stop looping; nothing more to issue right now
// - GracefulShutdown: call shutdown(SD_SEND) and report the result through CompleteIo
// - HardShutdown:     call CloseSocket and report the result through CompleteIo
// - Send:             issue WriteFile with an OVERLAPPED obtained from the IOCP thread pool
// - anything else:    issue ReadFile with an OVERLAPPED obtained from the IOCP thread pool
//
// The loop ends when a task reports None, when an error is left in ioError,
// or when CompleteIo returns something other than ctsIoStatus::ContinueIo.
//
// CompleteState is called from here only when the most recent IncrementIo/DecrementIo made by
// this call returned zero. If this call never touched the IO count, CompleteState is not called.
//
void ctsReadWriteIocp(const std::weak_ptr<ctsSocket>& weakSocket) noexcept
{
    // must get a reference to the socket and the IO pattern
    const auto sharedSocket(weakSocket.lock());
    if (!sharedSocket)
    {
        return;
    }

    // hold a reference on the socket
    const auto lockedSocket = sharedSocket->AcquireSocketLock();
    const auto lockedPattern = lockedSocket.GetPattern();
    if (!lockedPattern)
    {
        return;
    }

    // can't initialize to zero - zero indicates to complete_state()
    // -1 means "this call never incremented/decremented the IO count"
    long ioCount = -1;
    uint32_t ioError = NO_ERROR;

    // ReSharper disable once CppTooWideScopeInitStatement
    auto socket = lockedSocket.GetSocket();
    if (socket != INVALID_SOCKET)
    {
        auto ioDone = false;
        // loop until failure or initiate_io returns None
        while (!ioDone && NO_ERROR == ioError)
        {
            // each loop requests the next task
            ctsTask nextIo = lockedPattern->InitiateIo();
            if (ctsTaskAction::None == nextIo.m_ioAction)
            {
                // nothing failed, just no more IO right now
                ioDone = true;
                continue;
            }

            if (ctsTaskAction::GracefulShutdown == nextIo.m_ioAction)
            {
                if (0 != shutdown(socket, SD_SEND))
                {
                    ioError = WSAGetLastError();
                }
                // the shutdown completes inline, so the pattern is told the result immediately
                ioDone = lockedPattern->CompleteIo(nextIo, 0, ioError) != ctsIoStatus::ContinueIo;
                continue;
            }

            if (ctsTaskAction::HardShutdown == nextIo.m_ioAction)
            {
                // pass through -1 to force an RST with the closesocket
                ioError = sharedSocket->CloseSocket(static_cast<uint32_t>(SOCKET_ERROR));
                // the local copy of the handle must no longer be treated as a valid socket
                socket = INVALID_SOCKET;

                ioDone = lockedPattern->CompleteIo(nextIo, 0, ioError) != ctsIoStatus::ContinueIo;
                continue;
            }

            // else we need to initiate another IO
            // add-ref the IO about to start
            ioCount = sharedSocket->IncrementIo();

            std::shared_ptr<ctl::ctThreadIocp> ioThreadPool;
            OVERLAPPED* pOverlapped = nullptr;
            try
            {
                // these are the only calls which can throw in this function
                ioThreadPool = sharedSocket->GetIocpThreadpool();
                // the callback captures the weak_ptr and the task by value,
                // since it runs after this function has returned
                pOverlapped = ioThreadPool->new_request(
                    [weakSocket, nextIo](OVERLAPPED* pCallbackOverlapped) noexcept { ctsReadWriteIocpIoCompletionCallback(pCallbackOverlapped, weakSocket, nextIo); });
            }
            catch (...)
            {
                ioError = ctsConfig::PrintThrownException();
            }

            // if an exception prevented this IO from initiating,
            // release the IO count taken above and report the failure to the pattern
            if (ioError != NO_ERROR)
            {
                ioCount = sharedSocket->DecrementIo();
                ioDone = lockedPattern->CompleteIo(nextIo, 0, ioError) != ctsIoStatus::ContinueIo;
                continue;
            }

            char* ioBuffer = nextIo.m_buffer + nextIo.m_bufferOffset;
            if (ctsTaskAction::Send == nextIo.m_ioAction)
            {
                if (!WriteFile(reinterpret_cast<HANDLE>(socket), ioBuffer, nextIo.m_bufferLength, nullptr, pOverlapped)) // NOLINT(performance-no-int-to-ptr)
                {
                    ioError = GetLastError();
                }
            }
            else
            {
                if (!ReadFile(reinterpret_cast<HANDLE>(socket), ioBuffer, nextIo.m_bufferLength, nullptr, pOverlapped)) // NOLINT(performance-no-int-to-ptr)
                {
                    ioError = GetLastError();
                }
            }
            //
            // not calling complete_io on success, since the IO completion will handle that in the callback
            //
            // ERROR_IO_PENDING is treated the same as success: the IO was accepted
            if (ERROR_IO_PENDING == ioError)
            {
                ioError = NO_ERROR;
            }

            if (ioError != NO_ERROR)
            {
                // must cancel the IOCP TP if the IO call fails
                ioThreadPool->cancel_request(pOverlapped);
                // decrement the IO count since it was not pended
                ioCount = sharedSocket->DecrementIo();

                const char* functionName = ctsTaskAction::Send == nextIo.m_ioAction ? "WriteFile" : "ReadFile";
                PRINT_DEBUG_INFO(L"\t\tIO Failed: %hs (%u) [ctsReadWriteIocp]\n", functionName, ioError);

                // call back to the socket that it failed to see if wants more IO
                switch (const ctsIoStatus protocolStatus = lockedPattern->CompleteIo(nextIo, 0, ioError))
                {
                    case ctsIoStatus::ContinueIo:
                        // the protocol wants to ignore the error and send more data
                        ioError = NO_ERROR;
                        ioDone = false;
                        break;

                    case ctsIoStatus::CompletedIo:
                        // the protocol wants to ignore the error but is done with IO
                        ioError = NO_ERROR;
                        ioDone = true;
                        break;

                    case ctsIoStatus::FailedIo:
                        // print the error on failure
                        ctsConfig::PrintErrorIfFailed(functionName, ioError);
                        // the protocol acknowledged the failure - socket is done with IO
                        // the pattern's own error replaces the raw API error;
                        // convert straight to ioError's type instead of round-tripping through int
                        ioError = static_cast<uint32_t>(lockedPattern->GetLastPatternError());
                        ioDone = true;
                        break;

                    default:
                        // cast explicitly so %d always receives an int, however ctsIoStatus is declared
                        FAIL_FAST_MSG("ctsReadWriteIocp: unknown ctsSocket::IOStatus - %d\n", static_cast<int>(protocolStatus));
                }
            }
        }
    }
    else
    {
        // the socket was already closed before any IO could be requested
        ioError = WSAECONNABORTED;
    }

    if (0 == ioCount)
    {
        // complete the ctsSocket if we have no IO pended
        sharedSocket->CompleteState(ioError);
    }
}