arrow::Status UcxServer::Init()

in dissociated-ipc/ucx_server.cc [73:125]


/// \brief Initialize the UCX server and start listening for connections.
///
/// Converts `host`/`port` into a socket address, sets up the UCX context and
/// the listener worker, creates a UCP listener whose incoming connections are
/// dispatched to HandleIncomingConnection, records the listener's actual
/// address in `location_` as a `ucx://host:port` URI, and finally starts the
/// background thread running DriveConnections.
///
/// \param host host name or address to listen on; a value containing ':' is
///             treated as an IPv6 literal and bracketed in the resulting URI
/// \param port port to listen on; the port stored in `location_` is the one
///             reported by the listener, not necessarily this value
/// \return OK on success, otherwise the first error encountered. On failure
///         the listener thread is not started.
arrow::Status UcxServer::Init(const std::string& host, const int32_t port) {
  struct sockaddr_storage listen_addr;
  ARROW_ASSIGN_OR_RAISE(auto addrlen, utils::to_sockaddr(host, port, &listen_addr));

  ARROW_ASSIGN_OR_RAISE(ucp_context_, init_ucx(listen_addr));
  ARROW_ASSIGN_OR_RAISE(worker_conn_, create_listener_worker(ucp_context_));

  {
    ucp_listener_params_t params;
    ucs_status_t status;

    // Only the fields named in field_mask are read by UCX, so the rest of
    // `params` may stay uninitialized.
    params.field_mask =
        UCP_LISTENER_PARAM_FIELD_SOCK_ADDR | UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
    params.sockaddr.addr = reinterpret_cast<const sockaddr*>(&listen_addr);
    params.sockaddr.addrlen = addrlen;
    params.conn_handler.cb = HandleIncomingConnection;
    // `this` is handed back to the callback as its user argument.
    params.conn_handler.arg = this;

    status = ucp_listener_create(worker_conn_->get(), &params, &listener_);
    RETURN_NOT_OK(utils::FromUcsStatus("ucp_listener_create", status));

    // get real address/port
    ucp_listener_attr_t attr;
    attr.field_mask = UCP_LISTENER_ATTR_FIELD_SOCKADDR;
    status = ucp_listener_query(listener_, &attr);
    RETURN_NOT_OK(utils::FromUcsStatus("ucp_listener_query", status));

    // Read the port through the structure that matches the reported address
    // family instead of assuming an IPv4 layout for every address.
    uint16_t bound_port = 0;
    switch (attr.sockaddr.ss_family) {
      case AF_INET:
        bound_port =
            ntohs(reinterpret_cast<const sockaddr_in*>(&attr.sockaddr)->sin_port);
        break;
      case AF_INET6:
        bound_port =
            ntohs(reinterpret_cast<const sockaddr_in6*>(&attr.sockaddr)->sin6_port);
        break;
      default:
        return arrow::Status::Invalid(
            "ucp_listener_query returned unsupported address family: ",
            static_cast<int>(attr.sockaddr.ss_family));
    }

    std::string raw_uri = "ucx://";
    if (host.find(':') != std::string::npos) {
      // IPv6 literals must be bracketed so the port separator is unambiguous.
      raw_uri += '[';
      raw_uri += host;
      raw_uri += ']';
    } else {
      raw_uri += host;
    }

    using arrow::internal::ToChars;

    raw_uri += ":";
    raw_uri += ToChars(bound_port);

    ARROW_ASSIGN_OR_RAISE(location_, arrow::flight::Location::Parse(raw_uri));
  }

  {
    // Set the flag before the thread is created so that DriveConnections
    // observes it as true from its first read.
    listening_.store(true);
    listener_thread_ = std::thread(&UcxServer::DriveConnections, this);
  }

  return arrow::Status::OK();
}