uint64_t ProbeRxControlProcessProbeState()

in src/cdi/adapter_efa_probe_rx.c [316:423]


/**
 * Process the current state of the Rx (server) side probe state machine for an endpoint.
 *
 * Depending on the value of probe_ptr->rx_probe_state.rx_state this may notify the Endpoint Manager of a connection
 * state change, send a control command or ACK to the remote Tx (client), queue a reset of the EFA connection, destroy
 * the Rx endpoint, and/or advance rx_state to the next state.
 *
 * @param probe_ptr Pointer to the probe endpoint state. Must not be NULL; it is dereferenced unconditionally.
 *
 * @return Timeout value in milliseconds. DEFAULT_TIMEOUT_MSEC is returned for states that need no processing, 0 means
 *         "do immediately" and CDI_INFINITE is used when probe monitoring is disabled. NOTE(review): presumably the
 *         caller waits this long for a control event before invoking this function again -- confirm against caller.
 */
uint64_t ProbeRxControlProcessProbeState(ProbeEndpointState* probe_ptr)
{
    uint64_t wait_timeout_ms = DEFAULT_TIMEOUT_MSEC;
    AdapterEndpointHandle adapter_endpoint_handle = probe_ptr->app_adapter_endpoint_handle;
    CdiEndpointHandle cdi_endpoint_handle = adapter_endpoint_handle->cdi_endpoint_handle;
    EfaConnectionState* efa_con_ptr = (EfaConnectionState*)adapter_endpoint_handle->adapter_con_state_ptr->type_specific_ptr;

    CDI_LOG_THREAD_COMPONENT(kLogDebug, kLogComponentProbe, "Probe Rx remote IP[%s:%d] state[%s].",
                             EndpointManagerEndpointRemoteIpGet(cdi_endpoint_handle),
                             EndpointManagerEndpointRemotePortGet(cdi_endpoint_handle),
                             InternalUtilityKeyEnumToString(kKeyProbeState, probe_ptr->rx_probe_state.rx_state));

    // NOTE: No default case is used here, so every probe state must be listed explicitly below.
    switch (probe_ptr->rx_probe_state.rx_state) {
        case kProbeStateEfaStart:
        case kProbeStateWaitForStart:
            // Not used, so nothing to do.
            break;
        case kProbeStateResetting:
            // Got timeout before the queued connection reset completed. Go back to the connection reset state, which
            // queues the reset again. The Rx side does not wait for ACKs from the remote, so no command is sent here.
            probe_ptr->rx_probe_state.rx_state = kProbeStateEfaReset;
            wait_timeout_ms = 0; // Do immediately.
            break;
        case kProbeStateEfaReset:
            // Either a reset request came from the Tx, ProbeEndpointError() was used, EFA probe timed-out, or a ping
            // was not received within the expected timeout period. Notify the application that we are disconnected and
            // send a request to reset the connection to the Endpoint Manager.
            ProbeControlEfaConnectionQueueReset(probe_ptr, NULL);
            probe_ptr->rx_probe_state.rx_state = kProbeStateResetting; // Advance to resetting state.
            wait_timeout_ms = ENDPOINT_MANAGER_COMPLETION_TIMEOUT_MSEC;
            break;
        case kProbeStateIdle:
        case kProbeStateSendReset:
            // Notify application that we are disconnected.
            EndpointManagerConnectionStateChange(cdi_endpoint_handle, kCdiConnectionStatusDisconnected, NULL);
            if (++probe_ptr->rx_probe_state.send_reset_retry_count < RX_RESET_COMMAND_MAX_RETRIES) {
                CDI_LOG_THREAD_COMPONENT(kLogDebug, kLogComponentProbe,
                                         "Probe Rx remote IP[%s:%d] sending reset #[%d].",
                                         EndpointManagerEndpointRemoteIpGet(cdi_endpoint_handle),
                                         EndpointManagerEndpointRemotePortGet(cdi_endpoint_handle),
                                         probe_ptr->rx_probe_state.send_reset_retry_count);
                // If we have received a reset command from the remote Tx (client) connection, which contains the
                // remote IP and destination port, we can send reset commands to it.
                if (efa_con_ptr->control_interface_handle) {
                    // Send command to reset the remote Tx (client) connection. Will not expect an ACK back.
                    ProbeControlSendCommand(probe_ptr, kProbeCommandReset, false);
                }
                probe_ptr->rx_probe_state.rx_state = kProbeStateSendReset; // Ensure in send reset state.
                wait_timeout_ms = SEND_RESET_COMMAND_FREQUENCY_MSEC;
            } else {
                // Retry limit reached, so give up on this endpoint.
                DestroyRxEndpoint(probe_ptr);
                wait_timeout_ms = 0; // Do immediately.
            }
            break;
        case kProbeStateResetDone:
            // If the reset was triggered by the remote connection, respond with an ACK command.
            if (probe_ptr->send_ack_command_valid) {
                ProbeControlSendAck(probe_ptr, probe_ptr->send_ack_command, probe_ptr->send_ack_control_packet_num);
                probe_ptr->send_ack_command_valid = false; // ACK has been sent, so don't send it again.
                // For Rx, the EFA endpoint has been started in ProbeEndpointResetDone(), so we can advance to the
                // kProbeStateEfaProbe state.
                probe_ptr->rx_probe_state.rx_state = kProbeStateEfaProbe; // Advance to EFA probe state.
                // If the EFA probe does not complete by this timeout, we return back to connection reset state.
                wait_timeout_ms = EFA_PROBE_MONITOR_TIMEOUT_MSEC;
            } else {
                // Reset was not triggered by the remote connection, so just setup to send another reset command to it.
                // No need to stop/start local libfabric here.
                probe_ptr->rx_probe_state.rx_state = kProbeStateSendReset;
                wait_timeout_ms = 0; // Do immediately.
            }
            break;
        case kProbeStateEfaProbe:
            // Did not complete EFA probe state within timeout. Reset the connection.
            CDI_LOG_THREAD_COMPONENT(kLogDebug, kLogComponentProbe,
                "Probe Rx EFA probe timeout. Sending reset to Tx.");
            // rx_state holds probe state values (kProbeState...), not probe command values (kProbeCommand...), so
            // use the connection reset state here.
            probe_ptr->rx_probe_state.rx_state = kProbeStateEfaReset; // Advance to connection reset state.
            wait_timeout_ms = 0; // Do immediately.
            break;
        case kProbeStateEfaConnected:
            // Notify application that we are connected.
            EndpointManagerConnectionStateChange(cdi_endpoint_handle, kCdiConnectionStatusConnected, NULL);
            // Send command to notify the remote Tx (client) that we are connected and it is ok for the remote to switch
            // to the connected state. This is done to prevent problems caused by EFA packet reordering. Without this
            // communication, the transmitter could start sending a payload and packets for it might arrive before the
            // last probe packet arrives. NOTE: We will not expect an ACK back.
            ProbeControlSendCommand(probe_ptr, kProbeCommandConnected, false);
            probe_ptr->rx_probe_state.send_reset_retry_count = 0; // Reset retry counter.
#ifdef DISABLE_PROBE_MONITORING
            // Ping monitoring is compiled out, so remain in the connected state and wait forever.
            wait_timeout_ms = CDI_INFINITE;
#else
            // Just connected, so advance to ping state and timeout if we miss receiving a ping.
            probe_ptr->rx_probe_state.rx_state = kProbeStateEfaConnectedPing;
            // NOTE(review): The name of this constant suggests it is the Tx command ACK timeout. Confirm that it is
            // the intended timeout for monitoring pings received from the remote Tx.
            wait_timeout_ms = TX_COMMAND_ACK_TIMEOUT_MSEC;
#endif
            break;
        case kProbeStateEfaConnectedPing:
            // Did not get a ping within the timeout period. Destroy the Rx endpoint.
            DestroyRxEndpoint(probe_ptr);
            wait_timeout_ms = 0; // Do immediately.
            break;
        case kProbeStateDestroy:
        case kProbeStateSendProtocolVersion:
        case kProbeStateEfaTxProbeAcks:
            // Nothing special needed here.
            break;
    }

    return wait_timeout_ms;
}