in src/cdi/adapter_efa_probe_rx.c [316:423]
uint64_t ProbeRxControlProcessProbeState(ProbeEndpointState* probe_ptr)
{
uint64_t wait_timeout_ms = DEFAULT_TIMEOUT_MSEC;
AdapterEndpointHandle adapter_endpoint_handle = probe_ptr->app_adapter_endpoint_handle;
CdiEndpointHandle cdi_endpoint_handle = adapter_endpoint_handle->cdi_endpoint_handle;
EfaConnectionState* efa_con_ptr = (EfaConnectionState*)adapter_endpoint_handle->adapter_con_state_ptr->type_specific_ptr;
CDI_LOG_THREAD_COMPONENT(kLogDebug, kLogComponentProbe, "Probe Rx remote IP[%s:%d] state[%s].",
EndpointManagerEndpointRemoteIpGet(cdi_endpoint_handle),
EndpointManagerEndpointRemotePortGet(cdi_endpoint_handle),
InternalUtilityKeyEnumToString(kKeyProbeState, probe_ptr->rx_probe_state.rx_state));
switch (probe_ptr->rx_probe_state.rx_state) {
case kProbeStateEfaStart:
case kProbeStateWaitForStart:
// Not used, so nothing to do.
break;
case kProbeStateResetting:
// Got timeout before these commands completed. Go to connection reset state.
ProbeControlSendCommand(probe_ptr, kProbeCommandReset, true);
wait_timeout_ms = SEND_RESET_COMMAND_FREQUENCY_MSEC;
break;
case kProbeStateEfaReset:
// Either a reset request came from the Tx, ProbeEndpointError() was used, EFA probe timed-out, or a ping
// was not received within the expected timeout period. Notify the application that we are disconnected and
// send a request to reset the connection to the Endpoint Manager.
ProbeControlEfaConnectionQueueReset(probe_ptr, NULL);
probe_ptr->rx_probe_state.rx_state = kProbeStateResetting; // Advance to resetting state.
wait_timeout_ms = ENDPOINT_MANAGER_COMPLETION_TIMEOUT_MSEC;
break;
case kProbeStateIdle:
case kProbeStateSendReset:
// Notify application that we are disconnected.
EndpointManagerConnectionStateChange(cdi_endpoint_handle, kCdiConnectionStatusDisconnected, NULL);
if (++probe_ptr->rx_probe_state.send_reset_retry_count < RX_RESET_COMMAND_MAX_RETRIES) {
CDI_LOG_THREAD_COMPONENT(kLogDebug, kLogComponentProbe,
"Probe Rx remote IP[%s:%d] sending reset #[%d].",
EndpointManagerEndpointRemoteIpGet(cdi_endpoint_handle),
EndpointManagerEndpointRemotePortGet(cdi_endpoint_handle),
probe_ptr->rx_probe_state.send_reset_retry_count);
// If we have received a reset command from the remote Tx (client) connection, which contains the
// remote IP and destination port, we can send reset commands to it.
if (efa_con_ptr->control_interface_handle) {
// Send command to reset the remote Tx (client) connection. Will not expect an ACK back.
ProbeControlSendCommand(probe_ptr, kProbeCommandReset, false);
}
probe_ptr->rx_probe_state.rx_state = kProbeStateSendReset; // Ensure in send reset state.
wait_timeout_ms = SEND_RESET_COMMAND_FREQUENCY_MSEC;
} else {
DestroyRxEndpoint(probe_ptr);
wait_timeout_ms = 0; // Do immediately.
}
break;
case kProbeStateResetDone:
// If the reset was triggered by the remote connection, respond with an ACK command.
if (probe_ptr->send_ack_command_valid) {
ProbeControlSendAck(probe_ptr, probe_ptr->send_ack_command, probe_ptr->send_ack_control_packet_num);
probe_ptr->send_ack_command_valid = false;
// For Rx, the EFA endpoint has been started in ProbeEndpointResetDone(), so we can advance to the
// kProbeStateEfaProbe state.
probe_ptr->rx_probe_state.rx_state = kProbeStateEfaProbe; // Advance to EFA probe state.
// If the EFA probe does not complete by this timeout, we return back to connection reset state.
wait_timeout_ms = EFA_PROBE_MONITOR_TIMEOUT_MSEC;
} else {
// Reset was not triggered by the remote connection, so just setup to send another reset command to it.
// No need to stop/start local libfabric here.
probe_ptr->rx_probe_state.rx_state = kProbeStateSendReset;
wait_timeout_ms = 0; // Do immediately.
}
break;
case kProbeStateEfaProbe:
// Did not complete EFA probe state within timeout. Reset the connection.
CDI_LOG_THREAD_COMPONENT(kLogDebug, kLogComponentProbe,
"Probe Rx EFA probe timeout. Sending reset to Tx.");
probe_ptr->rx_probe_state.rx_state = kProbeCommandReset; // Advance to resetting state.
wait_timeout_ms = 0; // Do immediately.
break;
case kProbeStateEfaConnected:
// Notify application that we are connected.
EndpointManagerConnectionStateChange(cdi_endpoint_handle, kCdiConnectionStatusConnected, NULL);
// Send command to notify the remote Tx (client) that we are connected and it is ok for the remote to switch
// to the connected state. This is done to prevent problems caused by EFA packet reordering. Without this
// communication, the transmitter could start sending a payload and packets for it might arrive before the
// last probe packet arrives. NOTE: We will not expect an ACK back.
ProbeControlSendCommand(probe_ptr, kProbeCommandConnected, false);
probe_ptr->rx_probe_state.send_reset_retry_count = 0; // Reset retry counter.
#ifdef DISABLE_PROBE_MONITORING
wait_timeout_ms = CDI_INFINITE;
#else
// Just connected, so advance to ping state and timeout if we miss receiving a ping.
probe_ptr->rx_probe_state.rx_state = kProbeStateEfaConnectedPing;
wait_timeout_ms = TX_COMMAND_ACK_TIMEOUT_MSEC;
#endif
break;
case kProbeStateEfaConnectedPing:
// Did not get a ping within the timeout period. Reset the connection.
DestroyRxEndpoint(probe_ptr);
wait_timeout_ms = 0; // Do immediately.
break;
case kProbeStateDestroy:
case kProbeStateSendProtocolVersion:
case kProbeStateEfaTxProbeAcks:
// Nothing special needed here.
break;
}
return wait_timeout_ms;
}