in src/torch_ucc.cpp [353:388]
void CommPG::ucx_disconnect_eps(
std::vector<ucp_ep_h>& eps,
std::shared_ptr<torch_ucc_oob_coll_info_t> oob) {
ucs_status_t st;
for (ucp_ep_h& ep : eps) {
ucs_status_ptr_t close_req = ucp_ep_close_nb(ep, UCP_EP_CLOSE_MODE_FLUSH);
if (UCS_PTR_IS_ERR(close_req)) {
TORCH_UCC_LOG_ERROR(
TORCH_UCC_FINALIZE,
"failed to close endpoint, ignore and continue...");
return;
}
if (UCS_PTR_IS_PTR(close_req)) {
do {
ucp_worker_progress(ucx_comm.worker);
st = ucp_request_check_status(close_req);
} while (st != UCS_OK);
ucp_request_free(close_req);
}
}
if (!eps.size()) {
return;
}
try {
auto sz = (size_t)oob->store->add(oob->getKey("epclosed"), 1);
while (sz != eps.size()) {
ucp_worker_progress(ucx_comm.worker);
std::this_thread::sleep_for(std::chrono::milliseconds(kBusyWaitMillis));
sz = (size_t)oob->store->add(oob->getKey("epclosed"), 0);
}
} catch (std::exception& ex) {
LOG(ERROR) << "(disconnect_eps) Caught error in Store Operation .. "
<< "[" << ex.what() << "]";
}
}