in dissociated-ipc/ucx_server.cc [73:125]
// Starts the UCX server listening on `host`:`port`.
//
// Steps, in order:
//   1. Convert `host`/`port` into a socket address (utils::to_sockaddr).
//   2. Create the UCX context (init_ucx) and the worker used for accepting
//      connections (create_listener_worker).
//   3. Create a UCX listener bound to that address; incoming connections are
//      delivered to HandleIncomingConnection with `this` as the callback arg.
//   4. Query the listener for its socket address and store a
//      "ucx://host:port" Location in `location_`.
//   5. Set `listening_` and start `listener_thread_` running DriveConnections.
//
// Returns a non-OK Status as soon as any step fails; in that case no listener
// thread is started.
arrow::Status UcxServer::Init(const std::string& host, const int32_t port) {
  struct sockaddr_storage listen_addr;
  ARROW_ASSIGN_OR_RAISE(auto addrlen, utils::to_sockaddr(host, port, &listen_addr));
  ARROW_ASSIGN_OR_RAISE(ucp_context_, init_ucx(listen_addr));
  ARROW_ASSIGN_OR_RAISE(worker_conn_, create_listener_worker(ucp_context_));

  {
    // Value-initialize so that fields not selected by field_mask still hold
    // defined (zero) values.
    ucp_listener_params_t params{};
    params.field_mask =
        UCP_LISTENER_PARAM_FIELD_SOCK_ADDR | UCP_LISTENER_PARAM_FIELD_CONN_HANDLER;
    params.sockaddr.addr = reinterpret_cast<const sockaddr*>(&listen_addr);
    params.sockaddr.addrlen = addrlen;
    params.conn_handler.cb = HandleIncomingConnection;
    params.conn_handler.arg = this;

    ucs_status_t status = ucp_listener_create(worker_conn_->get(), &params, &listener_);
    RETURN_NOT_OK(utils::FromUcsStatus("ucp_listener_create", status));

    // Ask the listener which socket address it ended up with, so the
    // advertised port is the one reported by UCX rather than the `port`
    // argument. (Presumably this matters when an ephemeral port was
    // requested - confirm against callers.)
    ucp_listener_attr_t attr{};
    attr.field_mask = UCP_LISTENER_ATTR_FIELD_SOCKADDR;
    status = ucp_listener_query(listener_, &attr);
    RETURN_NOT_OK(utils::FromUcsStatus("ucp_listener_query", status));

    // Build "ucx://<host>:<port>". A host containing ':' is treated as an
    // IPv6 literal and wrapped in brackets so the port separator stays
    // unambiguous. Note that the host part is the caller-supplied string,
    // only the port comes from the listener query.
    std::string raw_uri = "ucx://";
    const bool needs_brackets = host.find(':') != std::string::npos;
    if (needs_brackets) {
      raw_uri += '[';
    }
    raw_uri += host;
    if (needs_brackets) {
      raw_uri += ']';
    }

    using arrow::internal::ToChars;
    // NOTE(review): the port is read through sockaddr_in regardless of the
    // address family; this relies on sin_port and sin6_port occupying the
    // same position - confirm for the target platforms.
    const auto bound_port =
        ntohs(reinterpret_cast<const sockaddr_in*>(&attr.sockaddr)->sin_port);
    raw_uri += ":";
    raw_uri += ToChars(bound_port);
    ARROW_ASSIGN_OR_RAISE(location_, arrow::flight::Location::Parse(raw_uri));
  }

  // Set the flag before the thread starts so DriveConnections can observe
  // listening_ == true from its first instruction.
  listening_.store(true);
  listener_thread_ = std::thread(&UcxServer::DriveConnections, this);

  return arrow::Status::OK();
}