void UcxServer::HandleConnection()

in dissociated-ipc/ucx_server.cc [195:260]


// Services one incoming UCX connection request from start to finish.
//
// Steps, in order:
//   1. Build a printable peer name for log messages. It starts as
//      "unknown:<counter>" and is replaced by the client address when that can
//      be queried from the request.
//   2. Create a worker; if that fails the request is rejected on listener_.
//   3. Create a connection/endpoint for the request on that worker.
//   4. If cuda_context_ is set, push it as the current CUDA context.
//   5. Run do_work() and then keep progressing the connection until it
//      reports closed.
//   6. Close the connection and release the worker's resources.
//
// Every failure is logged with the peer name; nothing is returned to the
// caller. Failures in steps 2-4 return early.
//
// request: the connection request handle to service.
void UcxServer::HandleConnection(ucp_conn_request_h request) {
  using arrow::internal::ToChars;

  // Fallback identifier, used when the client address is unavailable.
  std::string peer = "unknown:" + ToChars(counter_++);
  {
    ucp_conn_request_attr_t request_attr;
    std::memset(&request_attr, 0, sizeof(request_attr));
    request_attr.field_mask = UCP_CONN_REQUEST_ATTR_FIELD_CLIENT_ADDR;
    if (ucp_conn_request_query(request, &request_attr) == UCS_OK) {
      // Best effort: the conversion status is deliberately ignored.
      ARROW_UNUSED(utils::SockaddrToString(request_attr.client_address).Value(&peer));
    }
  }
  ARROW_LOG(DEBUG) << peer << ": Received connection request";

  auto maybe_worker = CreateWorker();
  if (!maybe_worker.ok()) {
    // The ": " separator keeps the status text from running into the message.
    ARROW_LOG(ERROR) << peer << ": failed to create worker: "
                     << maybe_worker.status().ToString();
    auto status = ucp_listener_reject(listener_, request);
    if (status != UCS_OK) {
      ARROW_LOG(ERROR) << peer << ": "
                       << utils::FromUcsStatus("ucp_listener_reject", status).ToString();
    }
    return;
  }

  auto worker = maybe_worker.MoveValueUnsafe();
  worker->conn_ = std::make_unique<utils::Connection>(worker->worker_);
  auto status = worker->conn_->CreateEndpoint(request);
  if (!status.ok()) {
    ARROW_LOG(ERROR) << peer << ": failed to create endpoint and connection: "
                     << status.ToString();
    return;
  }

  if (cuda_context_) {
    // NOTE(review): no matching cuCtxPopCurrent is visible in this function;
    // confirm whether the pushed context is meant to stay current on this
    // thread after the connection has been handled.
    auto result = cuCtxPushCurrent(reinterpret_cast<CUcontext>(cuda_context_->handle()));
    if (result != CUDA_SUCCESS) {
      // The CUDA driver sets the output pointer to NULL for an unrecognized
      // error code, and streaming a null `const char*` is undefined
      // behaviour, so both strings are guarded before logging.
      const char* err_name = nullptr;
      const char* err_string = nullptr;
      cuGetErrorName(result, &err_name);
      cuGetErrorString(result, &err_string);
      ARROW_LOG(ERROR) << peer << ": failed pushing cuda context on thread: "
                       << (err_name != nullptr ? err_name : "unknown") << " - "
                       << (err_string != nullptr ? err_string : "unknown");
      return;
    }
  }

  auto st = do_work(worker.get());
  if (!st.ok()) {
    ARROW_LOG(ERROR) << peer << ": error from do_work: " << st.ToString();
  }

  // Keep driving progress until the connection reports that it is closed.
  while (!worker->conn_->is_closed()) {
    worker->conn_->Progress();
  }

  // clean up
  status = worker->conn_->Close();
  if (!status.ok()) {
    ARROW_LOG(ERROR) << peer
                     << ": failed to close worker connection: " << status.ToString();
  }
  worker->worker_.reset();
  worker->conn_.reset();
  ARROW_LOG(DEBUG) << peer << ": disconnected";
}