static ctsSendRecvStatus ctsSendRecvProcessTask()

in ctsTraffic/ctsSendRecvIocp.cpp [130:262]


//
// Processes a single task (nextIo) for the given socket and reports the outcome.
//
// What is done depends on the socket and on nextIo.m_ioAction:
//  - INVALID_SOCKET:   the task is completed against the pattern with WSAECONNABORTED
//  - GracefulShutdown: shutdown(SD_SEND) is called and the task is completed with its result
//  - HardShutdown:     the socket is closed through sharedSocket->CloseSocket and the task is completed
//  - Send:             an overlapped WSASend is issued through the socket's IOCP thread-pool object
//  - anything else:    an overlapped WSARecv is issued the same way
//
// The returned ctsSendRecvStatus carries:
//  - m_ioErrorcode: the error code the caller should observe for this task
//  - m_ioStarted:   true only when an overlapped request was left outstanding, in which case
//                   ctsSendRecvCompletionCallback is invoked with the OVERLAPPED* later
//  - m_ioDone:      true when no further IO should be requested on this connection
//
// Exceptions thrown while issuing the overlapped IO are caught, converted to an error code,
// and reported to the pattern through CompleteIo - nothing propagates out of this function.
//
static ctsSendRecvStatus ctsSendRecvProcessTask(SOCKET socket, const std::shared_ptr<ctsSocket>& sharedSocket, const std::shared_ptr<ctsIoPattern>& sharedPattern, const ctsTask& nextIo) noexcept
{
    // NOTE(review): the branches below only write m_ioErrorcode when something fails, so this relies on
    // ctsSendRecvStatus default-initializing m_ioErrorcode to NO_ERROR - confirm against its definition
    ctsSendRecvStatus returnStatus;

    // if we no longer have a valid socket return early
    if (INVALID_SOCKET == socket)
    {
        returnStatus.m_ioErrorcode = WSAECONNABORTED;
        returnStatus.m_ioStarted = false;
        returnStatus.m_ioDone = true;
        // even if the socket was closed we still must complete the IO request
        sharedPattern->CompleteIo(nextIo, 0, returnStatus.m_ioErrorcode);
        return returnStatus;
    }

    if (ctsTaskAction::GracefulShutdown == nextIo.m_ioAction)
    {
        // only the send direction is shut down; zero bytes are reported for this task
        if (shutdown(socket, SD_SEND) != 0)
        {
            returnStatus.m_ioErrorcode = WSAGetLastError();
        }
        // the connection is done unless the pattern explicitly asks to continue
        returnStatus.m_ioDone = sharedPattern->CompleteIo(nextIo, 0, returnStatus.m_ioErrorcode) != ctsIoStatus::ContinueIo;
        returnStatus.m_ioStarted = false;
    }
    else if (ctsTaskAction::HardShutdown == nextIo.m_ioAction)
    {
        // pass through -1 to force an RST with the closesocket
        returnStatus.m_ioErrorcode = sharedSocket->CloseSocket(static_cast<uint32_t>(SOCKET_ERROR));
        returnStatus.m_ioDone = sharedPattern->CompleteIo(nextIo, 0, returnStatus.m_ioErrorcode) != ctsIoStatus::ContinueIo;
        returnStatus.m_ioStarted = false;
    }
    else
    {
        try
        {
            // attempt to allocate an IO thread-pool object
            // the callback holds only a weak reference to the socket, plus its own copy of the task
            const std::shared_ptr<ctl::ctThreadIocp>& ioThreadPool(sharedSocket->GetIocpThreadpool());
            OVERLAPPED* const pOverlapped = ioThreadPool->new_request(
                [weak_reference = std::weak_ptr(sharedSocket), nextIo](OVERLAPPED* pCallbackOverlapped) noexcept {
                    ctsSendRecvCompletionCallback(pCallbackOverlapped, weak_reference, nextIo);
                });

            // a single buffer describing the slice of the task's buffer to transfer
            WSABUF wsabuffer{};
            wsabuffer.buf = nextIo.m_buffer + nextIo.m_bufferOffset;
            wsabuffer.len = nextIo.m_bufferLength;

            // name of the API that was called - only used when reporting failures
            PCSTR functionName{};
            if (ctsTaskAction::Send == nextIo.m_ioAction)
            {
                functionName = "WSASend";
                if (WSASend(socket, &wsabuffer, 1, nullptr, 0, pOverlapped, nullptr) != 0)
                {
                    returnStatus.m_ioErrorcode = WSAGetLastError();
                }
            }
            else
            {
                functionName = "WSARecv";
                DWORD flags = ctsConfig::g_configSettings->Options & ctsConfig::OptionType::MsgWaitAll ? MSG_WAITALL : 0;
                if (WSARecv(socket, &wsabuffer, 1, nullptr, &flags, pOverlapped, nullptr) != 0)
                {
                    returnStatus.m_ioErrorcode = WSAGetLastError();
                }
            }
            //
            // not calling complete_io if returned IO pended
            // not calling complete_io if returned success but not handling inline completions
            //
            if (WSA_IO_PENDING == returnStatus.m_ioErrorcode ||
                // ReSharper disable once CppRedundantParentheses
                (NO_ERROR == returnStatus.m_ioErrorcode && !(ctsConfig::g_configSettings->Options & ctsConfig::OptionType::HandleInlineIocp)))
            {
                // the request is outstanding: the completion callback registered above will finish this task
                returnStatus.m_ioErrorcode = NO_ERROR;
                returnStatus.m_ioStarted = true;
                returnStatus.m_ioDone = false;
            }
            else
            {
                // process the completion if the API call failed, or if it succeeded and we're handling the completion inline
                returnStatus.m_ioStarted = false;
                // determine # of bytes transferred, if any
                DWORD bytesTransferred = 0;
                if (NO_ERROR == returnStatus.m_ioErrorcode)
                {
                    // flags is only an out-parameter required by the API - its value is not used
                    DWORD flags;
                    if (!WSAGetOverlappedResult(socket, pOverlapped, &bytesTransferred, FALSE, &flags))
                    {
                        FAIL_FAST_MSG(
                            "WSAGetOverlappedResult failed (%d) after the IO request (%hs) succeeded", WSAGetLastError(), functionName);
                    }
                }
                // must cancel the IOCP TP since IO is not pended
                // (this must happen after reading the overlapped result above and before completing the task)
                ioThreadPool->cancel_request(pOverlapped);
                // call back to the socket to see if wants more IO
                switch (const ctsIoStatus protocolStatus = sharedPattern->CompleteIo(nextIo, bytesTransferred, returnStatus.m_ioErrorcode))
                {
                    case ctsIoStatus::ContinueIo:
                        // The protocol layer wants to transfer more data
                        // if prior IO failed, the protocol wants to ignore the error
                        returnStatus.m_ioErrorcode = NO_ERROR;
                        returnStatus.m_ioDone = false;
                        break;

                    case ctsIoStatus::CompletedIo:
                        // The protocol layer has successfully completed all IO on this connection
                        // if prior IO failed, the protocol wants to ignore the error
                        returnStatus.m_ioErrorcode = NO_ERROR;
                        returnStatus.m_ioDone = true;
                        break;

                    case ctsIoStatus::FailedIo:
                    {
                        // read the pattern's error once so the code written out is the same code that is returned
                        const auto patternError = sharedPattern->GetLastPatternError();
                        // write out the error
                        ctsConfig::PrintErrorIfFailed(functionName, patternError);
                        // the protocol acknowledged the failure - socket is done with IO
                        returnStatus.m_ioErrorcode = patternError;
                        returnStatus.m_ioDone = true;
                        break;
                    }

                    default:
                        // cast explicitly: an enum is not guaranteed to be passed as an int through a
                        // printf-style variadic argument list, and %d requires an int
                        FAIL_FAST_MSG("ctsSendRecvIocp: unknown ctsSocket::IOStatus - %d\n", static_cast<int>(protocolStatus));
                }
            }
        }
        catch (...)
        {
            // translate the exception into an error code and report it to the pattern with zero bytes transferred
            returnStatus.m_ioErrorcode = ctsConfig::PrintThrownException();
            returnStatus.m_ioDone = sharedPattern->CompleteIo(nextIo, 0, returnStatus.m_ioErrorcode) != ctsIoStatus::ContinueIo;
            returnStatus.m_ioStarted = false;
        }
    }

    return returnStatus;
}