in src/torch_ucc.cpp [538:567]
void CommPG::enqueue_cuda_collective(
std::unique_ptr<ProcessGroupUCC::WorkData> data,
c10::intrusive_ptr<ProcessGroupUCC::WorkUCC> work,
ucc_coll_args_t& coll,
ucc_team_h team,
ucc_ee_h ee) {
ucc_coll_req_h request;
TORCH_UCC_CHECK(
ucc_collective_init(&coll, &request, team),
"failed to init cuda collective");
ucc_ev_t comp_ev, *post_ev;
comp_ev.ev_type = UCC_EVENT_COMPUTE_COMPLETE;
comp_ev.ev_context = nullptr;
comp_ev.ev_context_size = 0;
comp_ev.req = request;
TORCH_UCC_CHECK(
ucc_collective_triggered_post(ee, &comp_ev),
"failed to post triggered collective");
ucc_status_t st = ucc_ee_get_event(ee, &post_ev);
TORCH_CHECK(st == UCC_OK && post_ev->ev_type == UCC_EVENT_COLLECTIVE_POST);
ucc_ee_ack_event(ee, post_ev);
auto entry =
std::make_shared<ProcessGroupUCC::ProgressEntry>(&ucc_comm, request);
entry->data = std::move(data);
work->entry_ = entry;
std::unique_lock<std::mutex> lock(mutex);
progress_queue.push_back(entry);
lock.unlock();
queue_produce_cv.notify_one();
}