in src/torch_ucc.cpp [489:514]
// Creates the WorkUCC handle for a point-to-point operation and, if the
// request is still in flight, queues it for progress.
//
// opType, prof_title: forwarded unchanged to the WorkUCC constructor.
// request: UCC request handle; nullptr is treated as "already completed".
//
// Returns the work handle. When torch_ucc_config.use_future is set, the
// handle carries a Future typed as a list of tensors; on the already-completed
// path that future is marked completed with an empty tensor vector before
// returning.
c10::intrusive_ptr<ProcessGroup::Work> CommPG::enqueue_p2p(
    OpType opType,
    ucc_coll_req_h request,
    const char* prof_title) {
  auto p2p_work =
      c10::make_intrusive<ProcessGroupUCC::WorkUCC>(opType, prof_title);
  if (torch_ucc_config.use_future) {
    p2p_work->future_ = c10::make_intrusive<at::ivalue::Future>(
        c10::ListType::create(c10::TensorType::get()));
  }

  if (request != nullptr) {
    // Request is still pending: attach a progress entry to the work handle
    // and publish the same entry on the progress queue.
    auto pending =
        std::make_shared<ProcessGroupUCC::ProgressEntry>(&ucx_comm, request);
    p2p_work->entry_ = pending;
    {
      // Hold the queue mutex only for the push; it is released before the
      // condition variable is signalled.
      std::lock_guard<std::mutex> guard(mutex);
      progress_queue.push_back(pending);
    }
    // NOTE(review): presumably wakes the thread draining progress_queue —
    // the consumer is not visible in this excerpt.
    queue_produce_cv.notify_one();
    return p2p_work;
  }

  // The p2p request completed immediately: nothing goes on the progress
  // queue, and the future (if any) is resolved right away.
  if (torch_ucc_config.use_future) {
    p2p_work->future_->markCompleted(c10::IValue(std::vector<at::Tensor>()));
  }
  return p2p_work;
}