in dissociated-ipc/ucx_client.cc [24:53]
/// \brief Initialize the UCP context this client will use to reach host:port.
///
/// Reads the default UCX configuration, converts host/port into a socket
/// address (stored in connect_addr_, with its length in addrlen_), adjusts the
/// configuration for IPv6 addresses, and creates the UCP context held by
/// ucp_context_.
///
/// \param host host name or address passed to utils::to_sockaddr
/// \param port port number passed to utils::to_sockaddr
/// \return Status::OK() on success; otherwise the error produced by the first
///         failing step (config read, address conversion, config modify, or
///         ucp_init).
arrow::Status UcxClient::Init(const std::string& host, const int32_t port) {
  ucp_config_t* ucp_config = nullptr;
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
      "ucp_config_read", ucp_config_read(nullptr, nullptr, &ucp_config)));

  // Release the configuration on every exit path below. Previously an error
  // from to_sockaddr or ucp_config_modify returned early and leaked it.
  struct ConfigGuard {
    explicit ConfigGuard(ucp_config_t* c) : config(c) {}
    ~ConfigGuard() { ucp_config_release(config); }
    ConfigGuard(const ConfigGuard&) = delete;
    ConfigGuard& operator=(const ConfigGuard&) = delete;
    ucp_config_t* config;
  };
  const ConfigGuard config_guard(ucp_config);

  // if location is IPv6 must adjust UCX config
  // we assume locations always resolve to IPv6 or IPv4
  // but that's not necessarily true
  ARROW_ASSIGN_OR_RAISE(addrlen_, utils::to_sockaddr(host, port, &connect_addr_));
  if (connect_addr_.ss_family == AF_INET6) {
    ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
        "ucp_config_modify", ucp_config_modify(ucp_config, "AF_PRIO", "inet6")));
  }

  // Only the `features` field is populated; field_mask tells UCX to ignore
  // the remaining (zeroed) fields.
  ucp_params_t ucp_params;
  std::memset(&ucp_params, 0, sizeof(ucp_params));
  ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
  ucp_params.features = UCP_FEATURE_WAKEUP | UCP_FEATURE_AM | UCP_FEATURE_RMA |
                        UCP_FEATURE_STREAM | UCP_FEATURE_TAG;

  ucp_context_h ucp_context = nullptr;
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
      "ucp_init", ucp_init(&ucp_params, ucp_config, &ucp_context)));

  // Hand the raw context to the project's wrapper, stored in ucp_context_.
  ucp_context_.reset(new utils::UcpContext(ucp_context));
  return arrow::Status::OK();
}