in src/cdi/adapter_efa.c [168:340]
/**
 * Open the libfabric objects needed by an endpoint, in this order: fabric info (fi_getinfo), fabric, domain,
 * completion queue, address vector and endpoint. The address vector and completion queue are then bound to the
 * endpoint and the endpoint is enabled. A transmitter additionally registers the adapter's Tx buffer as a memory
 * region; a receiver creates its Rx packet pool. Finally the local endpoint address is read and logged.
 *
 * Each step only runs while rs is still kCdiStatusOk.
 * NOTE(review): CHECK_LIBFABRIC_RC is defined outside this function; the step-by-step rs checks rely on it recording
 * a failed libfabric call in rs - confirm against the macro definition.
 * NOTE(review): on failure this function does not release the objects opened by earlier steps; presumably the
 * caller (or a close routine elsewhere in the file) is responsible for that - confirm.
 *
 * @param endpoint_ptr Pointer to the EFA endpoint state whose libfabric objects are to be opened.
 *
 * @return kCdiStatusOk on success. kCdiStatusInvalidParameter if the socket data port derived from the control port
 *         is outside 0..65535 or the Tx buffer size is zero. kCdiStatusFatal if the port string cannot be formatted
 *         or Tx memory registration yields no memory region. kCdiStatusAllocationFailed if the hints could not be
 *         created. Otherwise whatever status CHECK_LIBFABRIC_RC or EfaRxPacketPoolCreate() produced.
 */
static CdiReturnStatus LibFabricEndpointOpen(EfaEndpointState* endpoint_ptr)
{
    CdiReturnStatus rs = kCdiStatusOk;
    EfaAdapterState* efa_adapter_state_ptr =
        (EfaAdapterState*)endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->adapter_state_ptr->type_specific_ptr;
    const bool is_socket_based = efa_adapter_state_ptr->is_socket_based;
    const bool is_transmitter =
        (kEndpointDirectionSend == endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->direction);

    // Start with the EFA defaults (no node, no service), then override if socket-based.
    const char* node_str = NULL;
    char* service_str = NULL;
    char port_str[16];
    if (is_socket_based) {
        // The data port is the control port plus one. Validate it before formatting: a 16-bit conversion would
        // otherwise silently wrap an out-of-range value (e.g. 65536 becomes "0").
        const int data_port = 1 + endpoint_ptr->dest_control_port;
        if (data_port < 0 || data_port > UINT16_MAX) {
            SDK_LOG_GLOBAL(kLogError, "Data port derived from the control port is outside the range 0-65535.");
            return kCdiStatusInvalidParameter;
        }
        const int port_ret = snprintf(port_str, sizeof port_str, "%" PRIu16, (uint16_t)data_port);
        if (port_ret < 0 || (port_ret >= (int)(sizeof port_str))) {
            return kCdiStatusFatal;
        }
        service_str = port_str;

        if (is_transmitter) {
            // Only a socket-based transmitter names the remote node; every other case leaves node_str NULL.
            node_str = EndpointManagerEndpointRemoteIpGet(endpoint_ptr->adapter_endpoint_ptr->cdi_endpoint_handle);
        }
    }

    // A receiver passes FI_SOURCE to fi_getinfo(); a transmitter passes no flags.
    const uint64_t getinfo_flags = is_transmitter ? 0 : FI_SOURCE;

    struct fi_info* hints_ptr = CreateHints(is_socket_based);
    if (NULL == hints_ptr) {
        rs = kCdiStatusAllocationFailed;
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_getinfo(FT_FIVERSION, node_str, service_str, getinfo_flags, hints_ptr,
                             &endpoint_ptr->fabric_info_ptr);
        CHECK_LIBFABRIC_RC(fi_getinfo, ret);
    }

    if (kCdiStatusOk == rs && !is_socket_based) {
        // The SDK does not expect to receive packets in order. For best performance don't require packet ordering.
        endpoint_ptr->fabric_info_ptr->tx_attr->msg_order = FI_ORDER_NONE;
        endpoint_ptr->fabric_info_ptr->rx_attr->msg_order = FI_ORDER_NONE;
        endpoint_ptr->fabric_info_ptr->ep_attr->max_msg_size =
            endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->adapter_state_ptr->maximum_payload_bytes;
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_fabric(endpoint_ptr->fabric_info_ptr->fabric_attr, &endpoint_ptr->fabric_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_fabric, ret);
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_domain(endpoint_ptr->fabric_ptr, endpoint_ptr->fabric_info_ptr,
                            &endpoint_ptr->domain_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_domain, ret);
    }

    if (kCdiStatusOk == rs) {
        struct fi_cq_attr completion_queue_attr = {
            .wait_obj = FI_WAIT_NONE,
            .format = FI_CQ_FORMAT_DATA
        };

        // Size the completion queue from the attributes of the direction this endpoint uses.
        if (is_transmitter) {
            completion_queue_attr.size = endpoint_ptr->fabric_info_ptr->tx_attr->size;
        } else {
            completion_queue_attr.size = endpoint_ptr->fabric_info_ptr->rx_attr->size;
        }

        // The address of the completion queue pointer is also passed as the queue's context argument.
        int ret = fi_cq_open(endpoint_ptr->domain_ptr, &completion_queue_attr,
                             &endpoint_ptr->completion_queue_ptr, &endpoint_ptr->completion_queue_ptr);
        CHECK_LIBFABRIC_RC(fi_cq_open, ret);
    }

    if (kCdiStatusOk == rs) {
        // Attributes of the address vector to associate with the endpoint.
        struct fi_av_attr address_vector_attr = {
            .type = FI_AV_TABLE,
            .count = 1
        };

        int ret = fi_av_open(endpoint_ptr->domain_ptr, &address_vector_attr, &endpoint_ptr->address_vector_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_av_open, ret);

        // We use remote_fi_addr in EfaTxEndpointStop to check if fi_av_insert was called.
        endpoint_ptr->remote_fi_addr = FI_ADDR_UNSPEC;
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_endpoint(endpoint_ptr->domain_ptr, endpoint_ptr->fabric_info_ptr,
                              &endpoint_ptr->endpoint_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_endpoint, ret);
    }

    // Bind address vector.
    if (kCdiStatusOk == rs) {
        int ret = fi_ep_bind(endpoint_ptr->endpoint_ptr, &endpoint_ptr->address_vector_ptr->fid, 0);
        CHECK_LIBFABRIC_RC(fi_ep_bind, ret);
    }

    // Bind completion queue, for the direction this endpoint uses.
    if (kCdiStatusOk == rs) {
        const uint64_t bind_flags = is_transmitter ? FI_TRANSMIT : FI_RECV;
        int ret = fi_ep_bind(endpoint_ptr->endpoint_ptr, &endpoint_ptr->completion_queue_ptr->fid, bind_flags);
        CHECK_LIBFABRIC_RC(fi_ep_bind, ret);
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_enable(endpoint_ptr->endpoint_ptr);
        CHECK_LIBFABRIC_RC(fi_enable, ret);
    }

    if (kCdiStatusOk == rs) {
        if (is_transmitter) {
            CdiAdapterState* adapter_state_ptr =
                endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->adapter_state_ptr;
            if (0 == adapter_state_ptr->adapter_data.tx_buffer_size_bytes) {
                SDK_LOG_GLOBAL(kLogError, "Payload transmit buffer size cannot be zero. Set tx_buffer_size_bytes when"
                               " using CdiCoreNetworkAdapterInitialize().");
                rs = kCdiStatusInvalidParameter;
            } else {
                // Register the Tx buffer with libfabric.
                int ret = fi_mr_reg(endpoint_ptr->domain_ptr, adapter_state_ptr->adapter_data.ret_tx_buffer_ptr,
                                    adapter_state_ptr->adapter_data.tx_buffer_size_bytes, FI_SEND, 0, 0, 0,
                                    &endpoint_ptr->tx_state.memory_region_ptr, NULL);
                CHECK_LIBFABRIC_RC(fi_mr_reg, ret);

                // Treat a missing memory region as fatal regardless of what the call reported.
                if (NULL == endpoint_ptr->tx_state.memory_region_ptr) {
                    SDK_LOG_GLOBAL(kLogError, "fi_mr_reg failed to register Tx memory.");
                    rs = kCdiStatusFatal;
                }
            }
        } else {
            // The endpoint must be enabled before creating the packet pool for both socket and EFA based receivers. The
            // receiver-not-ready (RNR) logic in libfabric will prevent the transmitter from sending before the receiver
            // is ready.
            rs = EfaRxPacketPoolCreate(endpoint_ptr);
        }
    }

    if (kCdiStatusOk == rs) {
        // Get local endpoint address. NOTE: This may not return a valid address until after fi_enable() has been used.
        size_t name_length = sizeof(endpoint_ptr->local_ipv6_gid_array);
        int ret = fi_getname(&endpoint_ptr->endpoint_ptr->fid, (void*)&endpoint_ptr->local_ipv6_gid_array,
                             &name_length);
        CHECK_LIBFABRIC_RC(fi_getname, ret);

        if (0 == ret) {
            // Log the local address together with the direction of this endpoint.
            char gid_name_str[MAX_IPV6_ADDRESS_STRING_LENGTH];
            DeviceGidToString(endpoint_ptr->local_ipv6_gid_array,
                              sizeof(endpoint_ptr->local_ipv6_gid_array), gid_name_str, sizeof(gid_name_str));
            CDI_LOG_HANDLE(endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->log_handle, kLogDebug,
                           "Using local EFA device GID[%s] (%s).", gid_name_str, is_transmitter ? "Tx" : "Rx");
        }
    }

    if (hints_ptr) {
        // prov_name is cleared before the hints are released.
        // NOTE(review): presumably CreateHints() points prov_name at storage that fi_freeinfo() must not free -
        // confirm against CreateHints().
        hints_ptr->fabric_attr->prov_name = NULL;
        fi_freeinfo(hints_ptr);
    }

    return rs;
}