in dissociated-ipc/ucx_server.cc [167:193]
void UcxServer::DriveConnections() {
while (listening_.load()) {
// wait for server to recieve connection request from client
while (ucp_worker_progress(worker_conn_->get())) {
}
{
// check for requests in queue
std::lock_guard<std::mutex> guard(pending_connections_mutex_);
while (!pending_connections_.empty()) {
ucp_conn_request_h request = pending_connections_.front();
pending_connections_.pop();
std::thread(&UcxServer::HandleConnection, this, request).detach();
}
}
// check listening_ in case we're shutting down.
// it's possible that shutdown was called while we were in
// ucp_worker_progress above, in which case if we don't check
// listening_ here, we'll enter ucp_worker_wait and get stuck.
if (!listening_.load()) break;
auto status = ucp_worker_wait(worker_conn_->get());
if (status != UCS_OK) {
ARROW_LOG(WARNING) << utils::FromUcsStatus("ucp_worker_wait", status).ToString();
}
}
}