static CdiReturnStatus LibFabricEndpointOpen()

in src/cdi/adapter_efa.c [168:340]


/**
 * Open and configure the libfabric resources used by an EFA endpoint.
 *
 * Steps, in order: query fabric info (fi_getinfo), open the fabric, domain, completion queue and address vector,
 * create the endpoint, bind the address vector and completion queue to it, and enable it. A transmitter then
 * registers the adapter's Tx buffer with libfabric, while a receiver creates its Rx packet pool. Finally the local
 * endpoint address is read into local_ipv6_gid_array and logged.
 *
 * Each step only runs while rs is still kCdiStatusOk. No explicit release of already-opened resources is visible in
 * this function when a later step fails. NOTE(review): presumably the caller closes the endpoint on failure - confirm.
 *
 * @param endpoint_ptr Pointer to the EFA endpoint state whose libfabric members are populated.
 *
 * @return kCdiStatusOk on success. kCdiStatusAllocationFailed if CreateHints() returns NULL.
 *         kCdiStatusInvalidParameter if the socket data port does not fit in 16 bits or the adapter's
 *         tx_buffer_size_bytes is zero. kCdiStatusFatal if the port string cannot be formatted or fi_mr_reg()
 *         produced no memory region. Otherwise the status left in rs by CHECK_LIBFABRIC_RC (macro defined outside
 *         this block; it is assumed to update rs on a libfabric error) or returned by EfaRxPacketPoolCreate().
 */
static CdiReturnStatus LibFabricEndpointOpen(EfaEndpointState* endpoint_ptr)
{
    CdiReturnStatus rs = kCdiStatusOk;
    EfaAdapterState* efa_adapter_state_ptr =
        (EfaAdapterState*)endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->adapter_state_ptr->type_specific_ptr;
    bool is_socket_based = efa_adapter_state_ptr->is_socket_based;
    bool is_transmitter = (kEndpointDirectionSend == endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->direction);

    // Defaults used for EFA: no node, no service and no flags. Only the cases below override them.
    uint64_t flags = 0;
    const char* node_str = NULL;
    char* service_str = NULL;
    char port_str[16];

    if (is_socket_based) {
        // Socket-based endpoints use the port following the control port for data.
        const int data_port = 1 + endpoint_ptr->dest_control_port;
        if (data_port < 0 || data_port > UINT16_MAX) {
            // Without this check the value would be truncated to 16 bits when formatted (65536 becomes "0").
            SDK_LOG_GLOBAL(kLogError, "Data port derived from the control port is outside the valid port range.");
            return kCdiStatusInvalidParameter;
        }

        const int port_ret = snprintf(port_str, sizeof port_str, "%" PRIu16, (uint16_t)data_port);
        if (port_ret < 0 || (port_ret >= (int)(sizeof port_str))) {
            return kCdiStatusFatal;
        }
        service_str = port_str;
    }

    if (is_transmitter) {
        // Transmitter: only the socket-based variant names the remote node.
        if (is_socket_based) {
            node_str = EndpointManagerEndpointRemoteIpGet(endpoint_ptr->adapter_endpoint_ptr->cdi_endpoint_handle);
        }
    } else {
        // Receiver: FI_SOURCE tells fi_getinfo() that node/service describe the local (source) address.
        flags = FI_SOURCE;
    }

    struct fi_info* hints_ptr = CreateHints(is_socket_based);
    if (NULL == hints_ptr) {
        rs = kCdiStatusAllocationFailed;
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_getinfo(FT_FIVERSION, node_str, service_str, flags, hints_ptr, &endpoint_ptr->fabric_info_ptr);
        CHECK_LIBFABRIC_RC(fi_getinfo, ret);
    }

    if (kCdiStatusOk == rs && !is_socket_based) {
        // The SDK does not expect to receive packets in order. For best performance don't require packet ordering.
        endpoint_ptr->fabric_info_ptr->tx_attr->msg_order = FI_ORDER_NONE;
        endpoint_ptr->fabric_info_ptr->rx_attr->msg_order = FI_ORDER_NONE;
        endpoint_ptr->fabric_info_ptr->ep_attr->max_msg_size =
            endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->adapter_state_ptr->maximum_payload_bytes;
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_fabric(endpoint_ptr->fabric_info_ptr->fabric_attr, &endpoint_ptr->fabric_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_fabric, ret);
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_domain(endpoint_ptr->fabric_ptr, endpoint_ptr->fabric_info_ptr,
                        &endpoint_ptr->domain_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_domain, ret);
    }

    if (kCdiStatusOk == rs) {
        struct fi_cq_attr completion_queue_attr = {
            .wait_obj = FI_WAIT_NONE,
            .format = FI_CQ_FORMAT_DATA
        };

        // Size the completion queue to match the queue of the direction this endpoint serves.
        if (is_transmitter) {
            completion_queue_attr.size = endpoint_ptr->fabric_info_ptr->tx_attr->size;
        } else {
            completion_queue_attr.size = endpoint_ptr->fabric_info_ptr->rx_attr->size;
        }

        // The address of the completion queue pointer is also passed as the user context argument.
        int ret = fi_cq_open(endpoint_ptr->domain_ptr, &completion_queue_attr,
                             &endpoint_ptr->completion_queue_ptr, &endpoint_ptr->completion_queue_ptr);
        CHECK_LIBFABRIC_RC(fi_cq_open, ret);
    }

    if (kCdiStatusOk == rs) {
        // Attributes of the address vector to associate with the endpoint.
        struct fi_av_attr address_vector_attr = {
            .type = FI_AV_TABLE,
            .count = 1
        };

        int ret = fi_av_open(endpoint_ptr->domain_ptr, &address_vector_attr, &endpoint_ptr->address_vector_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_av_open, ret);
        // We use remote_fi_addr in EfaTxEndpointStop to check if fi_av_insert was called.
        endpoint_ptr->remote_fi_addr = FI_ADDR_UNSPEC;
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_endpoint(endpoint_ptr->domain_ptr, endpoint_ptr->fabric_info_ptr,
                          &endpoint_ptr->endpoint_ptr, NULL);
        CHECK_LIBFABRIC_RC(fi_endpoint, ret);
    }

    // Bind address vector.
    if (kCdiStatusOk == rs) {
        int ret = fi_ep_bind(endpoint_ptr->endpoint_ptr, &endpoint_ptr->address_vector_ptr->fid, 0);
        CHECK_LIBFABRIC_RC(fi_ep_bind, ret);
    }

    // Bind completion queue for the direction this endpoint serves.
    if (kCdiStatusOk == rs) {
        flags = is_transmitter ? FI_TRANSMIT : FI_RECV;
        int ret = fi_ep_bind(endpoint_ptr->endpoint_ptr, &endpoint_ptr->completion_queue_ptr->fid, flags);
        CHECK_LIBFABRIC_RC(fi_ep_bind, ret);
    }

    if (kCdiStatusOk == rs) {
        int ret = fi_enable(endpoint_ptr->endpoint_ptr);
        CHECK_LIBFABRIC_RC(fi_enable, ret);
    }

    if (kCdiStatusOk == rs) {
        if (is_transmitter) {
            CdiAdapterState* adapter_state_ptr =
                endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->adapter_state_ptr;
            if (0 == adapter_state_ptr->adapter_data.tx_buffer_size_bytes) {
                SDK_LOG_GLOBAL(kLogError, "Payload transmit buffer size cannot be zero. Set tx_buffer_size_bytes when"
                               " using CdiCoreNetworkAdapterInitialize().");
                rs = kCdiStatusInvalidParameter;
            } else {
                // Register the Tx buffer with libfabric.
                int ret = fi_mr_reg(endpoint_ptr->domain_ptr, adapter_state_ptr->adapter_data.ret_tx_buffer_ptr,
                                adapter_state_ptr->adapter_data.tx_buffer_size_bytes, FI_SEND, 0, 0, 0,
                                &endpoint_ptr->tx_state.memory_region_ptr, NULL);
                CHECK_LIBFABRIC_RC(fi_mr_reg, ret);
                // Guard against a missing memory region regardless of the return code reported above.
                if (NULL == endpoint_ptr->tx_state.memory_region_ptr) {
                    SDK_LOG_GLOBAL(kLogError, "fi_mr_reg failed to register Tx memory.");
                    rs = kCdiStatusFatal;
                }
            }
        } else {
            // The endpoint must be enabled before creating the packet pool for both socket and EFA based receivers. The
            // receiver-not-ready (RNR) logic in libfabric will prevent the transmitter from sending before the receiver
            // is ready.
            rs = EfaRxPacketPoolCreate(endpoint_ptr);
        }
    }

    if (kCdiStatusOk == rs) {
        // Get local endpoint address. NOTE: This may not return a valid address until after fi_enable() has been used.
        size_t name_length = sizeof(endpoint_ptr->local_ipv6_gid_array);
        int ret = fi_getname(&endpoint_ptr->endpoint_ptr->fid, (void*)&endpoint_ptr->local_ipv6_gid_array,
                         &name_length);
        CHECK_LIBFABRIC_RC(fi_getname, ret);

        if (0 == ret) {
            char gid_name_str[MAX_IPV6_ADDRESS_STRING_LENGTH];
            DeviceGidToString(endpoint_ptr->local_ipv6_gid_array,
                              sizeof(endpoint_ptr->local_ipv6_gid_array), gid_name_str, sizeof(gid_name_str));
            CDI_LOG_HANDLE(endpoint_ptr->adapter_endpoint_ptr->adapter_con_state_ptr->log_handle, kLogDebug,
                           "Using local EFA device GID[%s] (%s).", gid_name_str, is_transmitter ? "Tx" : "Rx");
        }
    }

    if (hints_ptr) {
        // Detach the provider name before freeing the hints.
        // NOTE(review): presumably CreateHints() points prov_name at storage that fi_freeinfo() must not free - confirm.
        hints_ptr->fabric_attr->prov_name = NULL;
        fi_freeinfo(hints_ptr);
    }

    return rs;
}