c10::intrusive_ptr CommPG::enqueue_p2p()

in src/torch_ucc.cpp [489:514]


c10::intrusive_ptr<ProcessGroup::Work> CommPG::enqueue_p2p(
    OpType opType,
    ucc_coll_req_h request,
    const char* prof_title) {
  auto work = c10::make_intrusive<ProcessGroupUCC::WorkUCC>(opType, prof_title);
  if (torch_ucc_config.use_future) {
    work->future_ = c10::make_intrusive<at::ivalue::Future>(
        c10::ListType::create(c10::TensorType::get()));
  }
  if (request == nullptr) {
    // p2p2 request completed immediately don't save it to progress queue
    // and mark future completed immediately
    if (torch_ucc_config.use_future) {
      work->future_->markCompleted(c10::IValue(std::vector<at::Tensor>()));
    }
    return work;
  }
  auto entry =
      std::make_shared<ProcessGroupUCC::ProgressEntry>(&ucx_comm, request);
  work->entry_ = entry;
  std::unique_lock<std::mutex> lock(mutex);
  progress_queue.push_back(entry);
  lock.unlock();
  queue_produce_cv.notify_one();
  return work;
}