arrow::Status UcxClient::Init()

in dissociated-ipc/ucx_client.cc [24:53]


/// \brief Resolve the server address and create the UCX context for this client.
///
/// Reads the default UCX configuration, resolves `host`/`port` into
/// `connect_addr_` (storing the address length in `addrlen_`), adjusts the
/// configuration for IPv6 peers, and initializes a UCP context that is then
/// stored in `ucp_context_`.
///
/// \param host hostname or address passed to utils::to_sockaddr
/// \param port port passed to utils::to_sockaddr
/// \return Status::OK() on success; otherwise the error produced by address
///         resolution or by the failing UCX call (converted through
///         utils::FromUcsStatus).
arrow::Status UcxClient::Init(const std::string& host, const int32_t port) {
  ucp_config_t* ucp_config = nullptr;
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
      "ucp_config_read", ucp_config_read(nullptr, nullptr, &ucp_config)));

  // Release the configuration on every exit path. Previously it was only
  // released after ucp_init, so a failure in address resolution or in
  // ucp_config_modify returned early and leaked it.
  struct ConfigGuard {
    explicit ConfigGuard(ucp_config_t* config) : config_(config) {}
    ~ConfigGuard() { ucp_config_release(config_); }
    ConfigGuard(const ConfigGuard&) = delete;
    ConfigGuard& operator=(const ConfigGuard&) = delete;
    ucp_config_t* config_;
  };
  const ConfigGuard config_guard(ucp_config);

  // If the location is IPv6, the UCX config must be adjusted.
  // NOTE: this assumes a location always resolves to either IPv6 or IPv4,
  // which is not necessarily true.
  ARROW_ASSIGN_OR_RAISE(addrlen_, utils::to_sockaddr(host, port, &connect_addr_));
  if (connect_addr_.ss_family == AF_INET6) {
    ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
        "ucp_config_modify", ucp_config_modify(ucp_config, "AF_PRIO", "inet6")));
  }

  // Only the `features` field is populated, as declared by field_mask.
  ucp_params_t ucp_params;
  std::memset(&ucp_params, 0, sizeof(ucp_params));
  ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
  ucp_params.features = UCP_FEATURE_WAKEUP | UCP_FEATURE_AM | UCP_FEATURE_RMA |
                        UCP_FEATURE_STREAM | UCP_FEATURE_TAG;

  ucp_context_h ucp_context;
  ARROW_RETURN_NOT_OK(utils::FromUcsStatus(
      "ucp_init", ucp_init(&ucp_params, ucp_config, &ucp_context)));

  // Hand the raw context handle to the owning wrapper held by this client.
  ucp_context_.reset(new utils::UcpContext(ucp_context));
  return arrow::Status::OK();
}