void UcxServer::DriveConnections()

in dissociated-ipc/ucx_server.cc [167:193]


void UcxServer::DriveConnections() {
  while (listening_.load()) {
    // wait for server to recieve connection request from client
    while (ucp_worker_progress(worker_conn_->get())) {
    }
    {
      // check for requests in queue
      std::lock_guard<std::mutex> guard(pending_connections_mutex_);
      while (!pending_connections_.empty()) {
        ucp_conn_request_h request = pending_connections_.front();
        pending_connections_.pop();

        std::thread(&UcxServer::HandleConnection, this, request).detach();
      }
    }

    // check listening_ in case we're shutting down.
    // it's possible that shutdown was called while we were in
    // ucp_worker_progress above, in which case if we don't check
    // listening_ here, we'll enter ucp_worker_wait and get stuck.
    if (!listening_.load()) break;
    auto status = ucp_worker_wait(worker_conn_->get());
    if (status != UCS_OK) {
      ARROW_LOG(WARNING) << utils::FromUcsStatus("ucp_worker_wait", status).ToString();
    }
  }
}