void CommPG::ucx_disconnect_eps()

in src/torch_ucc.cpp [353:388]


void CommPG::ucx_disconnect_eps(
    std::vector<ucp_ep_h>& eps,
    std::shared_ptr<torch_ucc_oob_coll_info_t> oob) {
  ucs_status_t st;

  for (ucp_ep_h& ep : eps) {
    ucs_status_ptr_t close_req = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FLUSH);
    if (UCS_PTR_IS_ERR(close_req)) {
      TORCH_UCC_LOG_ERROR(
          TORCH_UCC_FINALIZE,
          "failed to close endpoint, ignore and continue...");
      return;
    }
    if (UCS_PTR_IS_PTR(close_req)) {
      do {
        ucp_worker_progress(ucx_comm.worker);
        st = ucp_request_check_status(close_req);
      } while (st != UCS_OK);
      ucp_request_free(close_req);
    }
  }
  if (!eps.size()) {
    return;
  }
  try {
    auto sz = (size_t)oob->store->add(oob->getKey("epclosed"), 1);
    while (sz != eps.size()) {
      ucp_worker_progress(ucx_comm.worker);
      std::this_thread::sleep_for(std::chrono::milliseconds(kBusyWaitMillis));
      sz = (size_t)oob->store->add(oob->getKey("epclosed"), 0);
    }
  } catch (std::exception& ex) {
    LOG(ERROR) << "(disconnect_eps) Caught error in Store Operation .. "
               << "[" << ex.what() << "]";
  }
}