std::shared_ptr ContextFactory::makeContext()

in gloo/rendezvous/context.cc [181:243]


std::shared_ptr<::gloo::Context> ContextFactory::makeContext(
    std::shared_ptr<transport::Device>& dev) {
  auto context = std::make_shared<Context>(
      backingContext_->rank,
      backingContext_->size);
  context->setTimeout(backingContext_->getTimeout());

  // Assume it's the same for all pairs on a device
  size_t addressSize = 0;

  // Create pairs
  auto transportContext = dev->createContext(context->rank, context->size);
  transportContext->setTimeout(context->getTimeout());
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }

    auto& pair = transportContext->createPair(i);
    auto address = pair->address().bytes();
    addressSize = address.size();

    // Send address of new pair to peer
    GLOO_ENFORCE_LE(addressSize, sendData_[i].size());
    sendData_[i].assign(address.begin(), address.end());
    sendBuffers_[i]->send(0, addressSize);
  }

  // Wait for remote addresses and connect peers
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }

    recvBuffers_[i]->waitRecv();
    auto& data = recvData_[i];
    auto address = std::vector<char>(data.begin(), data.begin() + addressSize);
    transportContext->getPair(i)->connect(address);

    // Notify peer that we've consumed the payload
    sendNotificationBuffers_[i]->send();
  }

  // Wait for incoming notification from peers
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }
    recvNotificationBuffers_[i]->waitRecv();
  }

  // Wait for outgoing notifications to be flushed
  for (auto i = 0; i < context->size; i++) {
    if (i == context->rank) {
      continue;
    }
    sendNotificationBuffers_[i]->waitSend();
  }

  context->device_ = dev;
  context->transportContext_ = std::move(transportContext);
  return std::static_pointer_cast<::gloo::Context>(context);
}