ctsIoStatus ctsIoPattern::CompleteIo()

in ctsTraffic/ctsIOPattern.cpp [367:534]


//
// Processes the completion of a task and returns the resulting pattern status.
//
// Parameters:
//   originalTask    - the task whose completion is being processed
//   currentTransfer - the transfer count reported for this completion; it is handed to the
//                     pattern state, added to the sent/received byte counters, and used to
//                     advance m_recvPatternOffset when received buffers are verified
//   statusCode      - the completion status; NO_ERROR is treated as success
//
// Returns:
//   the value of GetCurrentStatus() after every update below has been applied
//
// Order of operations (later steps read state that earlier steps wrote):
//   1. return dynamic recv buffers / RIO buffer ids to their free lists
//   2. run the completion through m_patternState and record any IO or pattern error
//   3. optionally verify the received buffer contents (TCP + ShouldVerifyBuffers)
//   4. update the byte counters and notify the derived pattern via CompleteTaskBackToPattern
//   5. if m_patternState reports completion, call UpdateLastError(NO_ERROR) and EndStatistics()
//
ctsIoStatus ctsIoPattern::CompleteIo(const ctsTask& originalTask, uint32_t currentTransfer, uint32_t statusCode) noexcept // NOLINT(bugprone-exception-escape)
{
    // preserve the initial state for the prior task
    // - captured up front, before any of the m_patternState.CompletedTask() calls below run
    const bool wasIoRequestedFromPattern = m_patternState.IsCurrentStateMoreIo();

    // add the recv buffer back if it was one of our dynamically allocated recv buffers
    // add back the RIO BufferId if it was a RIO request
    if (ctsTask::BufferType::Dynamic == originalTask.m_bufferType)
    {
        if (originalTask.m_ioAction == ctsTaskAction::Recv)
        {
            m_recvBufferFreeList.push_back(originalTask.m_buffer);
        }

        if (WI_IsFlagSet(ctsConfig::g_configSettings->SocketFlags, WSA_FLAG_REGISTERED_IO))
        {
            // every action other than Send returns its id to the receiving list
            if (originalTask.m_ioAction == ctsTaskAction::Send)
            {
                m_sendingRioBufferIds.emplace_back(originalTask.m_rioBufferid);
            }
            else
            {
                m_receivingRioBufferIds.emplace_back(originalTask.m_rioBufferid);
            }
        }
    }

    switch (originalTask.m_ioAction)
    {
        case ctsTaskAction::None:
            // ignore completions for tasks on None
            break;

        case ctsTaskAction::FatalAbort:
            PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing a FatalAbort\n");
            UpdateLastError(c_statusErrorNotAllDataTransferred);
            break;

        case ctsTaskAction::Abort:
            PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing an Abort\n");
            break;

        case ctsTaskAction::GracefulShutdown:
        case ctsTaskAction::HardShutdown:
            // Both shutdown flavors are processed like send or recv IO.
            // Log only the message matching the action that actually completed:
            // sharing the labels avoids a GracefulShutdown also being reported as a HardShutdown.
            if (ctsTaskAction::GracefulShutdown == originalTask.m_ioAction)
            {
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing a GracefulShutdown\n");
            }
            else
            {
                PRINT_DEBUG_INFO(L"\t\tctsIOPattern : completing a HardShutdown\n");
            }
            [[fallthrough]];
        case ctsTaskAction::Recv:
            // Fall-through to Send - where the IO will be processed
            [[fallthrough]];
        case ctsTaskAction::Send:
        {
            // verifyIo gates the generic "update the state machine + verify buffer" path below
            auto verifyIo = true;
            if (ctsTask::BufferType::TcpConnectionId == originalTask.m_bufferType ||
                ctsTask::BufferType::CompletionMessage == originalTask.m_bufferType)
            {
                // not verifying the buffer since it's the connection Id - but must complete the task to update the protocol
                // (CompletedTask is called here, so the generic path below must be skipped to avoid calling it twice)
                verifyIo = false;

                if (statusCode != NO_ERROR)
                {
                    UpdateLastError(statusCode);
                }
                else
                {
                    // process the TCP protocol state machine in pattern_state after receiving the connection id
                    UpdateLastPatternError(m_patternState.CompletedTask(originalTask, currentTransfer));
                }
            }
            else if (statusCode != NO_ERROR)
            {
                //
                // if the IO task failed, the entire IO pattern is now failed
                // - unless this is an extra recv that was canceled once we completed the transfer
                //
                if (ctsTaskAction::Recv == originalTask.m_ioAction && m_patternState.IsCompleted())
                {
                    PRINT_DEBUG_INFO(L"\t\tctsIOPattern : Recv failed after the pattern completed (error %u)\n", statusCode);
                }
                else
                {
                    // this branch is reached for any failed action (Send, Recv or a shutdown), not only Recv
                    // ReSharper disable once CppTooWideScopeInitStatement
                    const auto currentStatus = UpdateLastError(statusCode);
                    if (currentStatus != c_statusIoRunning)
                    {
                        PRINT_DEBUG_INFO(L"\t\tctsIOPattern : IO failed before the pattern completed (error %u, current status %u)\n", statusCode, currentStatus);
                        verifyIo = false;
                    }
                }
            }

            if (verifyIo)
            {
                //
                // IO succeeded - update state machine with the completed task if this task had IO
                //
                const auto patternStatus = m_patternState.CompletedTask(originalTask, currentTransfer);
                // update the last_error if the pattern_state detected an error
                UpdateLastPatternError(patternStatus);
                //
                // if this is a TCP receive completion
                // and no IO or protocol errors
                // and the user requested to verify buffers
                // then actually validate the received completion
                //
                if (ctsConfig::g_configSettings->Protocol == ctsConfig::ProtocolType::TCP &&
                    ctsConfig::g_configSettings->ShouldVerifyBuffers &&
                    originalTask.m_ioAction == ctsTaskAction::Recv &&
                    originalTask.m_trackIo &&
                    (ctsIoPatternError::SuccessfullyCompleted == patternStatus || ctsIoPatternError::NoError == patternStatus))
                {
                    // the offset recorded in the task must match the offset tracked by this pattern
                    FAIL_FAST_IF_MSG(
                        originalTask.m_expectedPatternOffset != m_recvPatternOffset,
                        "ctsIOPattern::complete_io() : ctsIOTask (%p) expected_pattern_offset (%lu) does not match the current pattern_offset (%lu)",
                        &originalTask, originalTask.m_expectedPatternOffset, m_recvPatternOffset);

                    if (!VerifyBuffer(originalTask, currentTransfer))
                    {
                        UpdateLastError(c_statusErrorDataDidNotMatchBitPattern);
                    }

                    // advance the expected offset, wrapping at c_bufferPatternSize
                    m_recvPatternOffset += currentTransfer;
                    m_recvPatternOffset %= c_bufferPatternSize;
                }
            }
            break;
        }
    }
    //
    // Notify the derived interface that the task completed
    // - if this wasn't our internal connection id request
    // - if there wasn't an error with the IO and an IO operation completed
    // If the derived interface returns an error,
    // - update the last_error status
    //
    if (originalTask.m_ioAction != ctsTaskAction::None &&
        NO_ERROR == statusCode)
    {
        // every successful action other than Send is accounted to the received-bytes counter
        if (ctsTaskAction::Send == originalTask.m_ioAction)
        {
            ctsConfig::g_configSettings->TcpStatusDetails.m_bytesSent.Add(currentTransfer);
        }
        else
        {
            ctsConfig::g_configSettings->TcpStatusDetails.m_bytesRecv.Add(currentTransfer);
        }
        // only complete tasks that were requested
        if (wasIoRequestedFromPattern)
        {
            UpdateLastPatternError(CompleteTaskBackToPattern(originalTask, currentTransfer));
        }
    }
    //
    // If the state machine has verified the connection has completed, 
    // - set the last error to zero in case it was not already set to an error
    //   but do this *after* the other possible failure points were checked
    //
    if (m_patternState.IsCompleted())
    {
        UpdateLastError(NO_ERROR);
        EndStatistics();
    }

    return GetCurrentStatus();
}